diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index b3d387b..b04b22d 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -22,6 +22,7 @@ import ( "runtime" "sort" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -363,6 +364,7 @@ type syntheticMeshState struct { VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricTransport *mesh.WebSocketFabricTransport VPNFabricSessionDialStats *vpnFabricSessionDialStats + VPNFabricEndpointObservations *vpnFabricEndpointObservationStore PeerEndpoints map[string]string PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate VPNGateway *vpnruntime.Gateway @@ -419,10 +421,65 @@ type vpnFabricSessionDialStats struct { LastFailureUnixSec atomic.Int64 } +type vpnFabricEndpointObservationStore struct { + mu sync.Mutex + observations map[string]mesh.EndpointCandidateHealthObservation +} + func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { return &vpnFabricSessionDialStats{} } +func newVPNFabricEndpointObservationStore() *vpnFabricEndpointObservationStore { + return &vpnFabricEndpointObservationStore{ + observations: map[string]mesh.EndpointCandidateHealthObservation{}, + } +} + +func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointCandidateHealthObservation { + if s == nil { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + out := make(map[string]mesh.EndpointCandidateHealthObservation, len(s.observations)) + for key, value := range s.observations { + out[key] = value + } + return out +} + +func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, latency time.Duration) { + if s == nil || strings.TrimSpace(endpointID) == "" { + return + } + s.mu.Lock() + defer s.mu.Unlock() + observation := s.observations[endpointID] + observation.EndpointID = endpointID + observation.SuccessCount++ + observation.LastLatencyMs = latency.Milliseconds() + observation.ReliabilityScore = 100 + observation.LastFailureReason = "" + observation.ObservedAt = time.Now().UTC() + s.observations[endpointID] = observation +} + +func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, reason string) { + if s == nil || strings.TrimSpace(endpointID) == "" { + return + } + s.mu.Lock() + defer s.mu.Unlock() + observation := s.observations[endpointID] + observation.EndpointID = endpointID + observation.FailureCount++ + observation.LastFailureReason = strings.TrimSpace(reason) + observation.ReliabilityScore = 35 + observation.ObservedAt = time.Now().UTC() + s.observations[endpointID] = observation +} + func fabricTransportLabelIsQUIC(label string) bool { switch strings.ToLower(strings.TrimSpace(label)) { case "quic", "direct_quic", "udp_quic", "quic_udp": @@ -945,6 +1002,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), + VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(), PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), VPNGateway: vpnGateway, @@ -1841,6 +1899,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i if meshState.VPNFabricSessionDialStats == nil { meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() } + if meshState.VPNFabricEndpointObservations == nil { + meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() + } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) if productionForwardingEnabled { @@ -2776,6 +2837,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn if meshState != nil && meshState.VPNFabricSessionDialStats != nil { report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) } + if meshState != nil && meshState.VPNFabricEndpointObservations != nil { + report["endpoint_observations"] = meshState.VPNFabricEndpointObservations.Snapshot() + } payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true @@ -4769,6 +4833,9 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if meshState.VPNFabricSessionDialStats == nil { meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() } + if meshState.VPNFabricEndpointObservations == nil { + meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() + } meshState.VPNFabricSessionDialStats.Attempts.Add(1) if meshState.VPNFabricSessionPeers == nil { meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() @@ -4778,6 +4845,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st } token := fabricSessionGatewayToken(identity, assignment, nextHop) for index, target := range targets { + startedAt := time.Now() dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) target.PeerID = nextHop target.Token = token @@ -4789,6 +4857,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if err != nil { cancel() meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed") + meshState.VPNFabricEndpointObservations.ObserveFailure(target.EndpointID, "transport_select_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err) continue } @@ -4796,6 +4865,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if err != nil { cancel() meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("session_open_failed") + meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, "session_open_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) continue } @@ -4811,11 +4881,13 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st cancel() _ = session.Close() meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("stream_open_failed") + meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, "stream_open_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) continue } cancel() meshState.VPNFabricSessionDialStats.ObserveSelected(selectedTarget) + meshState.VPNFabricEndpointObservations.ObserveSuccess(selectedTarget.EndpointID, time.Since(startedAt)) log.Printf("vpn fabric session transport selected: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s pinned_cert=%t fallback_candidates=%d", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, selectedTarget.PeerCertSHA256 != "", len(targets)-index-1) return &vpnruntime.FabricSessionPacketTransport{ Sender: session, @@ -4852,6 +4924,8 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me ChannelClass: mesh.SyntheticChannelFabricControl, Now: time.Now().UTC(), MaxVerificationAge: 5 * time.Minute, + Observations: meshState.VPNFabricEndpointObservations.Snapshot(), + MaxObservationAge: 5 * time.Minute, }) for _, item := range ranked { endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/") @@ -4864,6 +4938,7 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me } seen[key] = struct{}{} out = append(out, mesh.FabricTransportTarget{ + EndpointID: item.Candidate.EndpointID, Endpoint: endpoint, Transport: item.Candidate.Transport, PeerCertSHA256: endpointCandidateTLSCertSHA256(item.Candidate), diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 24068ea..584be64 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -872,6 +872,46 @@ func TestVPNFabricSessionTargetsIncludeRankedCandidatesThenLegacyFallback(t *tes } } +func TestVPNFabricSessionTargetsUseLocalHealthObservations(t *testing.T) { + now := time.Now().UTC() + observations := newVPNFabricEndpointObservationStore() + observations.ObserveFailure("node-b-quic", "session_open_failed") + observations.ObserveFailure("node-b-quic", "session_open_failed") + targets := vpnFabricSessionTargets(&syntheticMeshState{ + VPNFabricEndpointObservations: observations, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + }, + }, + }, "node-b") + if len(targets) != 2 { + t.Fatalf("target count = %d, want 2: %+v", len(targets), targets) + } + if targets[0].EndpointID != "node-b-wss" || targets[1].EndpointID != "node-b-quic" { + t.Fatalf("targets did not apply local health observations: %+v", targets) + } +} + func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) { now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC) payload := heartbeatPayload(config.Config{ diff --git a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go index 3da1cff..aed77eb 100644 --- a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go +++ b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go @@ -206,7 +206,7 @@ func scoreEndpointCandidateObservation(observation EndpointCandidateHealthObserv case observation.LastLatencyMs > 0 && observation.LastLatencyMs <= 50: score += 18 reasons = append(reasons, "latency:low") - case observation.LastLatencyMs <= 150: + case observation.LastLatencyMs > 0 && observation.LastLatencyMs <= 150: score += 8 reasons = append(reasons, "latency:moderate") case observation.LastLatencyMs > 0: diff --git a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go index ec2fbcc..c1127c1 100644 --- a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go +++ b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring_test.go @@ -307,6 +307,52 @@ func TestRankPeerEndpointCandidatesTreatsStaleObservationAsPenalty(t *testing.T) } } +func TestRankPeerEndpointCandidatesDoesNotRewardZeroLatencyFailure(t *testing.T) { + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + ranked := RankPeerEndpointCandidates([]PeerEndpointCandidate{ + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + }, EndpointCandidateScoreOptions{ + ChannelClass: SyntheticChannelFabricControl, + Now: now, + MaxVerificationAge: time.Minute, + Observations: map[string]EndpointCandidateHealthObservation{ + "node-b-quic": { + EndpointID: "node-b-quic", + FailureCount: 2, + LastFailureReason: "session_open_failed", + ReliabilityScore: 35, + ObservedAt: now, + }, + }, + MaxObservationAge: time.Minute, + }) + if ranked[0].Candidate.EndpointID != "node-b-wss" { + t.Fatalf("top endpoint = %q, want wss after repeated quic failures: %+v", ranked[0].Candidate.EndpointID, ranked) + } + if containsReason(ranked[1].Reasons, "latency:moderate") { + t.Fatalf("zero latency failure was rewarded as moderate latency: %+v", ranked[1].Reasons) + } +} + func containsReason(reasons []string, reason string) bool { for _, item := range reasons { if item == reason { diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport.go b/agents/rap-node-agent/internal/mesh/fabric_transport.go index d2fa56b..e64db1a 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_transport.go @@ -25,6 +25,7 @@ type FabricTransport interface { } type FabricTransportTarget struct { + EndpointID string PeerID string Endpoint string Transport string diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 16f5574..8266f5a 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -324,6 +324,12 @@ certificate pin usage, and remaining fallback count for phone-side diagnostics. Heartbeat telemetry now includes VPN fabric-session dial counters for attempts, candidate failures, selected transport family, certificate pin usage, and the last selected endpoint/failure reason. +VPN fabric-session dialing feeds candidate success/failure observations back +into endpoint ranking, so repeated local QUIC failures can temporarily demote +that endpoint while preserving it as a later fallback. +Endpoint scoring no longer treats missing/zero latency on failed observations as +moderate latency, preventing failed candidates from receiving a false score +bonus. Deliverables: