From 58c810d2e83934fd337cb6625468e7419b469149 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 09:57:06 +0300 Subject: [PATCH] Report fabric peer session telemetry --- .../rap-node-agent/cmd/rap-node-agent/main.go | 6 ++- .../cmd/rap-node-agent/main_test.go | 8 +++- .../internal/mesh/fabric_session_manager.go | 43 +++++++++++++++++++ .../mesh/fabric_session_manager_test.go | 17 ++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 + 5 files changed, 73 insertions(+), 3 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 950d996..ba84f2f 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -2492,7 +2492,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn payload.Capabilities["fabric_data_session_v1"] = true } if cfg.VPNFabricSessionTransportEnabled { - payload.Metadata["vpn_fabric_session_transport_report"] = map[string]any{ + report := map[string]any{ "schema_version": "rap.vpn_fabric_session_transport_report.v1", "enabled": true, "transport": "fabric_session_websocket_binary_frames", @@ -2500,6 +2500,10 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn "gated": true, "observed_at": observedAt.UTC().Format(time.RFC3339Nano), } + if meshState != nil && meshState.VPNFabricSessionPeers != nil { + report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() + } + payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true } diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index e8b3182..544e516 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -709,7 +709,9 
@@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { }, state.Identity{ ClusterID: "cluster-1", NodeID: "node-a", - }, nil, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)) + }, &syntheticMeshState{ + VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), + }, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)) report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any) if !ok { @@ -733,7 +735,9 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { if payload.Capabilities["vpn_fabric_session_transport"] != true || payload.Capabilities["vpn_packet_batch_binary_frames"] != true { t.Fatalf("vpn fabric session capabilities missing: %+v", payload.Capabilities) } - if report, ok := payload.Metadata["vpn_fabric_session_transport_report"].(map[string]any); !ok || report["packet_payload"] != "rap.vpn_packet_batch.fabric.v1" { + if report, ok := payload.Metadata["vpn_fabric_session_transport_report"].(map[string]any); !ok || + report["packet_payload"] != "rap.vpn_packet_batch.fabric.v1" || + report["peer_sessions"] == nil { t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata) } } diff --git a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go index 4b83a82..177f063 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go +++ b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go @@ -10,6 +10,7 @@ import ( type FabricSessionPeerManager struct { mu sync.Mutex sessions map[string]*FabricSessionPump + stats FabricSessionPeerManagerStats } type FabricSessionPeerTarget struct { @@ -19,6 +20,21 @@ type FabricSessionPeerTarget struct { Pump FabricSessionPumpOptions } +type FabricSessionPeerManagerStats struct { + Opens uint64 `json:"opens"` + Reuses uint64 `json:"reuses"` + ClosedEvicted uint64 `json:"closed_evicted"` + ClosePeerCalls uint64 `json:"close_peer_calls"` + CloseAllCalls uint64 `json:"close_all_calls"` +} + 
+type FabricSessionPeerManagerSnapshot struct { + SchemaVersion string `json:"schema_version"` + ActiveCount int `json:"active_count"` + ClosedCount int `json:"closed_count"` + Stats FabricSessionPeerManagerStats `json:"stats"` +} + func NewFabricSessionPeerManager() *FabricSessionPeerManager { return &FabricSessionPeerManager{ sessions: map[string]*FabricSessionPump{}, @@ -37,7 +53,9 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession if pump := m.sessions[key]; pump != nil { if pump.Closed() { delete(m.sessions, key) + m.stats.ClosedEvicted++ } else { + m.stats.Reuses++ m.mu.Unlock() return pump, nil } @@ -54,7 +72,9 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession if existing := m.sessions[key]; existing != nil { if existing.Closed() { delete(m.sessions, key) + m.stats.ClosedEvicted++ } else { + m.stats.Reuses++ m.mu.Unlock() _ = pump.Close() return existing, nil @@ -64,6 +84,7 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession m.sessions = map[string]*FabricSessionPump{} } m.sessions[key] = pump + m.stats.Opens++ m.mu.Unlock() return pump, nil } @@ -77,6 +98,7 @@ func (m *FabricSessionPeerManager) ClosePeer(target FabricSessionPeerTarget) err return err } m.mu.Lock() + m.stats.ClosePeerCalls++ pump := m.sessions[key] delete(m.sessions, key) m.mu.Unlock() @@ -91,6 +113,7 @@ func (m *FabricSessionPeerManager) Close() error { return nil } m.mu.Lock() + m.stats.CloseAllCalls++ sessions := m.sessions m.sessions = map[string]*FabricSessionPump{} m.mu.Unlock() @@ -103,6 +126,26 @@ func (m *FabricSessionPeerManager) Close() error { return firstErr } +func (m *FabricSessionPeerManager) Snapshot() FabricSessionPeerManagerSnapshot { + if m == nil { + return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"} + } + m.mu.Lock() + defer m.mu.Unlock() + snapshot := FabricSessionPeerManagerSnapshot{ + SchemaVersion: 
"rap.fabric_session_peer_manager.v1", + Stats: m.stats, + } + for _, pump := range m.sessions { + if pump == nil || pump.Closed() { + snapshot.ClosedCount++ + continue + } + snapshot.ActiveCount++ + } + return snapshot +} + func fabricSessionPeerKey(target FabricSessionPeerTarget) (string, error) { peerID := strings.TrimSpace(target.PeerID) baseURL := strings.TrimRight(strings.TrimSpace(target.BaseURL), "/") diff --git a/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go b/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go index 9848a67..0ec399e 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_session_manager_test.go @@ -53,6 +53,14 @@ func TestFabricSessionPeerManagerReusesPeerPump(t *testing.T) { if opened != 1 { t.Fatalf("opened sessions = %d, want 1", opened) } + snapshot := manager.Snapshot() + if snapshot.SchemaVersion != "rap.fabric_session_peer_manager.v1" || + snapshot.ActiveCount != 1 || + snapshot.ClosedCount != 0 || + snapshot.Stats.Opens != 1 || + snapshot.Stats.Reuses != 1 { + t.Fatalf("snapshot = %+v", snapshot) + } if err := first.Send(ctx, fabricproto.Frame{ Type: fabricproto.FramePing, Sequence: 1, @@ -115,6 +123,9 @@ func TestFabricSessionPeerManagerClosePeerReopens(t *testing.T) { if opened != 2 { t.Fatalf("opened sessions = %d, want 2", opened) } + if snapshot := manager.Snapshot(); snapshot.Stats.ClosePeerCalls != 1 || snapshot.Stats.Opens != 2 { + t.Fatalf("snapshot = %+v", snapshot) + } } func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) { @@ -163,6 +174,12 @@ func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) { if opened != 2 { t.Fatalf("opened sessions = %d, want 2", opened) } + snapshot := manager.Snapshot() + if snapshot.ActiveCount != 1 || + snapshot.Stats.Opens != 2 || + snapshot.Stats.ClosedEvicted != 1 { + t.Fatalf("snapshot = %+v", snapshot) + } } func 
TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 70cd011..354d47e 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -289,6 +289,8 @@ long-lived peer session for gateway packet transport and falls back to the existing HTTP production envelope path when the peer session is unavailable. Peer session reuse now evicts closed pumps before reuse, so failed WebSocket sessions can be reopened on the next transport acquisition. +Heartbeat telemetry reports a `peer_sessions` snapshot of the peer session manager: active and closed session counts, +plus cumulative counters for opens, reuses, closed-pump evictions, and explicit close operations. Deliverables: