From 831701003cae37c8f580f4f25fcb4a6f85df7d0e Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 11:26:06 +0300 Subject: [PATCH] Apply endpoint health in peer cache --- .../rap-node-agent/cmd/rap-node-agent/main.go | 42 ++++++++------- .../internal/mesh/peer_cache.go | 23 ++++---- .../internal/mesh/peer_cache_test.go | 53 +++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 ++ 4 files changed, 91 insertions(+), 30 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index c387b57..eaf57df 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -924,16 +924,17 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding routeHealthRoutes := routeHealthRoutesFromPathDecisions(routes, loadedConfig.RoutePathDecisions) peerCache := mesh.NewPeerCache(mesh.PeerCacheConfig{ - Local: local, - PeerEndpoints: loadedConfig.PeerEndpoints, - PeerEndpointCandidates: loadedConfig.PeerEndpointCandidates, - PeerDirectory: loadedConfig.PeerDirectory, - RecoverySeeds: loadedConfig.RecoverySeeds, - RendezvousLeases: loadedConfig.RendezvousLeases, - Routes: loadedConfig.Routes, - WarmPeerLimit: mesh.DefaultWarmPeerLimit, - PreferredRegion: cfg.MeshRegion, - Now: time.Now().UTC(), + Local: local, + PeerEndpoints: loadedConfig.PeerEndpoints, + PeerEndpointCandidates: loadedConfig.PeerEndpointCandidates, + PeerEndpointObservations: loadedConfig.PeerEndpointObservations, + PeerDirectory: loadedConfig.PeerDirectory, + RecoverySeeds: loadedConfig.RecoverySeeds, + RendezvousLeases: loadedConfig.RendezvousLeases, + Routes: loadedConfig.Routes, + WarmPeerLimit: mesh.DefaultWarmPeerLimit, + PreferredRegion: cfg.MeshRegion, + Now: time.Now().UTC(), }) peerCacheSnapshot := peerCache.Snapshot() peerConnections := mesh.NewPeerConnectionTracker(peerCacheSnapshot, time.Now().UTC()) @@ -1931,16 +1932,17 @@ func refreshSyntheticMeshConfigForRouteHealthFeedback(ctx context.Context, cfg c func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, identity state.Identity, meshState *syntheticMeshState, loadedConfig loadedSyntheticMeshConfig, local mesh.PeerIdentity, preferredRegion string, observedAt time.Time) { routeHealthRoutes := routeHealthRoutesFromPathDecisions(loadedConfig.Routes, loadedConfig.RoutePathDecisions) peerCache := mesh.NewPeerCache(mesh.PeerCacheConfig{ - Local: local, - PeerEndpoints: loadedConfig.PeerEndpoints, - PeerEndpointCandidates: loadedConfig.PeerEndpointCandidates, - PeerDirectory: loadedConfig.PeerDirectory, - RecoverySeeds: loadedConfig.RecoverySeeds, - RendezvousLeases: loadedConfig.RendezvousLeases, - Routes: loadedConfig.Routes, - WarmPeerLimit: mesh.DefaultWarmPeerLimit, - PreferredRegion: preferredRegion, - Now: observedAt, + Local: local, + PeerEndpoints: loadedConfig.PeerEndpoints, + PeerEndpointCandidates: loadedConfig.PeerEndpointCandidates, + PeerEndpointObservations: loadedConfig.PeerEndpointObservations, + PeerDirectory: loadedConfig.PeerDirectory, + RecoverySeeds: loadedConfig.RecoverySeeds, + RendezvousLeases: loadedConfig.RendezvousLeases, + Routes: loadedConfig.Routes, + WarmPeerLimit: mesh.DefaultWarmPeerLimit, + PreferredRegion: preferredRegion, + Now: observedAt, }) if meshState.PeerConnections == nil { meshState.PeerConnections = mesh.NewPeerConnectionTracker(peerCache.Snapshot(), observedAt) diff --git a/agents/rap-node-agent/internal/mesh/peer_cache.go b/agents/rap-node-agent/internal/mesh/peer_cache.go index ab1abcc..90533a8 100644 --- a/agents/rap-node-agent/internal/mesh/peer_cache.go +++ b/agents/rap-node-agent/internal/mesh/peer_cache.go @@ -9,16 +9,17 @@ import ( const DefaultWarmPeerLimit = 8 type PeerCacheConfig struct { - Local PeerIdentity - PeerEndpoints map[string]string - PeerEndpointCandidates map[string][]PeerEndpointCandidate - PeerDirectory []PeerDirectoryEntry - RecoverySeeds []PeerRecoverySeed - RendezvousLeases []PeerRendezvousLease - Routes []SyntheticRoute - WarmPeerLimit int - PreferredRegion string - Now time.Time + Local PeerIdentity + PeerEndpoints map[string]string + PeerEndpointCandidates map[string][]PeerEndpointCandidate + PeerEndpointObservations map[string]EndpointCandidateHealthObservation + PeerDirectory []PeerDirectoryEntry + RecoverySeeds []PeerRecoverySeed + RendezvousLeases []PeerRendezvousLease + Routes []SyntheticRoute + WarmPeerLimit int + PreferredRegion string + Now time.Time } type PeerCache struct { @@ -116,6 +117,8 @@ func NewPeerCache(cfg PeerCacheConfig) *PeerCache { PreferredRegion: cfg.PreferredRegion, Now: now, MaxVerificationAge: time.Hour, + Observations: cfg.PeerEndpointObservations, + MaxObservationAge: time.Hour, }) if len(scored) > 0 { entry.EndpointCandidates = make([]PeerEndpointCandidate, 0, len(scored)) diff --git a/agents/rap-node-agent/internal/mesh/peer_cache_test.go b/agents/rap-node-agent/internal/mesh/peer_cache_test.go index a7d7f51..fa3fd37 100644 --- a/agents/rap-node-agent/internal/mesh/peer_cache_test.go +++ b/agents/rap-node-agent/internal/mesh/peer_cache_test.go @@ -100,6 +100,59 @@ func TestPeerCacheUsesBestEndpointCandidate(t *testing.T) { } } +func TestPeerCacheAppliesEndpointHealthObservations(t *testing.T) { + local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"} + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + cache := NewPeerCache(PeerCacheConfig{ + Local: local, + PeerEndpointCandidates: map[string][]PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + NATType: "none", + ConnectivityMode: "direct", + Priority: 1, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + NATType: "none", + ConnectivityMode: "direct", + Priority: 1, + LastVerifiedAt: &now, + }, + }, + }, + PeerEndpointObservations: map[string]EndpointCandidateHealthObservation{ + "node-b-quic": { + EndpointID: "node-b-quic", + FailureCount: 2, + LastFailureReason: "session_open_failed", + ReliabilityScore: 35, + ObservedAt: now, + }, + }, + WarmPeerLimit: 1, + Now: now, + }) + + entry, ok := peerCacheEntryByID(cache.Snapshot(), "node-b") + if !ok { + t.Fatal("node-b missing from cache") + } + if entry.BestCandidateID != "node-b-wss" || entry.Endpoint != "https://node-b.example.test:443" { + t.Fatalf("peer cache did not apply endpoint observations: %+v", entry) + } +} + func TestPeerCacheUsesPreferredCorporateEndpointAddress(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"} cache := NewPeerCache(PeerCacheConfig{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 5821311..d33f508 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -347,6 +347,9 @@ plane can distinguish local dial feedback from aggregated or policy-generated health hints. The endpoint health heartbeat report also includes the reporter node id at the report level for simpler multi-node ingestion and diagnostics. +Peer cache construction now applies endpoint health observations when ranking +peer endpoint candidates, so recovery and warm-peer decisions see the same +degraded-path feedback as VPN fabric-session dialing. Deliverables: