Accept endpoint health from mesh config

This commit is contained in:
2026-05-16 11:17:07 +03:00
parent 53a5a457e3
commit 4516046a20
6 changed files with 225 additions and 57 deletions
@@ -370,6 +370,7 @@ type syntheticMeshState struct {
VPNFabricEndpointObservations *vpnFabricEndpointObservationStore
PeerEndpoints map[string]string
PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate
PeerEndpointObservations map[string]mesh.EndpointCandidateHealthObservation
VPNGateway *vpnruntime.Gateway
ServiceChannelAccessStats *fabricServiceChannelAccessStats
RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink
@@ -869,6 +870,7 @@ type meshRouteHealthFeedbackRefreshState struct {
type loadedSyntheticMeshConfig struct {
PeerEndpoints map[string]string
PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate
PeerEndpointObservations map[string]mesh.EndpointCandidateHealthObservation
PeerDirectory []mesh.PeerDirectoryEntry
RecoverySeeds []mesh.PeerRecoverySeed
RendezvousLeases []mesh.PeerRendezvousLease
@@ -895,13 +897,14 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
if err != nil {
log.Printf("synthetic mesh config load failed; starting diagnostics-only mesh state: %v", err)
loadedConfig = loadedSyntheticMeshConfig{
PeerEndpoints: map[string]string{},
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{},
PeerDirectory: []mesh.PeerDirectoryEntry{},
RecoverySeeds: []mesh.PeerRecoverySeed{},
RendezvousLeases: []mesh.PeerRendezvousLease{},
Routes: []mesh.SyntheticRoute{},
Source: "config_load_failed",
PeerEndpoints: map[string]string{},
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{},
PeerEndpointObservations: map[string]mesh.EndpointCandidateHealthObservation{},
PeerDirectory: []mesh.PeerDirectoryEntry{},
RecoverySeeds: []mesh.PeerRecoverySeed{},
RendezvousLeases: []mesh.PeerRendezvousLease{},
Routes: []mesh.SyntheticRoute{},
Source: "config_load_failed",
}
}
peerEndpoints := loadedConfig.PeerEndpoints
@@ -1082,6 +1085,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(),
PeerEndpoints: copyStringMap(peerEndpoints),
PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates),
PeerEndpointObservations: copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations),
VPNGateway: vpnGateway,
ServiceChannelAccessStats: serviceChannelAccessStats,
RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink,
@@ -1545,18 +1549,19 @@ func loadSyntheticMeshConfig(ctx context.Context, cfg config.Config, identity st
return loadedSyntheticMeshConfig{}, err
}
return loadedSyntheticMeshConfig{
PeerEndpoints: scoped.PeerEndpoints,
PeerEndpointCandidates: scoped.PeerEndpointCandidates,
PeerDirectory: scoped.PeerDirectory,
RecoverySeeds: scoped.RecoverySeeds,
RendezvousLeases: scoped.RendezvousLeases,
RoutePathDecisions: nil,
Routes: scoped.Routes,
Source: "scoped_config",
ConfigVersion: scoped.ConfigVersion,
PeerDirectoryVersion: scoped.PeerDirectoryVersion,
PolicyVersion: scoped.PolicyVersion,
ProductionForwarding: false,
PeerEndpoints: scoped.PeerEndpoints,
PeerEndpointCandidates: scoped.PeerEndpointCandidates,
PeerEndpointObservations: scoped.PeerEndpointObservations,
PeerDirectory: scoped.PeerDirectory,
RecoverySeeds: scoped.RecoverySeeds,
RendezvousLeases: scoped.RendezvousLeases,
RoutePathDecisions: nil,
Routes: scoped.Routes,
Source: "scoped_config",
ConfigVersion: scoped.ConfigVersion,
PeerDirectoryVersion: scoped.PeerDirectoryVersion,
PolicyVersion: scoped.PolicyVersion,
ProductionForwarding: false,
}, nil
}
if api != nil {
@@ -1570,6 +1575,7 @@ func loadSyntheticMeshConfig(ctx context.Context, cfg config.Config, identity st
return loadedSyntheticMeshConfig{
PeerEndpoints: remote.PeerEndpoints,
PeerEndpointCandidates: peerEndpointCandidatesFromControlPlane(remote.PeerEndpointCandidates),
PeerEndpointObservations: endpointCandidateObservationsFromControlPlane(remote.PeerEndpointObservations),
PeerDirectory: peerDirectoryFromControlPlane(remote.PeerDirectory),
RecoverySeeds: recoverySeedsFromControlPlane(remote.RecoverySeeds),
RendezvousLeases: rendezvousLeasesFromControlPlane(remote.RendezvousLeases),
@@ -1981,6 +1987,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
}
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates)
meshState.PeerEndpointObservations = copyEndpointCandidateObservations(loadedConfig.PeerEndpointObservations)
if productionForwardingEnabled {
meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints)
} else {
@@ -2287,6 +2294,26 @@ func peerEndpointCandidatesFromControlPlane(candidates map[string][]client.PeerE
return out
}
// endpointCandidateObservationsFromControlPlane converts the control-plane
// client's endpoint health observations into their mesh-package form.
//
// Map keys are whitespace-trimmed and entries whose key trims to the empty
// string are dropped. The resulting EndpointID is the trimmed payload
// EndpointID when that is non-empty, otherwise the trimmed map key. The result
// is always a non-nil map.
//
// NOTE(review): a payload EndpointID that differs from its map key is kept
// as-is here, whereas ScopedSyntheticConfig.Validate rejects that mismatch for
// file-based configs — confirm the asymmetry is intended.
func endpointCandidateObservationsFromControlPlane(observations map[string]client.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation {
	converted := make(map[string]mesh.EndpointCandidateHealthObservation, len(observations))
	for rawKey, src := range observations {
		key := strings.TrimSpace(rawKey)
		if key != "" {
			var dst mesh.EndpointCandidateHealthObservation
			dst.EndpointID = firstNonEmpty(strings.TrimSpace(src.EndpointID), key)
			dst.LastLatencyMs = src.LastLatencyMs
			dst.SuccessCount = src.SuccessCount
			dst.FailureCount = src.FailureCount
			dst.LastFailureReason = src.LastFailureReason
			dst.ReliabilityScore = src.ReliabilityScore
			dst.ObservedAt = src.ObservedAt
			converted[key] = dst
		}
	}
	return converted
}
func peerDirectoryFromControlPlane(entries []client.PeerDirectoryEntry) []mesh.PeerDirectoryEntry {
out := make([]mesh.PeerDirectoryEntry, 0, len(entries))
for _, item := range entries {
@@ -4508,6 +4535,17 @@ func copyPeerEndpointCandidatesMap(values map[string][]mesh.PeerEndpointCandidat
return out
}
// copyEndpointCandidateObservations returns a new map holding the same
// key/observation pairs as values, so the caller can keep the result without
// sharing the input map. A nil or empty input yields an empty, non-nil map.
func copyEndpointCandidateObservations(values map[string]mesh.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation {
	// make with a zero size hint already gives the empty non-nil map needed
	// for empty input, so no separate guard is required.
	cloned := make(map[string]mesh.EndpointCandidateHealthObservation, len(values))
	for key, item := range values {
		cloned[key] = item
	}
	return cloned
}
func minInt(left, right int) int {
if left < right {
return left
@@ -5005,7 +5043,7 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me
ChannelClass: mesh.SyntheticChannelFabricControl,
Now: time.Now().UTC(),
MaxVerificationAge: 5 * time.Minute,
Observations: meshState.VPNFabricEndpointObservations.Snapshot(),
Observations: mergedEndpointCandidateObservations(meshState.PeerEndpointObservations, meshState.VPNFabricEndpointObservations.Snapshot()),
MaxObservationAge: 5 * time.Minute,
})
for _, item := range ranked {
@@ -5036,6 +5074,23 @@ func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []me
return out
}
// mergedEndpointCandidateObservations combines the remote and local
// observation maps into a single new map keyed by endpoint ID.
//
// For an endpoint present in both inputs the local observation is used unless
// both observations carry a non-zero ObservedAt and the remote one is strictly
// later; in that case the remote observation is kept. Endpoints present in only
// one input are copied through unchanged. When both inputs are empty the
// result is nil.
func mergedEndpointCandidateObservations(remote map[string]mesh.EndpointCandidateHealthObservation, local map[string]mesh.EndpointCandidateHealthObservation) map[string]mesh.EndpointCandidateHealthObservation {
	total := len(remote) + len(local)
	if total == 0 {
		return nil
	}
	merged := make(map[string]mesh.EndpointCandidateHealthObservation, total)
	for id, remoteObservation := range remote {
		merged[id] = remoteObservation
	}
	for id, localObservation := range local {
		current, present := merged[id]
		// The remote entry only survives when the comparison is meaningful
		// (both timestamps set) and it is strictly newer than the local one.
		bothTimestamped := !localObservation.ObservedAt.IsZero() && !current.ObservedAt.IsZero()
		remoteIsNewer := present && bothTimestamped && current.ObservedAt.After(localObservation.ObservedAt)
		if !remoteIsNewer {
			merged[id] = localObservation
		}
	}
	return merged
}
func endpointCandidateTLSCertSHA256(candidate mesh.PeerEndpointCandidate) string {
if len(candidate.Metadata) == 0 {
return ""
@@ -977,6 +977,64 @@ func TestVPNFabricSessionTargetsUseLocalHealthObservations(t *testing.T) {
}
}
func TestVPNFabricSessionTargetsUseRemoteHealthObservations(t *testing.T) {
now := time.Now().UTC()
targets := vpnFabricSessionTargets(&syntheticMeshState{
PeerEndpointObservations: map[string]mesh.EndpointCandidateHealthObservation{
"node-b-quic": {
EndpointID: "node-b-quic",
FailureCount: 2,
LastFailureReason: "control_plane_session_open_failed",
ReliabilityScore: 35,
ObservedAt: now,
},
},
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-quic",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://node-b.example.test:19443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
{
EndpointID: "node-b-wss",
NodeID: "node-b",
Transport: "wss",
Address: "https://node-b.example.test:443",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 10,
LastVerifiedAt: &now,
},
},
},
}, "node-b")
if len(targets) != 2 || targets[0].EndpointID != "node-b-wss" {
t.Fatalf("targets did not apply remote health observations: %+v", targets)
}
}
func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) {
now := time.Now().UTC()
merged := mergedEndpointCandidateObservations(
map[string]mesh.EndpointCandidateHealthObservation{
"endpoint-a": {EndpointID: "endpoint-a", ReliabilityScore: 90, ObservedAt: now},
},
map[string]mesh.EndpointCandidateHealthObservation{
"endpoint-a": {EndpointID: "endpoint-a", ReliabilityScore: 35, ObservedAt: now.Add(-time.Minute)},
"endpoint-b": {EndpointID: "endpoint-b", ReliabilityScore: 80, ObservedAt: now},
},
)
if merged["endpoint-a"].ReliabilityScore != 90 || merged["endpoint-b"].ReliabilityScore != 80 {
t.Fatalf("unexpected merged observations: %+v", merged)
}
}
func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) {
now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC)
payload := heartbeatPayload(config.Config{
+36 -25
View File
@@ -265,31 +265,32 @@ type SyntheticMeshRouteConfig struct {
}
type SyntheticMeshConfig struct {
Raw json.RawMessage `json:"-"`
Enabled bool `json:"enabled"`
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id"`
LocalNodeID string `json:"local_node_id"`
AuthorityRequired bool `json:"authority_required"`
ClusterAuthority *ClusterAuthorityDescriptor `json:"cluster_authority,omitempty"`
AuthorityPayload json.RawMessage `json:"authority_payload,omitempty"`
AuthoritySignature *ClusterSignature `json:"authority_signature,omitempty"`
ConfigVersion string `json:"config_version,omitempty"`
PeerDirectoryVersion string `json:"peer_directory_version,omitempty"`
PolicyVersion string `json:"policy_version,omitempty"`
PeerEndpoints map[string]string `json:"peer_endpoints"`
PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"`
PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"`
RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"`
RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"`
RendezvousRelayPolicy *RendezvousRelayPolicyReport `json:"rendezvous_relay_policy,omitempty"`
RoutePathDecisions *RoutePathDecisionReport `json:"route_path_decisions,omitempty"`
ServiceChannelFeedback *FabricServiceChannelFeedbackReport `json:"service_channel_route_feedback,omitempty"`
ServiceChannelAdaptivePolicy *FabricServiceChannelAdaptivePolicy `json:"service_channel_adaptive_policy,omitempty"`
ServiceChannelRemediationCommands []FabricServiceChannelRemediationCommand `json:"service_channel_remediation_commands,omitempty"`
MeshListener *MeshListenerConfig `json:"mesh_listener,omitempty"`
Routes []SyntheticMeshRouteConfig `json:"routes"`
ProductionForwarding bool `json:"production_forwarding"`
Raw json.RawMessage `json:"-"`
Enabled bool `json:"enabled"`
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id"`
LocalNodeID string `json:"local_node_id"`
AuthorityRequired bool `json:"authority_required"`
ClusterAuthority *ClusterAuthorityDescriptor `json:"cluster_authority,omitempty"`
AuthorityPayload json.RawMessage `json:"authority_payload,omitempty"`
AuthoritySignature *ClusterSignature `json:"authority_signature,omitempty"`
ConfigVersion string `json:"config_version,omitempty"`
PeerDirectoryVersion string `json:"peer_directory_version,omitempty"`
PolicyVersion string `json:"policy_version,omitempty"`
PeerEndpoints map[string]string `json:"peer_endpoints"`
PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"`
PeerEndpointObservations map[string]EndpointCandidateHealthObservation `json:"peer_endpoint_observations,omitempty"`
PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"`
RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"`
RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"`
RendezvousRelayPolicy *RendezvousRelayPolicyReport `json:"rendezvous_relay_policy,omitempty"`
RoutePathDecisions *RoutePathDecisionReport `json:"route_path_decisions,omitempty"`
ServiceChannelFeedback *FabricServiceChannelFeedbackReport `json:"service_channel_route_feedback,omitempty"`
ServiceChannelAdaptivePolicy *FabricServiceChannelAdaptivePolicy `json:"service_channel_adaptive_policy,omitempty"`
ServiceChannelRemediationCommands []FabricServiceChannelRemediationCommand `json:"service_channel_remediation_commands,omitempty"`
MeshListener *MeshListenerConfig `json:"mesh_listener,omitempty"`
Routes []SyntheticMeshRouteConfig `json:"routes"`
ProductionForwarding bool `json:"production_forwarding"`
}
func (c *SyntheticMeshConfig) UnmarshalJSON(data []byte) error {
@@ -573,6 +574,16 @@ type PeerEndpointCandidate struct {
Metadata json.RawMessage `json:"metadata,omitempty"`
}
// EndpointCandidateHealthObservation is the JSON representation of a health
// observation for one endpoint candidate. SyntheticMeshConfig carries these in
// its PeerEndpointObservations map under the "peer_endpoint_observations" key.
type EndpointCandidateHealthObservation struct {
	// EndpointID identifies the endpoint candidate the observation refers to.
	EndpointID string `json:"endpoint_id"`
	// LastLatencyMs is presumably the most recent latency in milliseconds
	// (inferred from the name — confirm against the producer).
	LastLatencyMs int64 `json:"last_latency_ms,omitempty"`
	SuccessCount uint64 `json:"success_count,omitempty"`
	FailureCount uint64 `json:"failure_count,omitempty"`
	LastFailureReason string `json:"last_failure_reason,omitempty"`
	ReliabilityScore int `json:"reliability_score,omitempty"`
	// NOTE(review): encoding/json's omitempty never treats a struct value as
	// empty, so a zero ObservedAt is still marshalled (as the zero time)
	// rather than omitted. Decoding is unaffected.
	ObservedAt time.Time `json:"observed_at,omitempty"`
}
func New(baseURL string) *Client {
return &Client{
baseURL: baseURL,
@@ -9,18 +9,19 @@ import (
)
type ScopedSyntheticConfig struct {
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id"`
LocalNodeID string `json:"local_node_id"`
ConfigVersion string `json:"config_version,omitempty"`
PeerDirectoryVersion string `json:"peer_directory_version,omitempty"`
PolicyVersion string `json:"policy_version,omitempty"`
PeerEndpoints map[string]string `json:"peer_endpoints"`
PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"`
PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"`
RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"`
RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"`
Routes []SyntheticRoute `json:"routes"`
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id"`
LocalNodeID string `json:"local_node_id"`
ConfigVersion string `json:"config_version,omitempty"`
PeerDirectoryVersion string `json:"peer_directory_version,omitempty"`
PolicyVersion string `json:"policy_version,omitempty"`
PeerEndpoints map[string]string `json:"peer_endpoints"`
PeerEndpointCandidates map[string][]PeerEndpointCandidate `json:"peer_endpoint_candidates,omitempty"`
PeerEndpointObservations map[string]EndpointCandidateHealthObservation `json:"peer_endpoint_observations,omitempty"`
PeerDirectory []PeerDirectoryEntry `json:"peer_directory,omitempty"`
RecoverySeeds []PeerRecoverySeed `json:"recovery_seeds,omitempty"`
RendezvousLeases []PeerRendezvousLease `json:"rendezvous_leases,omitempty"`
Routes []SyntheticRoute `json:"routes"`
}
type PeerDirectoryEntry struct {
@@ -122,6 +123,14 @@ func (cfg ScopedSyntheticConfig) Validate(local PeerIdentity) error {
}
}
}
for endpointID, observation := range cfg.PeerEndpointObservations {
if strings.TrimSpace(endpointID) == "" || strings.TrimSpace(observation.EndpointID) == "" || observation.EndpointID != endpointID {
return fmt.Errorf("scoped synthetic mesh config contains invalid peer endpoint observation")
}
if observation.ReliabilityScore < 0 || observation.ReliabilityScore > 100 {
return fmt.Errorf("scoped synthetic mesh config contains invalid peer endpoint observation reliability")
}
}
if err := validatePeerDirectory(cfg.PeerDirectory, cfg.LocalNodeID); err != nil {
return err
}
@@ -33,6 +33,15 @@ func TestLoadScopedSyntheticConfig(t *testing.T) {
},
},
},
PeerEndpointObservations: map[string]EndpointCandidateHealthObservation{
"node-b-public": {
EndpointID: "node-b-public",
LastLatencyMs: 42,
SuccessCount: 3,
ReliabilityScore: 95,
ObservedAt: expiresAt.Add(-time.Minute),
},
},
PeerDirectory: []PeerDirectoryEntry{
{
NodeID: "node-b",
@@ -81,6 +90,9 @@ func TestLoadScopedSyntheticConfig(t *testing.T) {
if got := cfg.PeerEndpointCandidates["node-b"]; len(got) != 1 || got[0].EndpointID != "node-b-public" {
t.Fatalf("unexpected endpoint candidates: %+v", cfg.PeerEndpointCandidates)
}
if got := cfg.PeerEndpointObservations["node-b-public"]; got.EndpointID != "node-b-public" || got.ReliabilityScore != 95 {
t.Fatalf("unexpected endpoint observations: %+v", cfg.PeerEndpointObservations)
}
if len(cfg.PeerDirectory) != 1 || cfg.PeerDirectory[0].NodeID != "node-b" || !cfg.PeerDirectory[0].RecoverySeed {
t.Fatalf("unexpected peer directory: %+v", cfg.PeerDirectory)
}
@@ -162,6 +174,26 @@ func TestLoadScopedSyntheticConfigRejectsInvalidPeerEndpointCandidate(t *testing
}
}
func TestLoadScopedSyntheticConfigRejectsInvalidPeerEndpointObservation(t *testing.T) {
path := writeScopedConfig(t, ScopedSyntheticConfig{
SchemaVersion: "c17f.synthetic.v1",
ClusterID: "cluster-1",
LocalNodeID: "node-a",
PeerEndpoints: map[string]string{},
PeerEndpointObservations: map[string]EndpointCandidateHealthObservation{
"endpoint-a": {
EndpointID: "endpoint-b",
ReliabilityScore: 101,
},
},
Routes: []SyntheticRoute{liveSyntheticRoute("route-a-b", []string{"node-a", "node-b"})},
})
_, err := LoadScopedSyntheticConfig(path, PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"})
if err == nil {
t.Fatal("expected invalid peer endpoint observation error")
}
}
func TestLoadScopedSyntheticConfigRejectsInvalidPeerDirectory(t *testing.T) {
path := writeScopedConfig(t, ScopedSyntheticConfig{
SchemaVersion: "c17f.synthetic.v1",