Report fabric peer session telemetry

This commit is contained in:
2026-05-16 09:57:06 +03:00
parent 057e3b65a7
commit 58c810d2e8
5 changed files with 73 additions and 3 deletions
@@ -2492,7 +2492,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
payload.Capabilities["fabric_data_session_v1"] = true payload.Capabilities["fabric_data_session_v1"] = true
} }
if cfg.VPNFabricSessionTransportEnabled { if cfg.VPNFabricSessionTransportEnabled {
payload.Metadata["vpn_fabric_session_transport_report"] = map[string]any{ report := map[string]any{
"schema_version": "rap.vpn_fabric_session_transport_report.v1", "schema_version": "rap.vpn_fabric_session_transport_report.v1",
"enabled": true, "enabled": true,
"transport": "fabric_session_websocket_binary_frames", "transport": "fabric_session_websocket_binary_frames",
@@ -2500,6 +2500,10 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
"gated": true, "gated": true,
"observed_at": observedAt.UTC().Format(time.RFC3339Nano), "observed_at": observedAt.UTC().Format(time.RFC3339Nano),
} }
if meshState != nil && meshState.VPNFabricSessionPeers != nil {
report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot()
}
payload.Metadata["vpn_fabric_session_transport_report"] = report
payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_fabric_session_transport"] = true
payload.Capabilities["vpn_packet_batch_binary_frames"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true
} }
@@ -709,7 +709,9 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
}, state.Identity{ }, state.Identity{
ClusterID: "cluster-1", ClusterID: "cluster-1",
NodeID: "node-a", NodeID: "node-a",
}, nil, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)) }, &syntheticMeshState{
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
}, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC))
report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any) report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any)
if !ok { if !ok {
@@ -733,7 +735,9 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
if payload.Capabilities["vpn_fabric_session_transport"] != true || payload.Capabilities["vpn_packet_batch_binary_frames"] != true { if payload.Capabilities["vpn_fabric_session_transport"] != true || payload.Capabilities["vpn_packet_batch_binary_frames"] != true {
t.Fatalf("vpn fabric session capabilities missing: %+v", payload.Capabilities) t.Fatalf("vpn fabric session capabilities missing: %+v", payload.Capabilities)
} }
if report, ok := payload.Metadata["vpn_fabric_session_transport_report"].(map[string]any); !ok || report["packet_payload"] != "rap.vpn_packet_batch.fabric.v1" { if report, ok := payload.Metadata["vpn_fabric_session_transport_report"].(map[string]any); !ok ||
report["packet_payload"] != "rap.vpn_packet_batch.fabric.v1" ||
report["peer_sessions"] == nil {
t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata) t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata)
} }
} }
@@ -10,6 +10,7 @@ import (
type FabricSessionPeerManager struct { type FabricSessionPeerManager struct {
mu sync.Mutex mu sync.Mutex
sessions map[string]*FabricSessionPump sessions map[string]*FabricSessionPump
stats FabricSessionPeerManagerStats
} }
type FabricSessionPeerTarget struct { type FabricSessionPeerTarget struct {
@@ -19,6 +20,21 @@ type FabricSessionPeerTarget struct {
Pump FabricSessionPumpOptions Pump FabricSessionPumpOptions
} }
type FabricSessionPeerManagerStats struct {
Opens uint64 `json:"opens"`
Reuses uint64 `json:"reuses"`
ClosedEvicted uint64 `json:"closed_evicted"`
ClosePeerCalls uint64 `json:"close_peer_calls"`
CloseAllCalls uint64 `json:"close_all_calls"`
}
type FabricSessionPeerManagerSnapshot struct {
SchemaVersion string `json:"schema_version"`
ActiveCount int `json:"active_count"`
ClosedCount int `json:"closed_count"`
Stats FabricSessionPeerManagerStats `json:"stats"`
}
func NewFabricSessionPeerManager() *FabricSessionPeerManager { func NewFabricSessionPeerManager() *FabricSessionPeerManager {
return &FabricSessionPeerManager{ return &FabricSessionPeerManager{
sessions: map[string]*FabricSessionPump{}, sessions: map[string]*FabricSessionPump{},
@@ -37,7 +53,9 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
if pump := m.sessions[key]; pump != nil { if pump := m.sessions[key]; pump != nil {
if pump.Closed() { if pump.Closed() {
delete(m.sessions, key) delete(m.sessions, key)
m.stats.ClosedEvicted++
} else { } else {
m.stats.Reuses++
m.mu.Unlock() m.mu.Unlock()
return pump, nil return pump, nil
} }
@@ -54,7 +72,9 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
if existing := m.sessions[key]; existing != nil { if existing := m.sessions[key]; existing != nil {
if existing.Closed() { if existing.Closed() {
delete(m.sessions, key) delete(m.sessions, key)
m.stats.ClosedEvicted++
} else { } else {
m.stats.Reuses++
m.mu.Unlock() m.mu.Unlock()
_ = pump.Close() _ = pump.Close()
return existing, nil return existing, nil
@@ -64,6 +84,7 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
m.sessions = map[string]*FabricSessionPump{} m.sessions = map[string]*FabricSessionPump{}
} }
m.sessions[key] = pump m.sessions[key] = pump
m.stats.Opens++
m.mu.Unlock() m.mu.Unlock()
return pump, nil return pump, nil
} }
@@ -77,6 +98,7 @@ func (m *FabricSessionPeerManager) ClosePeer(target FabricSessionPeerTarget) err
return err return err
} }
m.mu.Lock() m.mu.Lock()
m.stats.ClosePeerCalls++
pump := m.sessions[key] pump := m.sessions[key]
delete(m.sessions, key) delete(m.sessions, key)
m.mu.Unlock() m.mu.Unlock()
@@ -91,6 +113,7 @@ func (m *FabricSessionPeerManager) Close() error {
return nil return nil
} }
m.mu.Lock() m.mu.Lock()
m.stats.CloseAllCalls++
sessions := m.sessions sessions := m.sessions
m.sessions = map[string]*FabricSessionPump{} m.sessions = map[string]*FabricSessionPump{}
m.mu.Unlock() m.mu.Unlock()
@@ -103,6 +126,26 @@ func (m *FabricSessionPeerManager) Close() error {
return firstErr return firstErr
} }
func (m *FabricSessionPeerManager) Snapshot() FabricSessionPeerManagerSnapshot {
if m == nil {
return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"}
}
m.mu.Lock()
defer m.mu.Unlock()
snapshot := FabricSessionPeerManagerSnapshot{
SchemaVersion: "rap.fabric_session_peer_manager.v1",
Stats: m.stats,
}
for _, pump := range m.sessions {
if pump == nil || pump.Closed() {
snapshot.ClosedCount++
continue
}
snapshot.ActiveCount++
}
return snapshot
}
func fabricSessionPeerKey(target FabricSessionPeerTarget) (string, error) { func fabricSessionPeerKey(target FabricSessionPeerTarget) (string, error) {
peerID := strings.TrimSpace(target.PeerID) peerID := strings.TrimSpace(target.PeerID)
baseURL := strings.TrimRight(strings.TrimSpace(target.BaseURL), "/") baseURL := strings.TrimRight(strings.TrimSpace(target.BaseURL), "/")
@@ -53,6 +53,14 @@ func TestFabricSessionPeerManagerReusesPeerPump(t *testing.T) {
if opened != 1 { if opened != 1 {
t.Fatalf("opened sessions = %d, want 1", opened) t.Fatalf("opened sessions = %d, want 1", opened)
} }
snapshot := manager.Snapshot()
if snapshot.SchemaVersion != "rap.fabric_session_peer_manager.v1" ||
snapshot.ActiveCount != 1 ||
snapshot.ClosedCount != 0 ||
snapshot.Stats.Opens != 1 ||
snapshot.Stats.Reuses != 1 {
t.Fatalf("snapshot = %+v", snapshot)
}
if err := first.Send(ctx, fabricproto.Frame{ if err := first.Send(ctx, fabricproto.Frame{
Type: fabricproto.FramePing, Type: fabricproto.FramePing,
Sequence: 1, Sequence: 1,
@@ -115,6 +123,9 @@ func TestFabricSessionPeerManagerClosePeerReopens(t *testing.T) {
if opened != 2 { if opened != 2 {
t.Fatalf("opened sessions = %d, want 2", opened) t.Fatalf("opened sessions = %d, want 2", opened)
} }
if snapshot := manager.Snapshot(); snapshot.Stats.ClosePeerCalls != 1 || snapshot.Stats.Opens != 2 {
t.Fatalf("snapshot = %+v", snapshot)
}
} }
func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) { func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) {
@@ -163,6 +174,12 @@ func TestFabricSessionPeerManagerReopensClosedPump(t *testing.T) {
if opened != 2 { if opened != 2 {
t.Fatalf("opened sessions = %d, want 2", opened) t.Fatalf("opened sessions = %d, want 2", opened)
} }
snapshot := manager.Snapshot()
if snapshot.ActiveCount != 1 ||
snapshot.Stats.Opens != 2 ||
snapshot.Stats.ClosedEvicted != 1 {
t.Fatalf("snapshot = %+v", snapshot)
}
} }
func TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) { func TestFabricSessionPeerManagerRejectsIncompleteTarget(t *testing.T) {
@@ -289,6 +289,8 @@ long-lived peer session for gateway packet transport and falls back to the
existing HTTP production envelope path when the peer session is unavailable. existing HTTP production envelope path when the peer session is unavailable.
Peer session reuse now evicts closed pumps before reuse, so failed WebSocket Peer session reuse now evicts closed pumps before reuse, so failed WebSocket
sessions can be reopened on the next transport acquisition. sessions can be reopened on the next transport acquisition.
Heartbeat telemetry includes peer session manager counters for active sessions,
reuses, opens, closed-pump evictions, and explicit close operations.
Deliverables: Deliverables: