From ebdae833fcd0cc4a1ec9572d5f7bf2ce623899b9 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:00:24 +0300 Subject: [PATCH] Summarize QUIC pressure in heartbeat --- .../rap-node-agent/cmd/rap-node-agent/main.go | 49 +++++++++++++++++++ .../cmd/rap-node-agent/main_test.go | 40 +++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 ++ 3 files changed, 92 insertions(+) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index f5a0773..dc78edf 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -49,6 +49,7 @@ const ( maxVPNFabricEndpointHealthReportEntries = 32 maxVPNFabricEndpointObservationEntries = 256 maxVPNFabricCapacityCounterEntries = 32 + maxVPNFabricQUICPressureReportEntries = 16 vpnFabricEndpointObservationMaxAge = 30 * time.Minute meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1" meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry" @@ -442,6 +443,16 @@ type vpnFabricCapacityCounter struct { LastSeenUnixSec int64 `json:"last_seen_unix_sec"` } +type vpnFabricQUICPressureEntry struct { + PeerID string `json:"peer_id,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + ActiveStreams int `json:"active_streams"` + MaxStreams int `json:"max_streams"` + CapacityPressurePercent int `json:"capacity_pressure_percent"` + Saturated bool `json:"saturated,omitempty"` + LastUsedUnixSec int64 `json:"last_used_unix_sec,omitempty"` +} + type vpnFabricEndpointObservationStore struct { reporterNodeID string mu sync.Mutex @@ -855,6 +866,43 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any return report } +func vpnFabricQUICPressureReport(snapshot mesh.QUICFabricTransportSnapshot, maxEntries int) []vpnFabricQUICPressureEntry { + if len(snapshot.Connections) == 0 { + return []vpnFabricQUICPressureEntry{} + } + entries := make([]vpnFabricQUICPressureEntry, 0, len(snapshot.Connections)) + for _, conn := range snapshot.Connections { + if conn.CapacityPressurePercent <= 0 && conn.ActiveStreams <= 0 && !conn.Saturated { + continue + } + entries = append(entries, vpnFabricQUICPressureEntry{ + PeerID: conn.PeerID, + Endpoint: conn.Endpoint, + ActiveStreams: conn.ActiveStreams, + MaxStreams: conn.MaxStreams, + CapacityPressurePercent: conn.CapacityPressurePercent, + Saturated: conn.Saturated, + LastUsedUnixSec: conn.LastUsedUnixSec, + }) + } + sort.SliceStable(entries, func(i, j int) bool { + if entries[i].CapacityPressurePercent != entries[j].CapacityPressurePercent { + return entries[i].CapacityPressurePercent > entries[j].CapacityPressurePercent + } + if entries[i].ActiveStreams != entries[j].ActiveStreams { + return entries[i].ActiveStreams > entries[j].ActiveStreams + } + if entries[i].PeerID != entries[j].PeerID { + return entries[i].PeerID < entries[j].PeerID + } + return entries[i].Endpoint < entries[j].Endpoint + }) + if maxEntries <= 0 || maxEntries > len(entries) { + maxEntries = len(entries) + } + return entries[:maxEntries] +} + func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats { return &fabricServiceChannelAccessStats{} } @@ -3170,6 +3218,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn if meshState != nil && meshState.VPNFabricQUICTransport != nil { quicSnapshot := meshState.VPNFabricQUICTransport.Snapshot() report["quic_sessions"] = quicSnapshot + report["quic_capacity_pressure"] = vpnFabricQUICPressureReport(quicSnapshot, maxVPNFabricQUICPressureReportEntries) report["quic_max_streams_per_conn"] = meshState.VPNFabricQUICTransport.MaxStreamsPerConn report["quic_idle_ttl_seconds"] = int(meshState.VPNFabricQUICTransport.IdleTTL.Seconds()) } diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 3a6eb7b..6cdd6b6 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -765,6 +765,8 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata) } else if report["quic_sessions"] == nil || report["quic_max_streams_per_conn"] != 24 { t.Fatalf("vpn fabric quic session report missing: %+v", report) + } else if report["quic_capacity_pressure"] == nil { + t.Fatalf("vpn fabric quic pressure report missing: %+v", report) } if payload.Capabilities["vpn_fabric_session_stream_shards"] != true { t.Fatalf("vpn fabric stream shard capability missing: %+v", payload.Capabilities) @@ -1185,6 +1187,44 @@ func TestMergeEndpointCapacityPressureKeepsStrongerSignal(t *testing.T) { } } +func TestVPNFabricQUICPressureReportRanksBusyConnections(t *testing.T) { + report := vpnFabricQUICPressureReport(mesh.QUICFabricTransportSnapshot{ + Connections: []mesh.QUICFabricConnSnapshot{ + { + PeerID: "node-c", + Endpoint: "node-c.example.test:19443", + ActiveStreams: 1, + MaxStreams: 10, + CapacityPressurePercent: 10, + }, + { + PeerID: "node-b", + Endpoint: "node-b.example.test:19443", + ActiveStreams: 9, + MaxStreams: 10, + CapacityPressurePercent: 90, + Saturated: true, + LastUsedUnixSec: 100, + }, + { + PeerID: "idle", + Endpoint: "idle.example.test:19443", + ActiveStreams: 0, + MaxStreams: 10, + }, + }, + }, 1) + if len(report) != 1 { + t.Fatalf("report count = %d, want 1: %+v", len(report), report) + } + if report[0].PeerID != "node-b" || + report[0].CapacityPressurePercent != 90 || + !report[0].Saturated || + report[0].LastUsedUnixSec != 100 { + t.Fatalf("unexpected pressure report: %+v", report[0]) + } +} + func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { now := time.Now().UTC() merged := mergedEndpointCandidateObservations( diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index d83e204..b5e8c81 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -432,6 +432,9 @@ heartbeats and diagnostics stay stable across reports. When local live QUIC pressure and recent capacity-limit counters overlap, the ranking input keeps the stronger pressure signal rather than allowing a weak fresh sample to hide a saturated endpoint. +Heartbeat VPN fabric reports now include a bounded `quic_capacity_pressure` +summary sorted by busiest cached QUIC connection, making overload diagnosis +visible without digging through the full carrier snapshot. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.