diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 0bd3eb4..088fa11 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -10,6 +10,7 @@ import ( "crypto/x509/pkix" "encoding/hex" "encoding/json" + "errors" "fmt" "log" "math/big" @@ -414,6 +415,7 @@ type vpnFabricSessionDialStats struct { TransportFailures atomic.Int64 SessionOpenFailures atomic.Int64 StreamOpenFailures atomic.Int64 + CapacityLimited atomic.Int64 AllCandidatesFailed atomic.Int64 QUICSelected atomic.Int64 WebSocketSelected atomic.Int64 @@ -601,6 +603,8 @@ func (s *vpnFabricSessionDialStats) ObserveCandidateFailure(reason string) { s.SessionOpenFailures.Add(1) case "stream_open_failed": s.StreamOpenFailures.Add(1) + case "capacity_limited": + s.CapacityLimited.Add(1) } s.LastFailureReason.Store(strings.TrimSpace(reason)) s.LastFailureUnixSec.Store(time.Now().UTC().Unix()) @@ -653,6 +657,7 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any "transport_failures": s.TransportFailures.Load(), "session_open_failures": s.SessionOpenFailures.Load(), "stream_open_failures": s.StreamOpenFailures.Load(), + "capacity_limited": s.CapacityLimited.Load(), "all_candidates_failed": s.AllCandidatesFailed.Load(), "quic_selected": s.QUICSelected.Load(), "websocket_selected": s.WebSocketSelected.Load(), @@ -5024,9 +5029,12 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st session, err := carrier.Connect(dialCtx, selectedTarget) if err != nil { cancel() - meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("session_open_failed") - meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, "session_open_failed") - log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) + reason := fabricSessionOpenFailureReason(err) + meshState.VPNFabricSessionDialStats.ObserveCandidateFailure(reason) + if reason != "capacity_limited" { + meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, reason) + } + log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=%s error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, reason, err) continue } streamID := uint64(time.Now().UnixNano()) @@ -5065,6 +5073,16 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st return nil } +func fabricSessionOpenFailureReason(err error) string { + if err == nil { + return "" + } + if errors.Is(err, mesh.ErrQUICFabricStreamLimitReached) { + return "capacity_limited" + } + return "session_open_failed" +} + func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh.FabricTransportTarget, bool) { targets := vpnFabricSessionTargets(meshState, nextHop) if len(targets) == 0 { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 3e45b6d..cdd818e 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "encoding/base64" "encoding/json" + "errors" "fmt" "io" "log" @@ -775,6 +776,7 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { func TestVPNFabricSessionDialStatsReport(t *testing.T) { stats := newVPNFabricSessionDialStats() stats.Attempts.Add(1) + stats.ObserveCandidateFailure("capacity_limited") stats.ObserveCandidateFailure("session_open_failed") stats.ObserveSelected(mesh.FabricTransportTarget{ Endpoint: "quic://node-b.example.test:19443", @@ -785,7 +787,8 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { report := stats.Report(time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)) if report["attempts"] != int64(1) || report["selected"] != int64(1) || - report["candidate_failures"] != int64(1) || + report["candidate_failures"] != int64(2) || + report["capacity_limited"] != int64(1) || report["session_open_failures"] != int64(1) || report["quic_selected"] != int64(1) || report["pinned_cert_selected"] != int64(1) || @@ -796,6 +799,15 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { } } +func TestFabricSessionOpenFailureReasonClassifiesCapacity(t *testing.T) { + if got := fabricSessionOpenFailureReason(mesh.ErrQUICFabricStreamLimitReached); got != "capacity_limited" { + t.Fatalf("failure reason = %q, want capacity_limited", got) + } + if got := fabricSessionOpenFailureReason(errors.New("dial failed")); got != "session_open_failed" { + t.Fatalf("failure reason = %q, want session_open_failed", got) + } +} + func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) { store := newVPNFabricEndpointObservationStore("node-a") base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index c788461..f798f50 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -18,6 +18,13 @@ import ( const fabricQUICNextProto = "rap-fabric-data-session-v1" const defaultQUICFabricConnIdleTTL = 5 * time.Minute const defaultQUICFabricMaxStreamsPerConn = 64 +const ErrQUICFabricStreamLimitReached = quicFabricError("quic fabric stream limit reached") + +type quicFabricError string + +func (e quicFabricError) Error() string { + return string(e) +} type QUICFabricTransport struct { Config *quic.Config @@ -231,7 +238,7 @@ func (t *QUICFabricTransport) reserveStream(key string, conn *quic.Conn) error { } if entry.activeStreams >= limit { t.stats.StreamLimitRejects++ - return fmt.Errorf("quic fabric stream limit reached") + return ErrQUICFabricStreamLimitReached } entry.activeStreams++ entry.lastUsed = time.Now() diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 5a4b7f8..c90a201 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -368,6 +368,9 @@ The per-connection QUIC stream limit is configurable through `RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN` / `-vpn-fabric-quic-max-streams-per-conn` and propagated by host-agent install profiles. +QUIC stream-limit rejects are classified as capacity pressure instead of peer +endpoint failure, so local health feedback does not incorrectly demote a healthy +but saturated carrier. Deliverables: