diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 2d0b8fd..0a65caa 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -748,6 +748,37 @@ func (s *vpnFabricSessionDialStats) capacityCountersSnapshot(maxEntries int) []v return values[:maxEntries] } +func (s *vpnFabricSessionDialStats) capacityPressureForScoring(maxAge time.Duration) map[string]mesh.EndpointCandidateCapacityPressure { + if s == nil { + return nil + } + now := time.Now().UTC() + s.capacityMu.Lock() + defer s.capacityMu.Unlock() + out := make(map[string]mesh.EndpointCandidateCapacityPressure, len(s.capacityByEndpoint)) + for _, counter := range s.capacityByEndpoint { + endpointID := strings.TrimSpace(counter.EndpointID) + if endpointID == "" || counter.Count <= 0 { + continue + } + if maxAge > 0 && counter.LastSeenUnixSec > 0 { + lastSeen := time.Unix(counter.LastSeenUnixSec, 0).UTC() + if now.Sub(lastSeen) > maxAge { + continue + } + } + out[endpointID] = mesh.EndpointCandidateCapacityPressure{ + EndpointID: endpointID, + Count: counter.Count, + LastSeenUnixSec: counter.LastSeenUnixSec, + } + } + if len(out) == 0 { + return nil + } + return out +} + func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() { if s == nil { return @@ -5254,12 +5285,18 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me out := make([]mesh.FabricTransportTarget, 0, len(meshState.PeerEndpointCandidates[nextHop])+1) seen := map[string]struct{}{} if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 { + var capacityPressure map[string]mesh.EndpointCandidateCapacityPressure + if meshState.VPNFabricSessionDialStats != nil { + capacityPressure = meshState.VPNFabricSessionDialStats.capacityPressureForScoring(2 * time.Minute) + } ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{ - ChannelClass: mesh.SyntheticChannelFabricControl, - Now: time.Now().UTC(), - MaxVerificationAge: 5 * time.Minute, - Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()), - MaxObservationAge: 5 * time.Minute, + ChannelClass: mesh.SyntheticChannelFabricControl, + Now: time.Now().UTC(), + MaxVerificationAge: 5 * time.Minute, + Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()), + MaxObservationAge: 5 * time.Minute, + CapacityPressure: capacityPressure, + MaxCapacityPressureAge: 2 * time.Minute, }) for _, item := range ranked { endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/") diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index bb5ed0f..83789d9 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -1086,6 +1086,47 @@ func TestVPNFabricSessionTargetsUseRemoteHealthObservations(t *testing.T) { } } +func TestVPNFabricSessionTargetsUseCapacityPressureForLoadSpread(t *testing.T) { + now := time.Now().UTC() + stats := newVPNFabricSessionDialStats() + for i := 0; i < 8; i++ { + stats.ObserveCapacityLimited(mesh.FabricTransportTarget{ + EndpointID: "node-b-quic-a", + Endpoint: "quic://node-b-a.example.test:19443", + Transport: "direct_quic", + }) + } + targets := vpnFabricSessionTargets(&syntheticMeshState{ + VPNFabricSessionDialStats: stats, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-quic-a", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b-a.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-quic-b", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 5, + LastVerifiedAt: &now, + }, + }, + }, + }, "node-b") + if len(targets) != 2 || targets[0].EndpointID != "node-b-quic-b" { + t.Fatalf("targets did not spread away from pressured endpoint: %+v", targets) + } +} + func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { now := time.Now().UTC() merged := mergedEndpointCandidateObservations( diff --git a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go index 707ed6a..9dce6b5 100644 --- a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go +++ b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go @@ -7,12 +7,14 @@ import ( ) type EndpointCandidateScoreOptions struct { - ChannelClass string - PreferredRegion string - Now time.Time - MaxVerificationAge time.Duration - Observations map[string]EndpointCandidateHealthObservation - MaxObservationAge time.Duration + ChannelClass string + PreferredRegion string + Now time.Time + MaxVerificationAge time.Duration + Observations map[string]EndpointCandidateHealthObservation + MaxObservationAge time.Duration + CapacityPressure map[string]EndpointCandidateCapacityPressure + MaxCapacityPressureAge time.Duration } type EndpointCandidateHealthObservation struct { @@ -27,6 +29,12 @@ type EndpointCandidateHealthObservation struct { ObservedAt time.Time `json:"observed_at,omitempty"` } +type EndpointCandidateCapacityPressure struct { + EndpointID string `json:"endpoint_id,omitempty"` + Count int64 `json:"count"` + LastSeenUnixSec int64 `json:"last_seen_unix_sec"` +} + type ScoredPeerEndpointCandidate struct { Candidate PeerEndpointCandidate `json:"candidate"` Score int `json:"score"` @@ -185,6 +193,11 @@ func scorePeerEndpointCandidate(candidate PeerEndpointCandidate, opts EndpointCa score += observationScore reasons = append(reasons, observationReasons...) } + if pressure, ok := opts.CapacityPressure[candidate.EndpointID]; ok { + pressureScore, pressureReasons := scoreEndpointCandidateCapacityPressure(pressure, opts) + score += pressureScore + reasons = append(reasons, pressureReasons...) + } return ScoredPeerEndpointCandidate{ Candidate: candidate, @@ -193,6 +206,21 @@ func scorePeerEndpointCandidate(candidate PeerEndpointCandidate, opts EndpointCa } } +func scoreEndpointCandidateCapacityPressure(pressure EndpointCandidateCapacityPressure, opts EndpointCandidateScoreOptions) (int, []string) { + if pressure.Count <= 0 { + return 0, nil + } + if !opts.Now.IsZero() && pressure.LastSeenUnixSec > 0 && opts.MaxCapacityPressureAge > 0 { + lastSeen := time.Unix(pressure.LastSeenUnixSec, 0).UTC() + age := opts.Now.Sub(lastSeen) + if age < 0 || age > opts.MaxCapacityPressureAge { + return 0, []string{"capacity:pressure-stale"} + } + } + penalty := boundedInt(int(pressure.Count)*3, 3, 24) + return -penalty, []string{"capacity:pressure"} +} + func scoreEndpointCandidateObservation(observation EndpointCandidateHealthObservation, opts EndpointCandidateScoreOptions) (int, []string) { score := 0 reasons := []string{"observation:present"} diff --git a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go index 53ce712..c60b1fa 100644 --- a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go +++ b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go @@ -387,6 +387,48 @@ func TestRankPeerEndpointCandidatesTreatsCapacityAsSoftPressure(t *testing.T) { } } +func TestRankPeerEndpointCandidatesSpreadsFreshCapacityPressure(t *testing.T) { + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + ranked := RankPeerEndpointCandidates([]PeerEndpointCandidate{ + { + EndpointID: "node-b-quic-a", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b-a.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-quic-b", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 5, + LastVerifiedAt: &now, + }, + }, EndpointCandidateScoreOptions{ + Now: now, + MaxVerificationAge: time.Minute, + MaxCapacityPressureAge: time.Minute, + CapacityPressure: map[string]EndpointCandidateCapacityPressure{ + "node-b-quic-a": { + EndpointID: "node-b-quic-a", + Count: 8, + LastSeenUnixSec: now.Unix(), + }, + }, + }) + if ranked[0].Candidate.EndpointID != "node-b-quic-b" { + t.Fatalf("top endpoint = %q, want less pressured endpoint: %+v", ranked[0].Candidate.EndpointID, ranked) + } + if !containsReason(ranked[1].Reasons, "capacity:pressure") { + t.Fatalf("capacity pressure reason missing: %+v", ranked[1].Reasons) + } +} + func containsReason(reasons []string, reason string) bool { for _, item := range reasons { if item == reason { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 76e918b..e370f16 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -377,6 +377,9 @@ observations. The same dial telemetry now keeps bounded per-endpoint capacity-pressure counters, so operators can see whether stream saturation is occasional or concentrated on a specific QUIC carrier. +Fresh local capacity-pressure counters also feed endpoint ranking as a bounded +penalty, spreading new fabric sessions away from a saturated carrier without +declaring that carrier failed. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.