From d67b80a7aa6d761576d743984a6e3b15305fd03d Mon Sep 17 00:00:00 2001
From: Mikhail
Date: Sat, 16 May 2026 11:54:05 +0300
Subject: [PATCH] Report QUIC fabric capacity pressure

---
 .../rap-node-agent/cmd/rap-node-agent/main.go | 64 +++++++++++++------
 .../cmd/rap-node-agent/main_test.go           |  7 +-
 .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md  |  3 +
 3 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go
index fafd006..aa3b05c 100644
--- a/agents/rap-node-agent/cmd/rap-node-agent/main.go
+++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go
@@ -409,23 +409,26 @@ type fabricServiceChannelAccessStats struct {
 }
 
 type vpnFabricSessionDialStats struct {
-	Attempts            atomic.Int64
-	Selected            atomic.Int64
-	CandidateFailures   atomic.Int64
-	TransportFailures   atomic.Int64
-	SessionOpenFailures atomic.Int64
-	StreamOpenFailures  atomic.Int64
-	CapacityLimited     atomic.Int64
-	AllCandidatesFailed atomic.Int64
-	QUICSelected        atomic.Int64
-	WebSocketSelected   atomic.Int64
-	LegacySelected      atomic.Int64
-	PinnedCertSelected  atomic.Int64
-	LastTransport       atomic.Value
-	LastEndpoint        atomic.Value
-	LastFailureReason   atomic.Value
-	LastSelectedUnixSec atomic.Int64
-	LastFailureUnixSec  atomic.Int64
+	Attempts              atomic.Int64
+	Selected              atomic.Int64
+	CandidateFailures     atomic.Int64
+	TransportFailures     atomic.Int64
+	SessionOpenFailures   atomic.Int64
+	StreamOpenFailures    atomic.Int64
+	CapacityLimited       atomic.Int64
+	AllCandidatesFailed   atomic.Int64
+	QUICSelected          atomic.Int64
+	WebSocketSelected     atomic.Int64
+	LegacySelected        atomic.Int64
+	PinnedCertSelected    atomic.Int64
+	LastTransport         atomic.Value
+	LastEndpoint          atomic.Value
+	LastCapacityEndpoint  atomic.Value
+	LastCapacityTransport atomic.Value
+	LastFailureReason     atomic.Value
+	LastSelectedUnixSec   atomic.Int64
+	LastCapacityUnixSec   atomic.Int64
+	LastFailureUnixSec    atomic.Int64
 }
 
 type vpnFabricEndpointObservationStore struct {
@@ -610,6 +613,20 @@ func (s *vpnFabricSessionDialStats) ObserveCandidateFailure(reason string) {
 	s.LastFailureUnixSec.Store(time.Now().UTC().Unix())
 }
 
+func (s *vpnFabricSessionDialStats) ObserveCapacityLimited(target mesh.FabricTransportTarget) {
+	if s == nil {
+		return
+	}
+	s.ObserveCandidateFailure("capacity_limited")
+	s.LastCapacityEndpoint.Store(strings.TrimSpace(target.Endpoint))
+	transport := strings.TrimSpace(target.Transport)
+	if transport == "" {
+		transport = "legacy_peer_endpoint"
+	}
+	s.LastCapacityTransport.Store(transport)
+	s.LastCapacityUnixSec.Store(time.Now().UTC().Unix())
+}
+
 func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() {
 	if s == nil {
 		return
@@ -664,6 +681,7 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
 		"legacy_selected":        s.LegacySelected.Load(),
 		"pinned_cert_selected":   s.PinnedCertSelected.Load(),
 		"last_selected_unix_sec": s.LastSelectedUnixSec.Load(),
+		"last_capacity_unix_sec": s.LastCapacityUnixSec.Load(),
 		"last_failure_unix_sec":  s.LastFailureUnixSec.Load(),
 	}
 	if value, ok := s.LastTransport.Load().(string); ok && value != "" {
@@ -672,6 +690,12 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
 	if value, ok := s.LastEndpoint.Load().(string); ok && value != "" {
 		report["last_endpoint"] = value
 	}
+	if value, ok := s.LastCapacityEndpoint.Load().(string); ok && value != "" {
+		report["last_capacity_endpoint"] = value
+	}
+	if value, ok := s.LastCapacityTransport.Load().(string); ok && value != "" {
+		report["last_capacity_transport"] = value
+	}
 	if value, ok := s.LastFailureReason.Load().(string); ok && value != "" {
 		report["last_failure_reason"] = value
 	}
@@ -5037,8 +5061,10 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
 		if err != nil {
 			cancel()
 			reason := fabricSessionOpenFailureReason(err)
-			meshState.VPNFabricSessionDialStats.ObserveCandidateFailure(reason)
-			if reason != "capacity_limited" {
+			if reason == "capacity_limited" {
+				meshState.VPNFabricSessionDialStats.ObserveCapacityLimited(selectedTarget)
+			} else {
+				meshState.VPNFabricSessionDialStats.ObserveCandidateFailure(reason)
 				meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, reason)
 			}
 			log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=%s error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, reason, err)
diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
index cdd818e..10418cb 100644
--- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
+++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
@@ -776,7 +776,10 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
 func TestVPNFabricSessionDialStatsReport(t *testing.T) {
 	stats := newVPNFabricSessionDialStats()
 	stats.Attempts.Add(1)
-	stats.ObserveCandidateFailure("capacity_limited")
+	stats.ObserveCapacityLimited(mesh.FabricTransportTarget{
+		Endpoint:  "quic://node-b.example.test:19443",
+		Transport: "direct_quic",
+	})
 	stats.ObserveCandidateFailure("session_open_failed")
 	stats.ObserveSelected(mesh.FabricTransportTarget{
 		Endpoint:  "quic://node-b.example.test:19443",
@@ -794,6 +797,8 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
 		report["pinned_cert_selected"] != int64(1) ||
 		report["last_transport"] != "direct_quic" ||
 		report["last_endpoint"] != "quic://node-b.example.test:19443" ||
+		report["last_capacity_endpoint"] != "quic://node-b.example.test:19443" ||
+		report["last_capacity_transport"] != "direct_quic" ||
 		report["last_failure_reason"] != "session_open_failed" {
 		t.Fatalf("unexpected dial stats report: %+v", report)
 	}
diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
index 3824733..3d9cc6f 100644
--- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
+++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
@@ -371,6 +371,9 @@ profiles.
 QUIC stream-limit rejects are classified as capacity pressure instead of peer
 endpoint failure, so local health feedback does not incorrectly demote a
 healthy but saturated carrier.
+VPN fabric dial telemetry records the last capacity-limited endpoint and
+transport, making stream saturation visible without poisoning endpoint health
+observations.
 Cached QUIC carrier idle TTL is configurable through
 `RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS` / `-vpn-fabric-quic-idle-ttl` and
 propagated by host-agent install profiles.