diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index dcdd869..b7c1355 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -426,17 +426,23 @@ type vpnFabricSessionDialStats struct { } type vpnFabricEndpointObservationStore struct { - mu sync.Mutex - observations map[string]mesh.EndpointCandidateHealthObservation + reporterNodeID string + mu sync.Mutex + observations map[string]mesh.EndpointCandidateHealthObservation } func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { return &vpnFabricSessionDialStats{} } -func newVPNFabricEndpointObservationStore() *vpnFabricEndpointObservationStore { +func newVPNFabricEndpointObservationStore(reporterNodeID ...string) *vpnFabricEndpointObservationStore { + nodeID := "" + if len(reporterNodeID) > 0 { + nodeID = strings.TrimSpace(reporterNodeID[0]) + } return &vpnFabricEndpointObservationStore{ - observations: map[string]mesh.EndpointCandidateHealthObservation{}, + reporterNodeID: nodeID, + observations: map[string]mesh.EndpointCandidateHealthObservation{}, } } @@ -533,6 +539,8 @@ func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, la defer s.mu.Unlock() observation := s.observations[endpointID] observation.EndpointID = endpointID + observation.Source = "local_vpn_fabric_session" + observation.ReporterNodeID = s.reporterNodeID observation.SuccessCount++ observation.LastLatencyMs = latency.Milliseconds() observation.ReliabilityScore = 100 @@ -550,6 +558,8 @@ func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, re defer s.mu.Unlock() observation := s.observations[endpointID] observation.EndpointID = endpointID + observation.Source = "local_vpn_fabric_session" + observation.ReporterNodeID = s.reporterNodeID observation.FailureCount++ observation.LastFailureReason = strings.TrimSpace(reason) observation.ReliabilityScore = 35 @@ -1082,7 
+1092,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), - VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(), + VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID), PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), PeerEndpointObservations: copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations), @@ -1983,7 +1993,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() } if meshState.VPNFabricEndpointObservations == nil { - meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() + meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore(identity.NodeID) } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) @@ -2303,6 +2313,8 @@ func endpointCandidateObservationsFromControlPlane(observations map[string]clien } out[endpointID] = mesh.EndpointCandidateHealthObservation{ EndpointID: firstNonEmpty(strings.TrimSpace(item.EndpointID), endpointID), + Source: item.Source, + ReporterNodeID: item.ReporterNodeID, LastLatencyMs: item.LastLatencyMs, SuccessCount: item.SuccessCount, FailureCount: item.FailureCount, @@ -2948,7 +2960,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn if meshState != nil && meshState.VPNFabricEndpointObservations != nil { payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries) } else { - 
payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore().Report(observedAt, maxVPNFabricEndpointHealthReportEntries) + payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore(identity.NodeID).Report(observedAt, maxVPNFabricEndpointHealthReportEntries) } payload.Capabilities["vpn_fabric_endpoint_health_feedback"] = true } @@ -4953,7 +4965,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() } if meshState.VPNFabricEndpointObservations == nil { - meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() + meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore(identity.NodeID) } meshState.VPNFabricSessionDialStats.Attempts.Add(1) if meshState.VPNFabricSessionPeers == nil { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 77785ed..51db325 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -789,7 +789,7 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { } func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) { - store := newVPNFabricEndpointObservationStore() + store := newVPNFabricEndpointObservationStore("node-a") base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) store.observations["old"] = mesh.EndpointCandidateHealthObservation{ EndpointID: "old", @@ -815,6 +815,16 @@ func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) } } +func TestVPNFabricEndpointObservationStoreTagsLocalSource(t *testing.T) { + store := newVPNFabricEndpointObservationStore("node-a") + store.ObserveFailure("endpoint-a", "session_open_failed") + snapshot := store.Snapshot() + observation := snapshot["endpoint-a"] + if observation.Source != 
"local_vpn_fabric_session" || observation.ReporterNodeID != "node-a" { + t.Fatalf("unexpected local observation source: %+v", observation) + } +} + func TestVPNFabricEndpointObservationStorePrunesOldAndExcessEntries(t *testing.T) { store := newVPNFabricEndpointObservationStore() now := time.Now().UTC() diff --git a/agents/rap-node-agent/internal/client/client.go b/agents/rap-node-agent/internal/client/client.go index f947050..706d32f 100644 --- a/agents/rap-node-agent/internal/client/client.go +++ b/agents/rap-node-agent/internal/client/client.go @@ -576,6 +576,8 @@ type PeerEndpointCandidate struct { type EndpointCandidateHealthObservation struct { EndpointID string `json:"endpoint_id"` + Source string `json:"source,omitempty"` + ReporterNodeID string `json:"reporter_node_id,omitempty"` LastLatencyMs int64 `json:"last_latency_ms,omitempty"` SuccessCount uint64 `json:"success_count,omitempty"` FailureCount uint64 `json:"failure_count,omitempty"` diff --git a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go index aed77eb..38426d2 100644 --- a/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go +++ b/agents/rap-node-agent/internal/mesh/endpoint_candidate_scoring.go @@ -17,6 +17,8 @@ type EndpointCandidateScoreOptions struct { type EndpointCandidateHealthObservation struct { EndpointID string `json:"endpoint_id"` + Source string `json:"source,omitempty"` + ReporterNodeID string `json:"reporter_node_id,omitempty"` LastLatencyMs int64 `json:"last_latency_ms,omitempty"` SuccessCount uint64 `json:"success_count,omitempty"` FailureCount uint64 `json:"failure_count,omitempty"` diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 80952f4..7647a30 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -342,6 
+342,9 @@ unbounded candidate history. Scoped and control-plane synthetic mesh config can now carry `peer_endpoint_observations`, and VPN fabric-session endpoint ranking merges those remote health hints with local observations using the newest signal. +Endpoint health observations include source and reporter node fields so the +control plane can distinguish local dial feedback from aggregated or +policy-generated health hints. Deliverables: