diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 2dde72a..940de69 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -46,6 +46,8 @@ const ( meshRouteHealthFeedbackRefreshBackoff = 5 * time.Second maxMeshRendezvousLeaseReportEntries = 20 maxVPNFabricEndpointHealthReportEntries = 32 + maxVPNFabricEndpointObservationEntries = 256 + vpnFabricEndpointObservationMaxAge = 30 * time.Minute meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1" meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry" meshRendezvousLeaseRefreshCapability = "mesh_rendezvous_lease_refresh_contract" @@ -443,6 +445,7 @@ func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointC } s.mu.Lock() defer s.mu.Unlock() + s.pruneLocked(time.Now().UTC(), vpnFabricEndpointObservationMaxAge, maxVPNFabricEndpointObservationEntries) out := make(map[string]mesh.EndpointCandidateHealthObservation, len(s.observations)) for key, value := range s.observations { out[key] = value @@ -450,6 +453,41 @@ func (s *vpnFabricEndpointObservationStore) Snapshot() map[string]mesh.EndpointC return out } +func (s *vpnFabricEndpointObservationStore) pruneLocked(now time.Time, maxAge time.Duration, maxEntries int) { + if s == nil || len(s.observations) == 0 { + return + } + if !now.IsZero() && maxAge > 0 { + for endpointID, observation := range s.observations { + if !observation.ObservedAt.IsZero() && now.Sub(observation.ObservedAt.UTC()) > maxAge { + delete(s.observations, endpointID) + } + } + } + if maxEntries <= 0 || len(s.observations) <= maxEntries { + return + } + values := make([]mesh.EndpointCandidateHealthObservation, 0, len(s.observations)) + for _, observation := range s.observations { + values = append(values, observation) + } + sort.SliceStable(values, func(i, j int) bool { + if !values[i].ObservedAt.Equal(values[j].ObservedAt) { + return values[i].ObservedAt.After(values[j].ObservedAt) + } + return values[i].EndpointID < values[j].EndpointID + }) + keep := make(map[string]struct{}, maxEntries) + for _, observation := range values[:maxEntries] { + keep[observation.EndpointID] = struct{}{} + } + for endpointID := range s.observations { + if _, ok := keep[endpointID]; !ok { + delete(s.observations, endpointID) + } + } +} + func (s *vpnFabricEndpointObservationStore) Report(observedAt time.Time, maxEntries int) map[string]any { snapshot := s.Snapshot() if len(snapshot) == 0 { @@ -500,6 +538,7 @@ func (s *vpnFabricEndpointObservationStore) ObserveSuccess(endpointID string, la observation.LastFailureReason = "" observation.ObservedAt = time.Now().UTC() s.observations[endpointID] = observation + s.pruneLocked(observation.ObservedAt, vpnFabricEndpointObservationMaxAge, maxVPNFabricEndpointObservationEntries) } func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, reason string) { @@ -515,6 +554,7 @@ func (s *vpnFabricEndpointObservationStore) ObserveFailure(endpointID string, re observation.ReliabilityScore = 35 observation.ObservedAt = time.Now().UTC() s.observations[endpointID] = observation + s.pruneLocked(observation.ObservedAt, vpnFabricEndpointObservationMaxAge, maxVPNFabricEndpointObservationEntries) } func fabricTransportLabelIsQUIC(label string) bool { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 3cdcaad..535595e 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "encoding/base64" "encoding/json" + "fmt" "io" "log" "net/http" @@ -814,6 +815,35 @@ func TestVPNFabricEndpointObservationReportIsBoundedAndNewestFirst(t *testing.T) } } +func TestVPNFabricEndpointObservationStorePrunesOldAndExcessEntries(t *testing.T) { + store := newVPNFabricEndpointObservationStore() + now := time.Now().UTC() + store.observations["old"] = mesh.EndpointCandidateHealthObservation{ + EndpointID: "old", + ObservedAt: now.Add(-vpnFabricEndpointObservationMaxAge - time.Second), + } + for i := 0; i < maxVPNFabricEndpointObservationEntries+10; i++ { + endpointID := fmt.Sprintf("endpoint-%03d", i) + store.observations[endpointID] = mesh.EndpointCandidateHealthObservation{ + EndpointID: endpointID, + ObservedAt: now.Add(time.Duration(i) * time.Second), + } + } + snapshot := store.Snapshot() + if len(snapshot) != maxVPNFabricEndpointObservationEntries { + t.Fatalf("snapshot size = %d, want %d", len(snapshot), maxVPNFabricEndpointObservationEntries) + } + if _, ok := snapshot["old"]; ok { + t.Fatalf("old observation was not pruned: %+v", snapshot["old"]) + } + if _, ok := snapshot["endpoint-000"]; ok { + t.Fatalf("oldest excess observation was not pruned") + } + if _, ok := snapshot[fmt.Sprintf("endpoint-%03d", maxVPNFabricEndpointObservationEntries+9)]; !ok { + t.Fatalf("newest observation was pruned") + } +} + func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) { now := time.Now().UTC() target, ok := vpnFabricSessionTarget(&syntheticMeshState{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index df54123..1cbef20 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -336,6 +336,9 @@ candidate feedback without parsing the transport diagnostics blob. VPN fabric-session transport telemetry is carrier-neutral (`fabric_session_binary_frames`) and reports QUIC/WebSocket as available carriers instead of describing the dataplane as WebSocket-only. +Endpoint health observations are pruned in-memory by age and count before +snapshot/report generation, preventing long-running nodes from accumulating +unbounded candidate history. Deliverables: