diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 940de69..dcdd869 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -370,6 +370,7 @@ type syntheticMeshState struct { VPNFabricEndpointObservations *vpnFabricEndpointObservationStore PeerEndpoints map[string]string PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate + PeerEndpointObservations map[string]mesh.EndpointCandidateHealthObservation VPNGateway *vpnruntime.Gateway ServiceChannelAccessStats *fabricServiceChannelAccessStats RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink @@ -869,6 +870,7 @@ type meshRouteHealthFeedbackRefreshState struct { type loadedSyntheticMeshConfig struct { PeerEndpoints map[string]string PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate + PeerEndpointObservations map[string]mesh.EndpointCandidateHealthObservation PeerDirectory []mesh.PeerDirectoryEntry RecoverySeeds []mesh.PeerRecoverySeed RendezvousLeases []mesh.PeerRendezvousLease @@ -895,13 +897,14 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c if err != nil { log.Printf("synthetic mesh config load failed; starting diagnostics-only mesh state: %v", err) loadedConfig = loadedSyntheticMeshConfig{ - PeerEndpoints: map[string]string{}, - PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{}, - PeerDirectory: []mesh.PeerDirectoryEntry{}, - RecoverySeeds: []mesh.PeerRecoverySeed{}, - RendezvousLeases: []mesh.PeerRendezvousLease{}, - Routes: []mesh.SyntheticRoute{}, - Source: "config_load_failed", + PeerEndpoints: map[string]string{}, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{}, + PeerEndpointObservations: map[string]mesh.EndpointCandidateHealthObservation{}, + PeerDirectory: []mesh.PeerDirectoryEntry{}, + RecoverySeeds: []mesh.PeerRecoverySeed{}, + RendezvousLeases: []mesh.PeerRendezvousLease{}, + Routes: []mesh.SyntheticRoute{}, + Source: "config_load_failed", } } peerEndpoints := loadedConfig.PeerEndpoints @@ -1082,6 +1085,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(), PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), + PeerEndpointObservations: copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations), VPNGateway: vpnGateway, ServiceChannelAccessStats: serviceChannelAccessStats, RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink, @@ -1545,18 +1549,19 @@ func loadSyntheticMeshConfig(ctx context.Context, cfg config.Config, identity st return loadedSyntheticMeshConfig{}, err } return loadedSyntheticMeshConfig{ - PeerEndpoints: scoped.PeerEndpoints, - PeerEndpointCandidates: scoped.PeerEndpointCandidates, - PeerDirectory: scoped.PeerDirectory, - RecoverySeeds: scoped.RecoverySeeds, - RendezvousLeases: scoped.RendezvousLeases, - RoutePathDecisions: nil, - Routes: scoped.Routes, - Source: "scoped_config", - ConfigVersion: scoped.ConfigVersion, - PeerDirectoryVersion: scoped.PeerDirectoryVersion, - PolicyVersion: scoped.PolicyVersion, - ProductionForwarding: false, + PeerEndpoints: scoped.PeerEndpoints, + PeerEndpointCandidates: scoped.PeerEndpointCandidates, + PeerEndpointObservations: scoped.PeerEndpointObservations, + PeerDirectory: scoped.PeerDirectory, + RecoverySeeds: scoped.RecoverySeeds, + RendezvousLeases: scoped.RendezvousLeases, + RoutePathDecisions: nil, + Routes: scoped.Routes, + Source: "scoped_config", + ConfigVersion: scoped.ConfigVersion, + PeerDirectoryVersion: scoped.PeerDirectoryVersion, + PolicyVersion: scoped.PolicyVersion, + ProductionForwarding: false, }, nil } if api != nil { @@ -1570,6 +1575,7 @@ func loadSyntheticMeshConfig(ctx context.Context, cfg config.Config, identity st return loadedSyntheticMeshConfig{ PeerEndpoints: remote.PeerEndpoints, PeerEndpointCandidates: peerEndpointCandidatesFromControlPlane(remote.PeerEndpointCandidates), + PeerEndpointObservations: endpointCandidateObservationsFromControlPlane(remote.PeerEndpointObservations), PeerDirectory: peerDirectoryFromControlPlane(remote.PeerDirectory), RecoverySeeds: recoverySeedsFromControlPlane(remote.RecoverySeeds), RendezvousLeases: rendezvousLeasesFromControlPlane(remote.RendezvousLeases), @@ -1981,6 +1987,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) + meshState.PeerEndpointObservations = copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations) if productionForwardingEnabled { meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints) } else { @@ -2287,6 +2294,26 @@ func peerEndpointCandidatesFromControlPlane(candidates map[string][]client.PeerE return out } +func endpointCandidateObservationsFromControlPlane(observations map[string]client.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation { + out := make(map[string]mesh.EndpointCandidateHealthObservation, len(observations)) + for endpointID, item := range observations { + endpointID = strings.TrimSpace(endpointID) + if endpointID == "" { + continue + } + out[endpointID] = mesh.EndpointCandidateHealthObservation{ + EndpointID: firstNonEmpty(strings.TrimSpace(item.EndpointID), endpointID), + LastLatencyMs: item.LastLatencyMs, + SuccessCount: item.SuccessCount, + FailureCount: item.FailureCount, + LastFailureReason: item.LastFailureReason, + ReliabilityScore: item.ReliabilityScore, + ObservedAt: item.ObservedAt, + } + } + return out +} + func peerDirectoryFromControlPlane(entries []client.PeerDirectoryEntry) []mesh.PeerDirectoryEntry { out := make([]mesh.PeerDirectoryEntry, 0, len(entries)) for _, item := range entries { @@ -4508,6 +4535,17 @@ func copyPeerEndpointCandidatesMap(values map[string][]mesh.PeerEndpointCandidat return out } +func copyEndpointCandidateObservations(values map[string]mesh.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation { + if len(values) == 0 { + return map[string]mesh.EndpointCandidateHealthObservation{} + } + out := make(map[string]mesh.EndpointCandidateHealthObservation, len(values)) + for endpointID, observation := range values { + out[endpointID] = observation + } + return out +} + func minInt(left, right int) int { if left < right { return left @@ -5005,7 +5043,7 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me ChannelClass: mesh.SyntheticChannelFabricControl, Now: time.Now().UTC(), MaxVerificationAge: 5 * time.Minute, - Observations: meshState.VPNFabricEndpointObservations.Snapshot(), + Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()), MaxObservationAge: 5 * time.Minute, }) for _, item := range ranked { @@ -5036,6 +5074,23 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me return out } +func mergedEndpointCandidateObservations(remote map[string]mesh.EndpointCandidateHealthObservation, local map[string]mesh.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation { + if len(remote) == 0 && len(local) == 0 { + return nil + } + out := make(map[string]mesh.EndpointCandidateHealthObservation, len(remote)+len(local)) + for endpointID, observation := range remote { + out[endpointID] = observation + } + for endpointID, observation := range local { + if existing, ok := out[endpointID]; ok && !observation.ObservedAt.IsZero() && !existing.ObservedAt.IsZero() && existing.ObservedAt.After(observation.ObservedAt) { + continue + } + out[endpointID] = observation + } + return out +} + func endpointCandidateTLSCertSHA256(candidate mesh.PeerEndpointCandidate) string { if len(candidate.Metadata) == 0 { return "" diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 535595e..77785ed 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -977,6 +977,64 @@ func TestVPNFabricSessionTargetsUseLocalHealthObservations(t *testing.T) { } } +func TestVPNFabricSessionTargetsUseRemoteHealthObservations(t *testing.T) { + now := time.Now().UTC() + targets := vpnFabricSessionTargets(&syntheticMeshState{ + PeerEndpointObservations: map[string]mesh.EndpointCandidateHealthObservation{ + "node-b-quic": { + EndpointID: "node-b-quic", + FailureCount: 2, + LastFailureReason: "control_plane_session_open_failed", + ReliabilityScore: 35, + ObservedAt: now, + }, + }, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + }, + }, + }, "node-b") + if len(targets) != 2 || targets[0].EndpointID != "node-b-wss" { + t.Fatalf("targets did not apply remote health observations: %+v", targets) + } +} + +func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { + now := time.Now().UTC() + merged := mergedEndpointCandidateObservations( + map[string]mesh.EndpointCandidateHealthObservation{ + "endpoint-a": {EndpointID: "endpoint-a", ReliabilityScore: 90, ObservedAt: now}, + }, + map[string]mesh.EndpointCandidateHealthObservation{ + "endpoint-a": {EndpointID: "endpoint-a", ReliabilityScore: 35, ObservedAt: now.Add(-time.Minute)}, + "endpoint-b": {EndpointID: "endpoint-b", ReliabilityScore: 80, ObservedAt: now}, + }, + ) + if merged["endpoint-a"].ReliabilityScore != 90 || merged["endpoint-b"].ReliabilityScore != 80 { + t.Fatalf("unexpected merged observations: %+v", merged) + } +} + func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) { now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC) payload := heartbeatPayload(config.Config{ diff --git a/agents/rap-node-agent/internal/client/client.go b/agents/rap-node-agent/internal/client/client.go index c153ff4..f947050 100644 --- a/agents/rap-node-agent/internal/client/client.go +++ b/agents/rap-node-agent/internal/client/client.go @@ -265,31 +265,32 @@ type SyntheticMeshRouteConfig struct { } type SyntheticMeshConfig struct { - Raw json.RawMessage `json:"-"` - Enabled bool `json:"enabled"` - SchemaVersion string `json:"schema_version"` - ClusterID string `json:"cluster_id"` - LocalNodeID string `json:"local_node_id"` - AuthorityRequired bool `json:"authority_required"` - ClusterAuthority *ClusterAuthorityDescriptor `json:"cluster_authority,omitempty"` - AuthorityPayload json.RawMessage `json:"authority_payload,omitempty"` - AuthoritySignature *ClusterSignature `json:"authority_signature,omitempty"` - ConfigVersion string `json:"config_version,omitempty"` - PeerDirectoryVersion string `json:"peer_directory_version,omitempty"` - PolicyVersion string `json:"policy_version,omitempty"` - PeerEndpoints map[string]string `json:"peer_endpoints"` - PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"` - PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"` - RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"` - RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"` - RendezvousRelayPolicy *RendezvousRelayPolicyReport `json:"rendezvous_relay_policy,omitempty"` - RoutePathDecisions *RoutePathDecisionReport `json:"route_path_decisions,omitempty"` - ServiceChannelFeedback *FabricServiceChannelFeedbackReport `json:"service_channel_route_feedback,omitempty"` - ServiceChannelAdaptivePolicy *FabricServiceChannelAdaptivePolicy `json:"service_channel_adaptive_policy,omitempty"` - ServiceChannelRemediationCommands []FabricServiceChannelRemediationCommand `json:"service_channel_remediation_commands,omitempty"` - MeshListener *MeshListenerConfig `json:"mesh_listener,omitempty"` - Routes []SyntheticMeshRouteConfig `json:"routes"` - ProductionForwarding bool `json:"production_forwarding"` + Raw json.RawMessage `json:"-"` + Enabled bool `json:"enabled"` + SchemaVersion string `json:"schema_version"` + ClusterID string `json:"cluster_id"` + LocalNodeID string `json:"local_node_id"` + AuthorityRequired bool `json:"authority_required"` + ClusterAuthority *ClusterAuthorityDescriptor `json:"cluster_authority,omitempty"` + AuthorityPayload json.RawMessage `json:"authority_payload,omitempty"` + AuthoritySignature *ClusterSignature `json:"authority_signature,omitempty"` + ConfigVersion string `json:"config_version,omitempty"` + PeerDirectoryVersion string `json:"peer_directory_version,omitempty"` + PolicyVersion string `json:"policy_version,omitempty"` + PeerEndpoints map[string]string `json:"peer_endpoints"` + PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"` + PeerEndpointObservations map[string]EndpointCandidateHealthObservation `json:"peer_endpoint_observations,omitempty"` + PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"` + RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"` + RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"` + RendezvousRelayPolicy *RendezvousRelayPolicyReport `json:"rendezvous_relay_policy,omitempty"` + RoutePathDecisions *RoutePathDecisionReport `json:"route_path_decisions,omitempty"` + ServiceChannelFeedback *FabricServiceChannelFeedbackReport `json:"service_channel_route_feedback,omitempty"` + ServiceChannelAdaptivePolicy *FabricServiceChannelAdaptivePolicy `json:"service_channel_adaptive_policy,omitempty"` + ServiceChannelRemediationCommands []FabricServiceChannelRemediationCommand `json:"service_channel_remediation_commands,omitempty"` + MeshListener *MeshListenerConfig `json:"mesh_listener,omitempty"` + Routes []SyntheticMeshRouteConfig `json:"routes"` + ProductionForwarding bool `json:"production_forwarding"` } func (c *SyntheticMeshConfig) UnmarshalJSON(data []byte) error { @@ -573,6 +574,16 @@ type PeerEndpointCandidate struct { Metadata json.RawMessage `json:"metadata,omitempty"` } +type EndpointCandidateHealthObservation struct { + EndpointID string `json:"endpoint_id"` + LastLatencyMs int64 `json:"last_latency_ms,omitempty"` + SuccessCount uint64 `json:"success_count,omitempty"` + FailureCount uint64 `json:"failure_count,omitempty"` + LastFailureReason string `json:"last_failure_reason,omitempty"` + ReliabilityScore int `json:"reliability_score,omitempty"` + ObservedAt time.Time `json:"observed_at,omitempty"` +} + func New(baseURL string) *Client { return &Client{ baseURL: baseURL, diff --git a/agents/rap-node-agent/internal/mesh/scoped_config.go b/agents/rap-node-agent/internal/mesh/scoped_config.go index f446403..663f7ff 100644 --- a/agents/rap-node-agent/internal/mesh/scoped_config.go +++ b/agents/rap-node-agent/internal/mesh/scoped_config.go @@ -9,18 +9,19 @@ import ( ) type ScopedSyntheticConfig struct { - SchemaVersion string `json:"schema_version"` - ClusterID string `json:"cluster_id"` - LocalNodeID string `json:"local_node_id"` - ConfigVersion string `json:"config_version,omitempty"` - PeerDirectoryVersion string `json:"peer_directory_version,omitempty"` - PolicyVersion string `json:"policy_version,omitempty"` - PeerEndpoints map[string]string `json:"peer_endpoints"` - PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"` - PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"` - RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"` - RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"` - Routes []SyntheticRoute `json:"routes"` + SchemaVersion string `json:"schema_version"` + ClusterID string `json:"cluster_id"` + LocalNodeID string `json:"local_node_id"` + ConfigVersion string `json:"config_version,omitempty"` + PeerDirectoryVersion string `json:"peer_directory_version,omitempty"` + PolicyVersion string `json:"policy_version,omitempty"` + PeerEndpoints map[string]string `json:"peer_endpoints"` + PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"` + PeerEndpointObservations map[string]EndpointCandidateHealthObservation `json:"peer_endpoint_observations,omitempty"` + PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"` + RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"` + RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"` + Routes []SyntheticRoute `json:"routes"` } type PeerDirectoryEntry struct { @@ -122,6 +123,14 @@ func (cfg ScopedSyntheticConfig) Validate(local PeerIdentity) error { } } } + for endpointID, observation := range cfg.PeerEndpointObservations { + if strings.TrimSpace(endpointID) == "" || strings.TrimSpace(observation.EndpointID) == "" || observation.EndpointID != endpointID { + return fmt.Errorf("scoped synthetic mesh config contains invalid peer endpoint observation") + } + if observation.ReliabilityScore < 0 || observation.ReliabilityScore > 100 { + return fmt.Errorf("scoped synthetic mesh config contains invalid peer endpoint observation reliability") + } + } if err := validatePeerDirectory(cfg.PeerDirectory, cfg.LocalNodeID); err != nil { return err } diff --git a/agents/rap-node-agent/internal/mesh/scoped_config_test.go b/agents/rap-node-agent/internal/mesh/scoped_config_test.go index 6809ec8..d775cc5 100644 --- a/agents/rap-node-agent/internal/mesh/scoped_config_test.go +++ b/agents/rap-node-agent/internal/mesh/scoped_config_test.go @@ -33,6 +33,15 @@ func TestLoadScopedSyntheticConfig(t *testing.T) { }, }, }, + PeerEndpointObservations: map[string]EndpointCandidateHealthObservation{ + "node-b-public": { + EndpointID: "node-b-public", + LastLatencyMs: 42, + SuccessCount: 3, + ReliabilityScore: 95, + ObservedAt: expiresAt.Add(-time.Minute), + }, + }, PeerDirectory: []PeerDirectoryEntry{ { NodeID: "node-b", @@ -81,6 +90,9 @@ func TestLoadScopedSyntheticConfig(t *testing.T) { if got := cfg.PeerEndpointCandidates["node-b"]; len(got) != 1 || got[0].EndpointID != "node-b-public" { t.Fatalf("unexpected endpoint candidates: %+v", cfg.PeerEndpointCandidates) } + if got := cfg.PeerEndpointObservations["node-b-public"]; got.EndpointID != "node-b-public" || got.ReliabilityScore != 95 { + t.Fatalf("unexpected endpoint observations: %+v", cfg.PeerEndpointObservations) + } if len(cfg.PeerDirectory) != 1 || cfg.PeerDirectory[0].NodeID != "node-b" || !cfg.PeerDirectory[0].RecoverySeed { t.Fatalf("unexpected peer directory: %+v", cfg.PeerDirectory) } @@ -162,6 +174,26 @@ func TestLoadScopedSyntheticConfigRejectsInvalidPeerEndpointCandidate(t *testing } } +func TestLoadScopedSyntheticConfigRejectsInvalidPeerEndpointObservation(t *testing.T) { + path := writeScopedConfig(t, ScopedSyntheticConfig{ + SchemaVersion: "c17f.synthetic.v1", + ClusterID: "cluster-1", + LocalNodeID: "node-a", + PeerEndpoints: map[string]string{}, + PeerEndpointObservations: map[string]EndpointCandidateHealthObservation{ + "endpoint-a": { + EndpointID: "endpoint-b", + ReliabilityScore: 101, + }, + }, + Routes: []SyntheticRoute{liveSyntheticRoute("route-a-b", []string{"node-a", "node-b"})}, + }) + _, err := LoadScopedSyntheticConfig(path, PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}) + if err == nil { + t.Fatal("expected invalid peer endpoint observation error") + } +} + func TestLoadScopedSyntheticConfigRejectsInvalidPeerDirectory(t *testing.T) { path := writeScopedConfig(t, ScopedSyntheticConfig{ SchemaVersion: "c17f.synthetic.v1", diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 1cbef20..80952f4 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -339,6 +339,9 @@ carriers instead of describing the dataplane as WebSocket-only. Endpoint health observations are pruned in-memory by age and count before snapshot/report generation, preventing long-running nodes from accumulating unbounded candidate history. +Scoped and control-plane synthetic mesh config can now carry +`peer_endpoint_observations`, and VPN fabric-session endpoint ranking merges +those remote health hints with local observations using the newest signal. Deliverables: