Tag VPN fabric endpoint health sources

This commit is contained in:
2026-05-16 11:19:36 +03:00
parent 4516046a20
commit 0124913919
5 changed files with 38 additions and 9 deletions
@@ -426,17 +426,23 @@ type vpnFabricSessionDialStats struct {
} }
type vpnFabricEndpointObservationStore struct { type vpnFabricEndpointObservationStore struct {
mu sync.Mutex reporterNodeID string
observations map[string]mesh.EndpointCandidateHealthObservation mu sync.Mutex
observations map[string]mesh.EndpointCandidateHealthObservation
} }
func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats {
return &vpnFabricSessionDialStats{} return &vpnFabricSessionDialStats{}
} }
func newVPNFabricEndpointObservationStore() *vpnFabricEndpointObservationStore { func newVPNFabricEndpointObservationStore(reporterNodeID ...string) *vpnFabricEndpointObservationStore {
nodeID := ""
if len(reporterNodeID) > 0 {
nodeID = strings.TrimSpace(reporterNodeID[0])
}
return &vpnFabricEndpointObservationStore{ return &vpnFabricEndpointObservationStore{
observations: map[string]mesh.EndpointCandidateHealthObservation{}, reporterNodeID: nodeID,
observations: map[string]mesh.EndpointCandidateHealthObservation{},
} }
} }
@@ -533,6 +539,8 @@ func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, la
defer s.mu.Unlock() defer s.mu.Unlock()
observation := s.observations[endpointID] observation := s.observations[endpointID]
observation.EndpointID = endpointID observation.EndpointID = endpointID
observation.Source = "local_vpn_fabric_session"
observation.ReporterNodeID = s.reporterNodeID
observation.SuccessCount++ observation.SuccessCount++
observation.LastLatencyMs = latency.Milliseconds() observation.LastLatencyMs = latency.Milliseconds()
observation.ReliabilityScore = 100 observation.ReliabilityScore = 100
@@ -550,6 +558,8 @@ func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, re
defer s.mu.Unlock() defer s.mu.Unlock()
observation := s.observations[endpointID] observation := s.observations[endpointID]
observation.EndpointID = endpointID observation.EndpointID = endpointID
observation.Source = "local_vpn_fabric_session"
observation.ReporterNodeID = s.reporterNodeID
observation.FailureCount++ observation.FailureCount++
observation.LastFailureReason = strings.TrimSpace(reason) observation.LastFailureReason = strings.TrimSpace(reason)
observation.ReliabilityScore = 35 observation.ReliabilityScore = 35
@@ -1082,7 +1092,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricSessionPeers: vpnFabricSessionPeers,
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(),
VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(), VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID),
PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpoints: copyStringMap(peerEndpoints),
PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates),
PeerEndpointObservations: copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations), PeerEndpointObservations: copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations),
@@ -1983,7 +1993,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
} }
if meshState.VPNFabricEndpointObservations == nil { if meshState.VPNFabricEndpointObservations == nil {
meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore(identity.NodeID)
} }
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates)
@@ -2303,6 +2313,8 @@ func endpointCandidateObservationsFromControlPlane(observations map[string]clien
} }
out[endpointID] = mesh.EndpointCandidateHealthObservation{ out[endpointID] = mesh.EndpointCandidateHealthObservation{
EndpointID: firstNonEmpty(strings.TrimSpace(item.EndpointID), endpointID), EndpointID: firstNonEmpty(strings.TrimSpace(item.EndpointID), endpointID),
Source: item.Source,
ReporterNodeID: item.ReporterNodeID,
LastLatencyMs: item.LastLatencyMs, LastLatencyMs: item.LastLatencyMs,
SuccessCount: item.SuccessCount, SuccessCount: item.SuccessCount,
FailureCount: item.FailureCount, FailureCount: item.FailureCount,
@@ -2948,7 +2960,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
if meshState != nil && meshState.VPNFabricEndpointObservations != nil { if meshState != nil && meshState.VPNFabricEndpointObservations != nil {
payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries) payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries)
} else { } else {
payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore().Report(observedAt, maxVPNFabricEndpointHealthReportEntries) payload.Metadata["vpn_fabric_endpoint_health_report"] = newVPNFabricEndpointObservationStore(identity.NodeID).Report(observedAt, maxVPNFabricEndpointHealthReportEntries)
} }
payload.Capabilities["vpn_fabric_endpoint_health_feedback"] = true payload.Capabilities["vpn_fabric_endpoint_health_feedback"] = true
} }
@@ -4953,7 +4965,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
} }
if meshState.VPNFabricEndpointObservations == nil { if meshState.VPNFabricEndpointObservations == nil {
meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore() meshState.VPNFabricEndpointObservations = newVPNFabricEndpointObservationStore(identity.NodeID)
} }
meshState.VPNFabricSessionDialStats.Attempts.Add(1) meshState.VPNFabricSessionDialStats.Attempts.Add(1)
if meshState.VPNFabricSessionPeers == nil { if meshState.VPNFabricSessionPeers == nil {
@@ -789,7 +789,7 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
} }
func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) { func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) {
store := newVPNFabricEndpointObservationStore() store := newVPNFabricEndpointObservationStore("node-a")
base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) base := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)
store.observations["old"] = mesh.EndpointCandidateHealthObservation{ store.observations["old"] = mesh.EndpointCandidateHealthObservation{
EndpointID: "old", EndpointID: "old",
@@ -815,6 +815,16 @@ func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T)
} }
} }
// TestVPNFabricEndpointObservationStoreTagsLocalSource checks that an
// observation recorded through ObserveFailure is tagged with the local
// fabric-session source and with the reporter node ID the store was built with.
func TestVPNFabricEndpointObservationStoreTagsLocalSource(t *testing.T) {
	store := newVPNFabricEndpointObservationStore("node-a")
	store.ObserveFailure("endpoint-a", "session_open_failed")

	observation := store.Snapshot()["endpoint-a"]
	sourceOK := observation.Source == "local_vpn_fabric_session"
	reporterOK := observation.ReporterNodeID == "node-a"
	if !sourceOK || !reporterOK {
		t.Fatalf("unexpected local observation source: %+v", observation)
	}
}
func TestVPNFabricEndpointObservationStorePrunesOldAndExcessEntries(t *testing.T) { func TestVPNFabricEndpointObservationStorePrunesOldAndExcessEntries(t *testing.T) {
store := newVPNFabricEndpointObservationStore() store := newVPNFabricEndpointObservationStore()
now := time.Now().UTC() now := time.Now().UTC()
@@ -576,6 +576,8 @@ type PeerEndpointCandidate struct {
type EndpointCandidateHealthObservation struct { type EndpointCandidateHealthObservation struct {
EndpointID string `json:"endpoint_id"` EndpointID string `json:"endpoint_id"`
Source string `json:"source,omitempty"`
ReporterNodeID string `json:"reporter_node_id,omitempty"`
LastLatencyMs int64 `json:"last_latency_ms,omitempty"` LastLatencyMs int64 `json:"last_latency_ms,omitempty"`
SuccessCount uint64 `json:"success_count,omitempty"` SuccessCount uint64 `json:"success_count,omitempty"`
FailureCount uint64 `json:"failure_count,omitempty"` FailureCount uint64 `json:"failure_count,omitempty"`
@@ -17,6 +17,8 @@ type EndpointCandidateScoreOptions struct {
type EndpointCandidateHealthObservation struct { type EndpointCandidateHealthObservation struct {
EndpointID string `json:"endpoint_id"` EndpointID string `json:"endpoint_id"`
Source string `json:"source,omitempty"`
ReporterNodeID string `json:"reporter_node_id,omitempty"`
LastLatencyMs int64 `json:"last_latency_ms,omitempty"` LastLatencyMs int64 `json:"last_latency_ms,omitempty"`
SuccessCount uint64 `json:"success_count,omitempty"` SuccessCount uint64 `json:"success_count,omitempty"`
FailureCount uint64 `json:"failure_count,omitempty"` FailureCount uint64 `json:"failure_count,omitempty"`
@@ -342,6 +342,9 @@ unbounded candidate history.
Scoped and control-plane synthetic mesh config can now carry Scoped and control-plane synthetic mesh config can now carry
`peer_endpoint_observations`, and VPN fabric-session endpoint ranking merges `peer_endpoint_observations`, and VPN fabric-session endpoint ranking merges
those remote health hints with local observations using the newest signal. those remote health hints with local observations using the newest signal.
Endpoint health observations include source and reporter node fields so the
control plane can distinguish local dial feedback from aggregated or
policy-generated health hints.
Deliverables: Deliverables: