From c64531d70c739edbe58195116cf0723017555b02 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:54:51 +0300 Subject: [PATCH] Use live QUIC pressure for endpoint ranking --- .../rap-node-agent/cmd/rap-node-agent/main.go | 84 +++++++++++++++++++ .../cmd/rap-node-agent/main_test.go | 35 ++++++++ .../internal/mesh/fabric_quic_transport.go | 42 ++++++++++ .../mesh/fabric_quic_transport_test.go | 12 +++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 4 + 5 files changed, 177 insertions(+) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 4c30cfc..8d88258 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -5333,6 +5333,10 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me if meshState.VPNFabricSessionDialStats != nil { capacityPressure = meshState.VPNFabricSessionDialStats.capacityPressureForScoring(2 * time.Minute) } + capacityPressure = mergeEndpointCapacityPressure( + capacityPressure, + quicEndpointCapacityPressureForScoring(candidates, meshState.VPNFabricQUICTransport, time.Now().UTC()), + ) ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{ ChannelClass: mesh.SyntheticChannelFabricControl, Now: time.Now().UTC(), @@ -5370,6 +5374,86 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me return out } +func quicEndpointCapacityPressureForScoring(candidates []mesh.PeerEndpointCandidate, transport *mesh.QUICFabricTransport, now time.Time) map[string]mesh.EndpointCandidateCapacityPressure { + if len(candidates) == 0 || transport == nil { + return nil + } + return quicEndpointCapacityPressureForScoringFromSnapshot(candidates, transport.Snapshot(), now) +} + +func quicEndpointCapacityPressureForScoringFromSnapshot(candidates []mesh.PeerEndpointCandidate, snapshot mesh.QUICFabricTransportSnapshot, now time.Time) 
map[string]mesh.EndpointCandidateCapacityPressure { + if len(candidates) == 0 { + return nil + } + if len(snapshot.Connections) == 0 { + return nil + } + pressureByEndpoint := map[string]mesh.QUICFabricConnSnapshot{} + for _, conn := range snapshot.Connections { + endpoint := normalizeQUICCapacityEndpoint(conn.Endpoint) + if endpoint == "" || conn.CapacityPressurePercent <= 0 { + continue + } + existing := pressureByEndpoint[endpoint] + if conn.CapacityPressurePercent > existing.CapacityPressurePercent { + pressureByEndpoint[endpoint] = conn + } + } + if len(pressureByEndpoint) == 0 { + return nil + } + if now.IsZero() { + now = time.Now().UTC() + } + out := map[string]mesh.EndpointCandidateCapacityPressure{} + for _, candidate := range candidates { + endpointID := strings.TrimSpace(candidate.EndpointID) + if endpointID == "" { + continue + } + conn, ok := pressureByEndpoint[normalizeQUICCapacityEndpoint(candidate.Address)] + if !ok { + continue + } + count := int64((conn.CapacityPressurePercent + 9) / 10) + if count <= 0 { + count = 1 + } + out[endpointID] = mesh.EndpointCandidateCapacityPressure{ + EndpointID: endpointID, + Count: count, + LastSeenUnixSec: now.Unix(), + } + } + return out +} + +func mergeEndpointCapacityPressure(primary, secondary map[string]mesh.EndpointCandidateCapacityPressure) map[string]mesh.EndpointCandidateCapacityPressure { + if len(primary) == 0 { + return secondary + } + if len(secondary) == 0 { + return primary + } + out := make(map[string]mesh.EndpointCandidateCapacityPressure, len(primary)+len(secondary)) + for endpointID, pressure := range primary { + out[endpointID] = pressure + } + for endpointID, pressure := range secondary { + existing, ok := out[endpointID] + if !ok || pressure.Count > existing.Count || pressure.LastSeenUnixSec > existing.LastSeenUnixSec { + out[endpointID] = pressure + } + } + return out +} + +func normalizeQUICCapacityEndpoint(endpoint string) string { + endpoint = 
strings.TrimRight(strings.TrimSpace(endpoint), "/") + endpoint = strings.TrimPrefix(endpoint, "quic://") + return strings.ToLower(endpoint) +} + func mergedEndpointCandidateObservations(remote map[string]mesh.EndpointCandidateHealthObservation, local map[string]mesh.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation { if len(remote) == 0 && len(local) == 0 { return nil diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 208db1b..d132e38 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -1132,6 +1132,41 @@ func TestVPNFabricSessionTargetsUseCapacityPressureForLoadSpread(t *testing.T) { } } +func TestQUICEndpointCapacityPressureForScoringUsesLiveSnapshot(t *testing.T) { + now := time.Now().UTC() + pressure := quicEndpointCapacityPressureForScoringFromSnapshot([]mesh.PeerEndpointCandidate{ + { + EndpointID: "node-b-quic-a", + Transport: "direct_quic", + Address: "quic://NODE-B-A.example.test:19443/", + }, + { + EndpointID: "node-b-quic-b", + Transport: "direct_quic", + Address: "quic://node-b-b.example.test:19443", + }, + }, mesh.QUICFabricTransportSnapshot{ + Connections: []mesh.QUICFabricConnSnapshot{ + { + Endpoint: "node-b-a.example.test:19443", + ActiveStreams: 5, + MaxStreams: 10, + CapacityPressurePercent: 50, + }, + }, + }, now) + if len(pressure) != 1 { + t.Fatalf("pressure count = %d, want 1: %+v", len(pressure), pressure) + } + got := pressure["node-b-quic-a"] + if got.EndpointID != "node-b-quic-a" || got.Count != 5 || got.LastSeenUnixSec != now.Unix() { + t.Fatalf("unexpected pressure: %+v", got) + } + if _, ok := pressure["node-b-quic-b"]; ok { + t.Fatalf("unpressured endpoint was included: %+v", pressure) + } +} + func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { now := time.Now().UTC() merged := mergedEndpointCandidateObservations( diff --git 
a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index e8c9619..206661d 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -54,9 +54,21 @@ type QUICFabricTransportSnapshot struct { MaxStreamsPerConn int `json:"max_streams_per_conn"` SaturatedConnections int `json:"saturated_connections"` CapacityPressurePercent int `json:"capacity_pressure_percent"` + Connections []QUICFabricConnSnapshot `json:"connections,omitempty"` Stats QUICFabricTransportStats `json:"stats"` } +type QUICFabricConnSnapshot struct { + PeerID string `json:"peer_id,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + CertSHA256 string `json:"cert_sha256,omitempty"` + ActiveStreams int `json:"active_streams"` + MaxStreams int `json:"max_streams"` + CapacityPressurePercent int `json:"capacity_pressure_percent"` + Saturated bool `json:"saturated"` + LastUsedUnixSec int64 `json:"last_used_unix_sec,omitempty"` +} + type quicFabricSession struct { conn *quic.Conn stream *quic.Stream @@ -313,6 +325,20 @@ func quicFabricConnKey(target FabricTransportTarget) string { return peerID + "\x00" + endpoint + "\x00" + normalizeCertSHA256(target.PeerCertSHA256) } +func parseQUICFabricConnKey(key string) (peerID string, endpoint string, certSHA256 string) { + parts := strings.SplitN(key, "\x00", 3) + if len(parts) > 0 { + peerID = parts[0] + } + if len(parts) > 1 { + endpoint = parts[1] + } + if len(parts) > 2 { + certSHA256 = parts[2] + } + return peerID, endpoint, certSHA256 +} + func (t *QUICFabricTransport) Close() error { if t == nil { return nil @@ -359,6 +385,22 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot { default: snapshot.ActiveCount++ snapshot.ActiveStreams += entry.activeStreams + peerID, endpoint, certSHA256 := parseQUICFabricConnKey(key) + connSnapshot := QUICFabricConnSnapshot{ + PeerID: peerID, + 
Endpoint: endpoint, + CertSHA256: certSHA256, + ActiveStreams: entry.activeStreams, + MaxStreams: limit, + Saturated: entry.activeStreams >= limit, + } + if !entry.lastUsed.IsZero() { + connSnapshot.LastUsedUnixSec = entry.lastUsed.UTC().Unix() + } + if limit > 0 { + connSnapshot.CapacityPressurePercent = (entry.activeStreams * 100) / limit + } + snapshot.Connections = append(snapshot.Connections, connSnapshot) if entry.activeStreams >= limit { snapshot.SaturatedConnections++ } diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go index 080540b..b6b2a11 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -195,6 +195,13 @@ func TestQUICFabricTransportReusesConnectionForPeerEndpoint(t *testing.T) { if snapshot.ActiveCount != 1 || snapshot.Stats.Opens != 1 || snapshot.Stats.Reuses != 1 { t.Fatalf("unexpected quic transport snapshot: %+v", snapshot) } + if len(snapshot.Connections) != 1 || + snapshot.Connections[0].PeerID != "node-b" || + snapshot.Connections[0].Endpoint != server.Addr().String() || + snapshot.Connections[0].ActiveStreams != 2 || + snapshot.Connections[0].MaxStreams != defaultQUICFabricMaxStreamsPerConn { + t.Fatalf("unexpected quic connection snapshot: %+v", snapshot.Connections) + } } func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) { @@ -319,6 +326,11 @@ func TestQUICFabricTransportLimitsStreamsPerConnection(t *testing.T) { snapshot.Stats.StreamLimitRejects != 1 { t.Fatalf("unexpected stream limit snapshot: %+v", snapshot) } + if len(snapshot.Connections) != 1 || + !snapshot.Connections[0].Saturated || + snapshot.Connections[0].CapacityPressurePercent != 100 { + t.Fatalf("unexpected saturated connection snapshot: %+v", snapshot.Connections) + } if err := first.Close(); err != nil { t.Fatalf("close first stream: %v", err) } diff --git 
a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 18d0253..741feae 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -423,6 +423,10 @@ the transport's cumulative eviction counters, keeping successive heartbeats consistent. `mesh-live-smoke` reports QUIC fabric capacity-pressure percentage from the transport snapshot, verifying that the capacity fields are populated. +QUIC fabric snapshots now include the pressure, endpoint, and saturation state +of each cached connection; VPN fabric endpoint ranking consumes that live local +pressure before a stream-limit rejection occurs, spreading new sessions away +from already-busy QUIC carriers. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.