diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 1b2e662..2d0b8fd 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -48,6 +48,7 @@ const ( maxMeshRendezvousLeaseReportEntries = 20 maxVPNFabricEndpointHealthReportEntries = 32 maxVPNFabricEndpointObservationEntries = 256 + maxVPNFabricCapacityCounterEntries = 32 vpnFabricEndpointObservationMaxAge = 30 * time.Minute meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1" meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry" @@ -429,6 +430,16 @@ type vpnFabricSessionDialStats struct { LastSelectedUnixSec atomic.Int64 LastCapacityUnixSec atomic.Int64 LastFailureUnixSec atomic.Int64 + capacityMu sync.Mutex + capacityByEndpoint map[string]vpnFabricCapacityCounter +} + +type vpnFabricCapacityCounter struct { + EndpointID string `json:"endpoint_id,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Transport string `json:"transport,omitempty"` + Count int64 `json:"count"` + LastSeenUnixSec int64 `json:"last_seen_unix_sec"` } type vpnFabricEndpointObservationStore struct { @@ -438,7 +449,9 @@ type vpnFabricEndpointObservationStore struct { } func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { - return &vpnFabricSessionDialStats{} + return &vpnFabricSessionDialStats{ + capacityByEndpoint: map[string]vpnFabricCapacityCounter{}, + } } func newVPNFabricEndpointObservationStore(reporterNodeID ...string) *vpnFabricEndpointObservationStore { @@ -637,13 +650,102 @@ func (s *vpnFabricSessionDialStats) ObserveCapacityLimited(target mesh.FabricTra return } s.ObserveCandidateFailure("capacity_limited") - s.LastCapacityEndpoint.Store(strings.TrimSpace(target.Endpoint)) + endpoint := strings.TrimSpace(target.Endpoint) + endpointID := strings.TrimSpace(target.EndpointID) + s.LastCapacityEndpoint.Store(endpoint) transport := strings.TrimSpace(target.Transport) if transport == "" { transport = "legacy_peer_endpoint" } s.LastCapacityTransport.Store(transport) - s.LastCapacityUnixSec.Store(time.Now().UTC().Unix()) + observedAt := time.Now().UTC().Unix() + s.LastCapacityUnixSec.Store(observedAt) + key := endpointID + if key == "" { + key = transport + "|" + endpoint + } + if key == "|" { + key = "unknown" + } + s.capacityMu.Lock() + if s.capacityByEndpoint == nil { + s.capacityByEndpoint = map[string]vpnFabricCapacityCounter{} + } + counter := s.capacityByEndpoint[key] + counter.EndpointID = endpointID + counter.Endpoint = endpoint + counter.Transport = transport + counter.Count++ + counter.LastSeenUnixSec = observedAt + s.capacityByEndpoint[key] = counter + s.pruneCapacityCountersLocked(maxVPNFabricCapacityCounterEntries) + s.capacityMu.Unlock() +} + +func (s *vpnFabricSessionDialStats) pruneCapacityCountersLocked(maxEntries int) { + if s == nil || maxEntries <= 0 || len(s.capacityByEndpoint) <= maxEntries { + return + } + values := make([]vpnFabricCapacityCounter, 0, len(s.capacityByEndpoint)) + for _, counter := range s.capacityByEndpoint { + values = append(values, counter) + } + sort.SliceStable(values, func(i, j int) bool { + if values[i].LastSeenUnixSec != values[j].LastSeenUnixSec { + return values[i].LastSeenUnixSec > values[j].LastSeenUnixSec + } + if values[i].Count != values[j].Count { + return values[i].Count > values[j].Count + } + return values[i].Endpoint < values[j].Endpoint + }) + keep := make(map[string]struct{}, maxEntries) + for _, counter := range values[:maxEntries] { + key := strings.TrimSpace(counter.EndpointID) + if key == "" { + key = strings.TrimSpace(counter.Transport) + "|" + strings.TrimSpace(counter.Endpoint) + } + if key == "|" { + key = "unknown" + } + keep[key] = struct{}{} + } + for key := range s.capacityByEndpoint { + if _, ok := keep[key]; !ok { + delete(s.capacityByEndpoint, key) + } + } +} + +func (s *vpnFabricSessionDialStats) capacityCountersSnapshot(maxEntries int) []vpnFabricCapacityCounter { + if s == nil { + return nil + } + s.capacityMu.Lock() + defer s.capacityMu.Unlock() + if len(s.capacityByEndpoint) == 0 { + return []vpnFabricCapacityCounter{} + } + values := make([]vpnFabricCapacityCounter, 0, len(s.capacityByEndpoint)) + for _, counter := range s.capacityByEndpoint { + values = append(values, counter) + } + sort.SliceStable(values, func(i, j int) bool { + if values[i].LastSeenUnixSec != values[j].LastSeenUnixSec { + return values[i].LastSeenUnixSec > values[j].LastSeenUnixSec + } + if values[i].Count != values[j].Count { + return values[i].Count > values[j].Count + } + if values[i].EndpointID != values[j].EndpointID { + return values[i].EndpointID < values[j].EndpointID + } + return values[i].Endpoint < values[j].Endpoint + }) + if maxEntries <= 0 || maxEntries > len(values) { + maxEntries = len(values) + } + return values[:maxEntries] } func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() { @@ -702,6 +804,7 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any "last_selected_unix_sec": s.LastSelectedUnixSec.Load(), "last_capacity_unix_sec": s.LastCapacityUnixSec.Load(), "last_failure_unix_sec": s.LastFailureUnixSec.Load(), + "capacity_pressure": s.capacityCountersSnapshot(maxVPNFabricCapacityCounterEntries), } if value, ok := s.LastTransport.Load().(string); ok && value != "" { report["last_transport"] = value diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 9c92c73..bb5ed0f 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -777,8 +777,14 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { stats := newVPNFabricSessionDialStats() stats.Attempts.Add(1) stats.ObserveCapacityLimited(mesh.FabricTransportTarget{ - Endpoint: "quic://node-b.example.test:19443", - Transport: "direct_quic", + EndpointID: "node-b-quic", + Endpoint: "quic://node-b.example.test:19443", + Transport: "direct_quic", + }) + stats.ObserveCapacityLimited(mesh.FabricTransportTarget{ + EndpointID: "node-b-quic", + Endpoint: "quic://node-b.example.test:19443", + Transport: "direct_quic", }) stats.ObserveCandidateFailure("session_open_failed") stats.ObserveSelected(mesh.FabricTransportTarget{ @@ -790,8 +796,8 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { report := stats.Report(time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)) if report["attempts"] != int64(1) || report["selected"] != int64(1) || - report["candidate_failures"] != int64(2) || - report["capacity_limited"] != int64(1) || + report["candidate_failures"] != int64(3) || + report["capacity_limited"] != int64(2) || report["session_open_failures"] != int64(1) || report["quic_selected"] != int64(1) || report["pinned_cert_selected"] != int64(1) || @@ -802,6 +808,17 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { report["last_failure_reason"] != "session_open_failed" { t.Fatalf("unexpected dial stats report: %+v", report) } + capacityPressure, ok := report["capacity_pressure"].([]vpnFabricCapacityCounter) + if !ok || len(capacityPressure) != 1 { + t.Fatalf("capacity pressure counters missing: %+v", report["capacity_pressure"]) + } + if capacityPressure[0].EndpointID != "node-b-quic" || + capacityPressure[0].Endpoint != "quic://node-b.example.test:19443" || + capacityPressure[0].Transport != "direct_quic" || + capacityPressure[0].Count != 2 || + capacityPressure[0].LastSeenUnixSec <= 0 { + t.Fatalf("unexpected capacity pressure counter: %+v", capacityPressure[0]) + } } func TestFabricSessionOpenFailureReasonClassifiesCapacity(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 7ff927c..76e918b 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -374,6 +374,9 @@ but saturated carrier. VPN fabric dial telemetry records the last capacity-limited endpoint and transport, making stream saturation visible without poisoning endpoint health observations. +The same dial telemetry now keeps bounded per-endpoint capacity-pressure +counters, so operators can see whether stream saturation is occasional or +concentrated on a specific QUIC carrier. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.