Report VPN fabric endpoint health

This commit is contained in:
2026-05-16 11:08:22 +03:00
parent 396d36d5a9
commit 68bce01c6f
3 changed files with 80 additions and 3 deletions
@@ -45,6 +45,7 @@ const (
meshSyntheticConfigRefreshInterval = 20 * time.Second
meshRouteHealthFeedbackRefreshBackoff = 5 * time.Second
maxMeshRendezvousLeaseReportEntries = 20
maxVPNFabricEndpointHealthReportEntries = 32
meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1"
meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry"
meshRendezvousLeaseRefreshCapability = "mesh_rendezvous_lease_refresh_contract"
@@ -449,6 +450,42 @@ func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointC
return out
}
func (s *vpnFabricEndpointObservationStore) Report(observedAt time.Time, maxEntries int) map[string]any {
snapshot := s.Snapshot()
if len(snapshot) == 0 {
return map[string]any{
"schema_version": "rap.vpn_fabric_endpoint_health_report.v1",
"observed_at": observedAt.UTC().Format(time.RFC3339Nano),
"total": 0,
"reported": 0,
"dropped": 0,
"observations": []mesh.EndpointCandidateHealthObservation{},
}
}
values := make([]mesh.EndpointCandidateHealthObservation, 0, len(snapshot))
for _, observation := range snapshot {
values = append(values, observation)
}
sort.SliceStable(values, func(i, j int) bool {
if !values[i].ObservedAt.Equal(values[j].ObservedAt) {
return values[i].ObservedAt.After(values[j].ObservedAt)
}
return values[i].EndpointID < values[j].EndpointID
})
if maxEntries <= 0 || maxEntries > len(values) {
maxEntries = len(values)
}
reported := values[:maxEntries]
return map[string]any{
"schema_version": "rap.vpn_fabric_endpoint_health_report.v1",
"observed_at": observedAt.UTC().Format(time.RFC3339Nano),
"total": len(values),
"reported": len(reported),
"dropped": len(values) - len(reported),
"observations": reported,
}
}
func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, latency time.Duration) {
if s == nil || strings.TrimSpace(endpointID) == "" {
return
@@ -2837,12 +2874,15 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
}
if meshState != nil && meshState.VPNFabricEndpointObservations != nil {
report["endpoint_observations"] = meshState.VPNFabricEndpointObservations.Snapshot()
}
payload.Metadata["vpn_fabric_session_transport_report"] = report
payload.Capabilities["vpn_fabric_session_transport"] = true
payload.Capabilities["vpn_packet_batch_binary_frames"] = true
if meshState != nil && meshState.VPNFabricEndpointObservations != nil {
payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries)
} else {
payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore().Report(observedAt, maxVPNFabricEndpointHealthReportEntries)
}
payload.Capabilities["vpn_fabric_endpoint_health_feedback"] = true
}
if meshState != nil && meshState.ConfigLoadError != "" {
payload.HealthStatus = "warning"
@@ -753,6 +753,13 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
report["peer_sessions"] == nil {
t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata)
}
if payload.Capabilities["vpn_fabric_endpoint_health_feedback"] != true {
t.Fatalf("vpn fabric endpoint health capability missing: %+v", payload.Capabilities)
}
if report, ok := payload.Metadata["vpn_fabric_endpoint_health_report"].(map[string]any); !ok ||
report["schema_version"] != "rap.vpn_fabric_endpoint_health_report.v1" {
t.Fatalf("vpn fabric endpoint health report missing: %+v", payload.Metadata)
}
}
func TestVPNFabricSessionDialStatsReport(t *testing.T) {
@@ -779,6 +786,33 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
}
}
func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) {
store := newVPNFabricEndpointObservationStore()
base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)
store.observations["old"] = mesh.EndpointCandidateHealthObservation{
EndpointID: "old",
SuccessCount: 1,
ObservedAt: base.Add(-time.Minute),
}
store.observations["new"] = mesh.EndpointCandidateHealthObservation{
EndpointID: "new",
FailureCount: 1,
ObservedAt: base,
ReliabilityScore: 35,
}
report := store.Report(base, 1)
if report["schema_version"] != "rap.vpn_fabric_endpoint_health_report.v1" ||
report["total"] != 2 ||
report["reported"] != 1 ||
report["dropped"] != 1 {
t.Fatalf("unexpected report counters: %+v", report)
}
observations, ok := report["observations"].([]mesh.EndpointCandidateHealthObservation)
if !ok || len(observations) != 1 || observations[0].EndpointID != "new" {
t.Fatalf("unexpected observations: %+v", report["observations"])
}
}
func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) {
now := time.Now().UTC()
target, ok := vpnFabricSessionTarget(&syntheticMeshState{
@@ -330,6 +330,9 @@ that endpoint while preserving it as a later fallback.
Endpoint scoring no longer treats missing/zero latency on failed observations as
moderate latency, preventing failed candidates from receiving a false score
bonus.
Endpoint health observations are now emitted as a bounded standalone heartbeat
report (`rap.vpn_fabric_endpoint_health_report.v1`) so control plane can ingest
candidate feedback without parsing the transport diagnostics blob.
Deliverables: