diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index b04b22d..463aace 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -45,6 +45,7 @@ const ( meshSyntheticConfigRefreshInterval = 20 * time.Second meshRouteHealthFeedbackRefreshBackoff = 5 * time.Second maxMeshRendezvousLeaseReportEntries = 20 + maxVPNFabricEndpointHealthReportEntries = 32 meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1" meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry" meshRendezvousLeaseRefreshCapability = "mesh_rendezvous_lease_refresh_contract" @@ -449,6 +450,42 @@ func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointC return out } +func (s *vpnFabricEndpointObservationStore) Report(observedAt time.Time, maxEntries int) map[string]any { + snapshot := s.Snapshot() + if len(snapshot) == 0 { + return map[string]any{ + "schema_version": "rap.vpn_fabric_endpoint_health_report.v1", + "observed_at": observedAt.UTC().Format(time.RFC3339Nano), + "total": 0, + "reported": 0, + "dropped": 0, + "observations": []mesh.EndpointCandidateHealthObservation{}, + } + } + values := make([]mesh.EndpointCandidateHealthObservation, 0, len(snapshot)) + for _, observation := range snapshot { + values = append(values, observation) + } + sort.SliceStable(values, func(i, j int) bool { + if !values[i].ObservedAt.Equal(values[j].ObservedAt) { + return values[i].ObservedAt.After(values[j].ObservedAt) + } + return values[i].EndpointID < values[j].EndpointID + }) + if maxEntries <= 0 || maxEntries > len(values) { + maxEntries = len(values) + } + reported := values[:maxEntries] + return map[string]any{ + "schema_version": "rap.vpn_fabric_endpoint_health_report.v1", + "observed_at": observedAt.UTC().Format(time.RFC3339Nano), + "total": len(values), + "reported": len(reported), + "dropped": len(values) - len(reported), + "observations": reported, + } +} + func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, latency time.Duration) { if s == nil || strings.TrimSpace(endpointID) == "" { return @@ -2837,12 +2874,15 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn if meshState != nil && meshState.VPNFabricSessionDialStats != nil { report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) } - if meshState != nil && meshState.VPNFabricEndpointObservations != nil { - report["endpoint_observations"] = meshState.VPNFabricEndpointObservations.Snapshot() - } payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true + if meshState != nil && meshState.VPNFabricEndpointObservations != nil { + payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries) + } else { + payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore().Report(observedAt, maxVPNFabricEndpointHealthReportEntries) + } + payload.Capabilities["vpn_fabric_endpoint_health_feedback"] = true } if meshState != nil && meshState.ConfigLoadError != "" { payload.HealthStatus = "warning" diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 584be64..5836ce7 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -753,6 +753,13 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { report["peer_sessions"] == nil { t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata) } + if payload.Capabilities["vpn_fabric_endpoint_health_feedback"] != true { + t.Fatalf("vpn fabric endpoint health capability missing: %+v", payload.Capabilities) + } + if report, ok := payload.Metadata["vpn_fabric_endpoint_health_report"].(map[string]any); !ok || + report["schema_version"] != "rap.vpn_fabric_endpoint_health_report.v1" { + t.Fatalf("vpn fabric endpoint health report missing: %+v", payload.Metadata) + } } func TestVPNFabricSessionDialStatsReport(t *testing.T) { @@ -779,6 +786,33 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) { } } +func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) { + store := newVPNFabricEndpointObservationStore() + base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + store.observations["old"] = mesh.EndpointCandidateHealthObservation{ + EndpointID: "old", + SuccessCount: 1, + ObservedAt: base.Add(-time.Minute), + } + store.observations["new"] = mesh.EndpointCandidateHealthObservation{ + EndpointID: "new", + FailureCount: 1, + ObservedAt: base, + ReliabilityScore: 35, + } + report := store.Report(base, 1) + if report["schema_version"] != "rap.vpn_fabric_endpoint_health_report.v1" || + report["total"] != 2 || + report["reported"] != 1 || + report["dropped"] != 1 { + t.Fatalf("unexpected report counters: %+v", report) + } + observations, ok := report["observations"].([]mesh.EndpointCandidateHealthObservation) + if !ok || len(observations) != 1 || observations[0].EndpointID != "new" { + t.Fatalf("unexpected observations: %+v", report["observations"]) + } +} + func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) { now := time.Now().UTC() target, ok := vpnFabricSessionTarget(&syntheticMeshState{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 8266f5a..0e2f565 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -330,6 +330,9 @@ that endpoint while preserving it as a later fallback. Endpoint scoring no longer treats missing/zero latency on failed observations as moderate latency, preventing failed candidates from receiving a false score bonus. +Endpoint health observations are now emitted as a bounded standalone heartbeat +report (`rap.vpn_fabric_endpoint_health_report.v1`) so control plane can ingest +candidate feedback without parsing the transport diagnostics blob. Deliverables: