Use capacity pressure in endpoint ranking

This commit is contained in:
2026-05-16 12:06:05 +03:00
parent 90fe4b6872
commit 9a170c83c2
5 changed files with 162 additions and 11 deletions
@@ -748,6 +748,37 @@ func (s *vpnFabricSessionDialStats) capacityCountersSnapshot(maxEntries int) []v
return values[:maxEntries] return values[:maxEntries]
} }
// capacityPressureForScoring snapshots the per-endpoint capacity counters in
// the shape the endpoint ranker consumes, keyed by the trimmed endpoint ID.
//
// Counters with a blank endpoint ID or a non-positive count are dropped. When
// maxAge is positive, counters that carry a last-seen timestamp older than
// maxAge are dropped as well; counters without a timestamp are always kept.
// The result is nil for a nil receiver or when no counter qualifies.
func (s *vpnFabricSessionDialStats) capacityPressureForScoring(maxAge time.Duration) map[string]mesh.EndpointCandidateCapacityPressure {
	if s == nil {
		return nil
	}
	// The reference time is read once, before the lock is taken, so every
	// counter is aged against the same instant.
	now := time.Now().UTC()

	s.capacityMu.Lock()
	defer s.capacityMu.Unlock()

	pressure := make(map[string]mesh.EndpointCandidateCapacityPressure, len(s.capacityByEndpoint))
	for _, entry := range s.capacityByEndpoint {
		id := strings.TrimSpace(entry.EndpointID)
		if id == "" || entry.Count <= 0 {
			continue
		}
		expired := maxAge > 0 &&
			entry.LastSeenUnixSec > 0 &&
			now.Sub(time.Unix(entry.LastSeenUnixSec, 0).UTC()) > maxAge
		if expired {
			continue
		}
		pressure[id] = mesh.EndpointCandidateCapacityPressure{
			EndpointID:      id,
			Count:           entry.Count,
			LastSeenUnixSec: entry.LastSeenUnixSec,
		}
	}
	if len(pressure) > 0 {
		return pressure
	}
	return nil
}
func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() { func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() {
if s == nil { if s == nil {
return return
@@ -5254,12 +5285,18 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me
out := make([]mesh.FabricTransportTarget, 0, len(meshState.PeerEndpointCandidates[nextHop])+1) out := make([]mesh.FabricTransportTarget, 0, len(meshState.PeerEndpointCandidates[nextHop])+1)
seen := map[string]struct{}{} seen := map[string]struct{}{}
if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 { if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 {
var capacityPressure map[string]mesh.EndpointCandidateCapacityPressure
if meshState.VPNFabricSessionDialStats != nil {
capacityPressure = meshState.VPNFabricSessionDialStats.capacityPressureForScoring(2 * time.Minute)
}
ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{ ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{
ChannelClass: mesh.SyntheticChannelFabricControl, ChannelClass: mesh.SyntheticChannelFabricControl,
Now: time.Now().UTC(), Now: time.Now().UTC(),
MaxVerificationAge: 5 * time.Minute, MaxVerificationAge: 5 * time.Minute,
Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()), Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()),
MaxObservationAge: 5 * time.Minute, MaxObservationAge: 5 * time.Minute,
CapacityPressure: capacityPressure,
MaxCapacityPressureAge: 2 * time.Minute,
}) })
for _, item := range ranked { for _, item := range ranked {
endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/") endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/")
@@ -1086,6 +1086,47 @@ func TestVPNFabricSessionTargetsUseRemoteHealthObservations(t *testing.T) {
} }
} }
// TestVPNFabricSessionTargetsUseCapacityPressureForLoadSpread records
// capacity-limited dials against one of two candidates for the same peer and
// expects the other candidate to be returned first.
func TestVPNFabricSessionTargetsUseCapacityPressureForLoadSpread(t *testing.T) {
	verifiedAt := time.Now().UTC()

	// Eight capacity-limited events are recorded against endpoint A only.
	dialStats := newVPNFabricSessionDialStats()
	saturated := mesh.FabricTransportTarget{
		EndpointID: "node-b-quic-a",
		Endpoint:   "quic://node-b-a.example.test:19443",
		Transport:  "direct_quic",
	}
	for n := 0; n < 8; n++ {
		dialStats.ObserveCapacityLimited(saturated)
	}

	state := &syntheticMeshState{
		VPNFabricSessionDialStats: dialStats,
		PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-quic-a",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://node-b-a.example.test:19443",
					Reachability:     "public",
					ConnectivityMode: "direct",
					LastVerifiedAt:   &verifiedAt,
				},
				{
					EndpointID:       "node-b-quic-b",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://node-b-b.example.test:19443",
					Reachability:     "public",
					ConnectivityMode: "direct",
					Priority:         5,
					LastVerifiedAt:   &verifiedAt,
				},
			},
		},
	}
	targets := vpnFabricSessionTargets(state, "node-b")

	// Both candidates must survive, with the unpressured one in front.
	spread := len(targets) == 2 && targets[0].EndpointID == "node-b-quic-b"
	if !spread {
		t.Fatalf("targets did not spread away from pressured endpoint: %+v", targets)
	}
}
func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) {
now := time.Now().UTC() now := time.Now().UTC()
merged := mergedEndpointCandidateObservations( merged := mergedEndpointCandidateObservations(
@@ -7,12 +7,14 @@ import (
) )
type EndpointCandidateScoreOptions struct { type EndpointCandidateScoreOptions struct {
ChannelClass string ChannelClass string
PreferredRegion string PreferredRegion string
Now time.Time Now time.Time
MaxVerificationAge time.Duration MaxVerificationAge time.Duration
Observations map[string]EndpointCandidateHealthObservation Observations map[string]EndpointCandidateHealthObservation
MaxObservationAge time.Duration MaxObservationAge time.Duration
CapacityPressure map[string]EndpointCandidateCapacityPressure
MaxCapacityPressureAge time.Duration
} }
type EndpointCandidateHealthObservation struct { type EndpointCandidateHealthObservation struct {
@@ -27,6 +29,12 @@ type EndpointCandidateHealthObservation struct {
ObservedAt time.Time `json:"observed_at,omitempty"` ObservedAt time.Time `json:"observed_at,omitempty"`
} }
// EndpointCandidateCapacityPressure is a per-endpoint capacity-pressure counter
// supplied to endpoint ranking through EndpointCandidateScoreOptions.CapacityPressure.
type EndpointCandidateCapacityPressure struct {
// EndpointID identifies the endpoint the counter belongs to.
EndpointID string `json:"endpoint_id,omitempty"`
// Count is the number of recorded pressure events; values <= 0 are not scored.
Count int64 `json:"count"`
// LastSeenUnixSec is the time of the latest event in Unix seconds. When it is
// not positive the scorer skips its staleness check.
LastSeenUnixSec int64 `json:"last_seen_unix_sec"`
}
type ScoredPeerEndpointCandidate struct { type ScoredPeerEndpointCandidate struct {
Candidate PeerEndpointCandidate `json:"candidate"` Candidate PeerEndpointCandidate `json:"candidate"`
Score int `json:"score"` Score int `json:"score"`
@@ -185,6 +193,11 @@ func scorePeerEndpointCandidate(candidate PeerEndpointCandidate, opts EndpointCa
score += observationScore score += observationScore
reasons = append(reasons, observationReasons...) reasons = append(reasons, observationReasons...)
} }
if pressure, ok := opts.CapacityPressure[candidate.EndpointID]; ok {
pressureScore, pressureReasons := scoreEndpointCandidateCapacityPressure(pressure, opts)
score += pressureScore
reasons = append(reasons, pressureReasons...)
}
return ScoredPeerEndpointCandidate{ return ScoredPeerEndpointCandidate{
Candidate: candidate, Candidate: candidate,
@@ -193,6 +206,21 @@ func scorePeerEndpointCandidate(candidate PeerEndpointCandidate, opts EndpointCa
} }
} }
// scoreEndpointCandidateCapacityPressure converts a capacity-pressure counter
// into a ranking adjustment and the reasons explaining it.
//
// It returns (0, nil) when the counter holds no events. When opts.Now,
// pressure.LastSeenUnixSec and opts.MaxCapacityPressureAge are all set, a
// counter whose last event lies after opts.Now or is older than the maximum
// age contributes nothing and is tagged "capacity:pressure-stale". Otherwise
// the result is a negative score of three points per event, limited through
// boundedInt to the 3..24 range, tagged "capacity:pressure".
func scoreEndpointCandidateCapacityPressure(pressure EndpointCandidateCapacityPressure, opts EndpointCandidateScoreOptions) (int, []string) {
	const (
		penaltyPerEvent = 3
		minPenalty      = 3
		maxPenalty      = 24
	)
	if pressure.Count <= 0 {
		return 0, nil
	}
	if !opts.Now.IsZero() && pressure.LastSeenUnixSec > 0 && opts.MaxCapacityPressureAge > 0 {
		lastSeen := time.Unix(pressure.LastSeenUnixSec, 0).UTC()
		age := opts.Now.Sub(lastSeen)
		// A negative age means the counter is stamped after the reference
		// time; it is treated like an expired one.
		if age < 0 || age > opts.MaxCapacityPressureAge {
			return 0, []string{"capacity:pressure-stale"}
		}
	}
	// Cap the event count while it is still an int64. Narrowing to int and
	// multiplying first could wrap around for very large counters and turn
	// the heaviest pressure into the smallest penalty. Beyond
	// maxPenalty/penaltyPerEvent events the penalty is already at its
	// ceiling (assuming boundedInt clamps into [min, max]), so the cap does
	// not change any in-range result.
	events := pressure.Count
	if limit := int64(maxPenalty / penaltyPerEvent); events > limit {
		events = limit
	}
	penalty := boundedInt(int(events)*penaltyPerEvent, minPenalty, maxPenalty)
	return -penalty, []string{"capacity:pressure"}
}
func scoreEndpointCandidateObservation(observation EndpointCandidateHealthObservation, opts EndpointCandidateScoreOptions) (int, []string) { func scoreEndpointCandidateObservation(observation EndpointCandidateHealthObservation, opts EndpointCandidateScoreOptions) (int, []string) {
score := 0 score := 0
reasons := []string{"observation:present"} reasons := []string{"observation:present"}
@@ -387,6 +387,48 @@ func TestRankPeerEndpointCandidatesTreatsCapacityAsSoftPressure(t *testing.T) {
} }
} }
func TestRankPeerEndpointCandidatesSpreadsFreshCapacityPressure(t *testing.T) {
now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)
ranked := RankPeerEndpointCandidates([]PeerEndpointCandidate{
{
EndpointID: "node-b-quic-a",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://node-b-a.example.test:19443",
Reachability: "public",
ConnectivityMode: "direct",
LastVerifiedAt: &now,
},
{
EndpointID: "node-b-quic-b",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://node-b-b.example.test:19443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 5,
LastVerifiedAt: &now,
},
}, EndpointCandidateScoreOptions{
Now: now,
MaxVerificationAge: time.Minute,
MaxCapacityPressureAge: time.Minute,
CapacityPressure: map[string]EndpointCandidateCapacityPressure{
"node-b-quic-a": {
EndpointID: "node-b-quic-a",
Count: 8,
LastSeenUnixSec: now.Unix(),
},
},
})
if ranked[0].Candidate.EndpointID != "node-b-quic-b" {
t.Fatalf("top endpoint = %q, want less pressured endpoint: %+v", ranked[0].Candidate.EndpointID, ranked)
}
if !containsReason(ranked[1].Reasons, "capacity:pressure") {
t.Fatalf("capacity pressure reason missing: %+v", ranked[1].Reasons)
}
}
func containsReason(reasons []string, reason string) bool { func containsReason(reasons []string, reason string) bool {
for _, item := range reasons { for _, item := range reasons {
if item == reason { if item == reason {
@@ -377,6 +377,9 @@ observations.
The same dial telemetry now keeps bounded per-endpoint capacity-pressure
counters, so operators can see whether stream saturation is occasional or
concentrated on a specific QUIC carrier.
Fresh local capacity-pressure counters also feed endpoint ranking as a bounded
penalty, spreading new fabric sessions away from a saturated carrier without
declaring that carrier failed.
Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy.