Feed VPN fabric endpoint health into ranking

This commit is contained in:
2026-05-16 11:04:56 +03:00
parent e185e1f142
commit 396d36d5a9
6 changed files with 169 additions and 1 deletions
@@ -22,6 +22,7 @@ import (
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
@@ -363,6 +364,7 @@ type syntheticMeshState struct {
VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricSessionPeers *mesh.FabricSessionPeerManager
VPNFabricTransport *mesh.WebSocketFabricTransport VPNFabricTransport *mesh.WebSocketFabricTransport
VPNFabricSessionDialStats *vpnFabricSessionDialStats VPNFabricSessionDialStats *vpnFabricSessionDialStats
VPNFabricEndpointObservations *vpnFabricEndpointObservationStore
PeerEndpoints map[string]string PeerEndpoints map[string]string
PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate
VPNGateway *vpnruntime.Gateway VPNGateway *vpnruntime.Gateway
@@ -419,10 +421,65 @@ type vpnFabricSessionDialStats struct {
LastFailureUnixSec atomic.Int64 LastFailureUnixSec atomic.Int64
} }
type vpnFabricEndpointObservationStore struct {
mu sync.Mutex
observations map[string]mesh.EndpointCandidateHealthObservation
}
func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats {
return &vpnFabricSessionDialStats{} return &vpnFabricSessionDialStats{}
} }
func newVPNFabricEndpointObservationStore() *vpnFabricEndpointObservationStore {
return &vpnFabricEndpointObservationStore{
observations: map[string]mesh.EndpointCandidateHealthObservation{},
}
}
func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointCandidateHealthObservation {
if s == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
out := make(map[string]mesh.EndpointCandidateHealthObservation, len(s.observations))
for key, value := range s.observations {
out[key] = value
}
return out
}
func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, latency time.Duration) {
if s == nil || strings.TrimSpace(endpointID) == "" {
return
}
s.mu.Lock()
defer s.mu.Unlock()
observation := s.observations[endpointID]
observation.EndpointID = endpointID
observation.SuccessCount++
observation.LastLatencyMs = latency.Milliseconds()
observation.ReliabilityScore = 100
observation.LastFailureReason = ""
observation.ObservedAt = time.Now().UTC()
s.observations[endpointID] = observation
}
func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, reason string) {
if s == nil || strings.TrimSpace(endpointID) == "" {
return
}
s.mu.Lock()
defer s.mu.Unlock()
observation := s.observations[endpointID]
observation.EndpointID = endpointID
observation.FailureCount++
observation.LastFailureReason = strings.TrimSpace(reason)
observation.ReliabilityScore = 35
observation.ObservedAt = time.Now().UTC()
s.observations[endpointID] = observation
}
func fabricTransportLabelIsQUIC(label string) bool { func fabricTransportLabelIsQUIC(label string) bool {
switch strings.ToLower(strings.TrimSpace(label)) { switch strings.ToLower(strings.TrimSpace(label)) {
case "quic", "direct_quic", "udp_quic", "quic_udp": case "quic", "direct_quic", "udp_quic", "quic_udp":
@@ -945,6 +1002,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricSessionPeers: vpnFabricSessionPeers,
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(),
VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(),
PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpoints: copyStringMap(peerEndpoints),
PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates),
VPNGateway: vpnGateway, VPNGateway: vpnGateway,
@@ -1841,6 +1899,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
if meshState.VPNFabricSessionDialStats == nil { if meshState.VPNFabricSessionDialStats == nil {
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
} }
if meshState.VPNFabricEndpointObservations == nil {
meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore()
}
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates)
if productionForwardingEnabled { if productionForwardingEnabled {
@@ -2776,6 +2837,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
if meshState != nil && meshState.VPNFabricSessionDialStats != nil { if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
} }
if meshState != nil && meshState.VPNFabricEndpointObservations != nil {
report["endpoint_observations"] = meshState.VPNFabricEndpointObservations.Snapshot()
}
payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Metadata["vpn_fabric_session_transport_report"] = report
payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_fabric_session_transport"] = true
payload.Capabilities["vpn_packet_batch_binary_frames"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true
@@ -4769,6 +4833,9 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if meshState.VPNFabricSessionDialStats == nil { if meshState.VPNFabricSessionDialStats == nil {
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
} }
if meshState.VPNFabricEndpointObservations == nil {
meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore()
}
meshState.VPNFabricSessionDialStats.Attempts.Add(1) meshState.VPNFabricSessionDialStats.Attempts.Add(1)
if meshState.VPNFabricSessionPeers == nil { if meshState.VPNFabricSessionPeers == nil {
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
@@ -4778,6 +4845,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
} }
token := fabricSessionGatewayToken(identity, assignment, nextHop) token := fabricSessionGatewayToken(identity, assignment, nextHop)
for index, target := range targets { for index, target := range targets {
startedAt := time.Now()
dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
target.PeerID = nextHop target.PeerID = nextHop
target.Token = token target.Token = token
@@ -4789,6 +4857,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if err != nil { if err != nil {
cancel() cancel()
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed") meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed")
meshState.VPNFabricEndpointObservations.ObserveFailure(target.EndpointID, "transport_select_failed")
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err) log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err)
continue continue
} }
@@ -4796,6 +4865,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if err != nil { if err != nil {
cancel() cancel()
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("session_open_failed") meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("session_open_failed")
meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, "session_open_failed")
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err)
continue continue
} }
@@ -4811,11 +4881,13 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
cancel() cancel()
_ = session.Close() _ = session.Close()
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("stream_open_failed") meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("stream_open_failed")
meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, "stream_open_failed")
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err)
continue continue
} }
cancel() cancel()
meshState.VPNFabricSessionDialStats.ObserveSelected(selectedTarget) meshState.VPNFabricSessionDialStats.ObserveSelected(selectedTarget)
meshState.VPNFabricEndpointObservations.ObserveSuccess(selectedTarget.EndpointID, time.Since(startedAt))
log.Printf("vpn fabric session transport selected: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s pinned_cert=%t fallback_candidates=%d", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, selectedTarget.PeerCertSHA256 != "", len(targets)-index-1) log.Printf("vpn fabric session transport selected: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s pinned_cert=%t fallback_candidates=%d", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, selectedTarget.PeerCertSHA256 != "", len(targets)-index-1)
return &vpnruntime.FabricSessionPacketTransport{ return &vpnruntime.FabricSessionPacketTransport{
Sender: session, Sender: session,
@@ -4852,6 +4924,8 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me
ChannelClass: mesh.SyntheticChannelFabricControl, ChannelClass: mesh.SyntheticChannelFabricControl,
Now: time.Now().UTC(), Now: time.Now().UTC(),
MaxVerificationAge: 5 * time.Minute, MaxVerificationAge: 5 * time.Minute,
Observations: meshState.VPNFabricEndpointObservations.Snapshot(),
MaxObservationAge: 5 * time.Minute,
}) })
for _, item := range ranked { for _, item := range ranked {
endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/") endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/")
@@ -4864,6 +4938,7 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me
} }
seen[key] = struct{}{} seen[key] = struct{}{}
out = append(out, mesh.FabricTransportTarget{ out = append(out, mesh.FabricTransportTarget{
EndpointID: item.Candidate.EndpointID,
Endpoint: endpoint, Endpoint: endpoint,
Transport: item.Candidate.Transport, Transport: item.Candidate.Transport,
PeerCertSHA256: endpointCandidateTLSCertSHA256(item.Candidate), PeerCertSHA256: endpointCandidateTLSCertSHA256(item.Candidate),
@@ -872,6 +872,46 @@ func TestVPNFabricSessionTargetsIncludeRankedCandidatesThenLegacyFallback(t *tes
} }
} }
func TestVPNFabricSessionTargetsUseLocalHealthObservations(t *testing.T) {
now := time.Now().UTC()
observations := newVPNFabricEndpointObservationStore()
observations.ObserveFailure("node-b-quic", "session_open_failed")
observations.ObserveFailure("node-b-quic", "session_open_failed")
targets := vpnFabricSessionTargets(&syntheticMeshState{
VPNFabricEndpointObservations: observations,
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-quic",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://node-b.example.test:19443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
{
EndpointID: "node-b-wss",
NodeID: "node-b",
Transport: "wss",
Address: "https://node-b.example.test:443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
},
},
}, "node-b")
if len(targets) != 2 {
t.Fatalf("target count = %d, want 2: %+v", len(targets), targets)
}
if targets[0].EndpointID != "node-b-wss" || targets[1].EndpointID != "node-b-quic" {
t.Fatalf("targets did not apply local health observations: %+v", targets)
}
}
func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) { func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) {
now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC) now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC)
payload := heartbeatPayload(config.Config{ payload := heartbeatPayload(config.Config{
@@ -206,7 +206,7 @@ func scoreEndpointCandidateObservation(observation EndpointCandidateHealthObserv
case observation.LastLatencyMs > 0 && observation.LastLatencyMs <= 50: case observation.LastLatencyMs > 0 && observation.LastLatencyMs <= 50:
score += 18 score += 18
reasons = append(reasons, "latency:low") reasons = append(reasons, "latency:low")
case observation.LastLatencyMs <= 150: case observation.LastLatencyMs > 0 && observation.LastLatencyMs <= 150:
score += 8 score += 8
reasons = append(reasons, "latency:moderate") reasons = append(reasons, "latency:moderate")
case observation.LastLatencyMs > 0: case observation.LastLatencyMs > 0:
@@ -307,6 +307,52 @@ func TestRankPeerEndpointCandidatesTreatsStaleObservationAsPenalty(t *testing.T)
} }
} }
func TestRankPeerEndpointCandidatesDoesNotRewardZeroLatencyFailure(t *testing.T) {
now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)
ranked := RankPeerEndpointCandidates([]PeerEndpointCandidate{
{
EndpointID: "node-b-quic",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://node-b.example.test:19443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
{
EndpointID: "node-b-wss",
NodeID: "node-b",
Transport: "wss",
Address: "https://node-b.example.test:443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
}, EndpointCandidateScoreOptions{
ChannelClass: SyntheticChannelFabricControl,
Now: now,
MaxVerificationAge: time.Minute,
Observations: map[string]EndpointCandidateHealthObservation{
"node-b-quic": {
EndpointID: "node-b-quic",
FailureCount: 2,
LastFailureReason: "session_open_failed",
ReliabilityScore: 35,
ObservedAt: now,
},
},
MaxObservationAge: time.Minute,
})
if ranked[0].Candidate.EndpointID != "node-b-wss" {
t.Fatalf("top endpoint = %q, want wss after repeated quic failures: %+v", ranked[0].Candidate.EndpointID, ranked)
}
if containsReason(ranked[1].Reasons, "latency:moderate") {
t.Fatalf("zero latency failure was rewarded as moderate latency: %+v", ranked[1].Reasons)
}
}
func containsReason(reasons []string, reason string) bool { func containsReason(reasons []string, reason string) bool {
for _, item := range reasons { for _, item := range reasons {
if item == reason { if item == reason {
@@ -25,6 +25,7 @@ type FabricTransport interface {
} }
type FabricTransportTarget struct { type FabricTransportTarget struct {
EndpointID string
PeerID string PeerID string
Endpoint string Endpoint string
Transport string Transport string
@@ -324,6 +324,12 @@ certificate pin usage, and remaining fallback count for phone-side diagnostics.
Heartbeat telemetry now includes VPN fabric-session dial counters for attempts, Heartbeat telemetry now includes VPN fabric-session dial counters for attempts,
candidate failures, selected transport family, certificate pin usage, and the candidate failures, selected transport family, certificate pin usage, and the
last selected endpoint/failure reason. last selected endpoint/failure reason.
VPN fabric-session dialing feeds candidate success/failure observations back
into endpoint ranking, so repeated local QUIC failures can temporarily demote
that endpoint while preserving it as a later fallback.
Endpoint scoring no longer treats missing/zero latency on failed observations as
moderate latency, preventing failed candidates from receiving a false score
bonus.
Deliverables: Deliverables: