Report capacity pressure by endpoint

This commit is contained in:
2026-05-16 12:02:46 +03:00
parent 9e964e28cb
commit 90fe4b6872
3 changed files with 130 additions and 7 deletions
@@ -48,6 +48,7 @@ const (
maxMeshRendezvousLeaseReportEntries = 20
maxVPNFabricEndpointHealthReportEntries = 32
maxVPNFabricEndpointObservationEntries = 256
maxVPNFabricCapacityCounterEntries = 32
vpnFabricEndpointObservationMaxAge = 30 * time.Minute
meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1"
meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry"
@@ -429,6 +430,16 @@ type vpnFabricSessionDialStats struct {
LastSelectedUnixSec atomic.Int64
LastCapacityUnixSec atomic.Int64
LastFailureUnixSec atomic.Int64
capacityMu sync.Mutex
capacityByEndpoint map[string]vpnFabricCapacityCounter
}
type vpnFabricCapacityCounter struct {
EndpointID string `json:"endpoint_id,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Transport string `json:"transport,omitempty"`
Count int64 `json:"count"`
LastSeenUnixSec int64 `json:"last_seen_unix_sec"`
}
type vpnFabricEndpointObservationStore struct {
@@ -438,7 +449,9 @@ type vpnFabricEndpointObservationStore struct {
}
func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats {
return &vpnFabricSessionDialStats{}
return &vpnFabricSessionDialStats{
capacityByEndpoint: map[string]vpnFabricCapacityCounter{},
}
}
func newVPNFabricEndpointObservationStore(reporterNodeID ...string) *vpnFabricEndpointObservationStore {
@@ -637,13 +650,102 @@ func (s *vpnFabricSessionDialStats) ObserveCapacityLimited(target mesh.FabricTra
return
}
s.ObserveCandidateFailure("capacity_limited")
s.LastCapacityEndpoint.Store(strings.TrimSpace(target.Endpoint))
endpoint := strings.TrimSpace(target.Endpoint)
endpointID := strings.TrimSpace(target.EndpointID)
s.LastCapacityEndpoint.Store(endpoint)
transport := strings.TrimSpace(target.Transport)
if transport == "" {
transport = "legacy_peer_endpoint"
}
s.LastCapacityTransport.Store(transport)
s.LastCapacityUnixSec.Store(time.Now().UTC().Unix())
observedAt := time.Now().UTC().Unix()
s.LastCapacityUnixSec.Store(observedAt)
key := endpointID
if key == "" {
key = transport + "|" + endpoint
}
if key == "|" {
key = "unknown"
}
s.capacityMu.Lock()
if s.capacityByEndpoint == nil {
s.capacityByEndpoint = map[string]vpnFabricCapacityCounter{}
}
counter := s.capacityByEndpoint[key]
counter.EndpointID = endpointID
counter.Endpoint = endpoint
counter.Transport = transport
counter.Count++
counter.LastSeenUnixSec = observedAt
s.capacityByEndpoint[key] = counter
s.pruneCapacityCountersLocked(maxVPNFabricCapacityCounterEntries)
s.capacityMu.Unlock()
}
func (s *vpnFabricSessionDialStats) pruneCapacityCountersLocked(maxEntries int) {
if s == nil || maxEntries <= 0 || len(s.capacityByEndpoint) <= maxEntries {
return
}
values := make([]vpnFabricCapacityCounter, 0, len(s.capacityByEndpoint))
for _, counter := range s.capacityByEndpoint {
values = append(values, counter)
}
sort.SliceStable(values, func(i, j int) bool {
if values[i].LastSeenUnixSec != values[j].LastSeenUnixSec {
return values[i].LastSeenUnixSec > values[j].LastSeenUnixSec
}
if values[i].Count != values[j].Count {
return values[i].Count > values[j].Count
}
return values[i].Endpoint < values[j].Endpoint
})
keep := make(map[string]struct{}, maxEntries)
for _, counter := range values[:maxEntries] {
key := strings.TrimSpace(counter.EndpointID)
if key == "" {
key = strings.TrimSpace(counter.Transport) + "|" + strings.TrimSpace(counter.Endpoint)
}
if key == "|" {
key = "unknown"
}
keep[key] = struct{}{}
}
for key := range s.capacityByEndpoint {
if _, ok := keep[key]; !ok {
delete(s.capacityByEndpoint, key)
}
}
}
func (s *vpnFabricSessionDialStats) capacityCountersSnapshot(maxEntries int) []vpnFabricCapacityCounter {
if s == nil {
return nil
}
s.capacityMu.Lock()
defer s.capacityMu.Unlock()
if len(s.capacityByEndpoint) == 0 {
return []vpnFabricCapacityCounter{}
}
values := make([]vpnFabricCapacityCounter, 0, len(s.capacityByEndpoint))
for _, counter := range s.capacityByEndpoint {
values = append(values, counter)
}
sort.SliceStable(values, func(i, j int) bool {
if values[i].LastSeenUnixSec != values[j].LastSeenUnixSec {
return values[i].LastSeenUnixSec > values[j].LastSeenUnixSec
}
if values[i].Count != values[j].Count {
return values[i].Count > values[j].Count
}
if values[i].EndpointID != values[j].EndpointID {
return values[i].EndpointID < values[j].EndpointID
}
return values[i].Endpoint < values[j].Endpoint
})
if maxEntries <= 0 || maxEntries > len(values) {
maxEntries = len(values)
}
return values[:maxEntries]
}
func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() {
@@ -702,6 +804,7 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
"last_selected_unix_sec": s.LastSelectedUnixSec.Load(),
"last_capacity_unix_sec": s.LastCapacityUnixSec.Load(),
"last_failure_unix_sec": s.LastFailureUnixSec.Load(),
"capacity_pressure": s.capacityCountersSnapshot(maxVPNFabricCapacityCounterEntries),
}
if value, ok := s.LastTransport.Load().(string); ok && value != "" {
report["last_transport"] = value
@@ -777,8 +777,14 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
stats := newVPNFabricSessionDialStats()
stats.Attempts.Add(1)
stats.ObserveCapacityLimited(mesh.FabricTransportTarget{
Endpoint: "quic://node-b.example.test:19443",
Transport: "direct_quic",
EndpointID: "node-b-quic",
Endpoint: "quic://node-b.example.test:19443",
Transport: "direct_quic",
})
stats.ObserveCapacityLimited(mesh.FabricTransportTarget{
EndpointID: "node-b-quic",
Endpoint: "quic://node-b.example.test:19443",
Transport: "direct_quic",
})
stats.ObserveCandidateFailure("session_open_failed")
stats.ObserveSelected(mesh.FabricTransportTarget{
@@ -790,8 +796,8 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
report := stats.Report(time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC))
if report["attempts"] != int64(1) ||
report["selected"] != int64(1) ||
report["candidate_failures"] != int64(2) ||
report["capacity_limited"] != int64(1) ||
report["candidate_failures"] != int64(3) ||
report["capacity_limited"] != int64(2) ||
report["session_open_failures"] != int64(1) ||
report["quic_selected"] != int64(1) ||
report["pinned_cert_selected"] != int64(1) ||
@@ -802,6 +808,17 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
report["last_failure_reason"] != "session_open_failed" {
t.Fatalf("unexpected dial stats report: %+v", report)
}
capacityPressure, ok := report["capacity_pressure"].([]vpnFabricCapacityCounter)
if !ok || len(capacityPressure) != 1 {
t.Fatalf("capacity pressure counters missing: %+v", report["capacity_pressure"])
}
if capacityPressure[0].EndpointID != "node-b-quic" ||
capacityPressure[0].Endpoint != "quic://node-b.example.test:19443" ||
capacityPressure[0].Transport != "direct_quic" ||
capacityPressure[0].Count != 2 ||
capacityPressure[0].LastSeenUnixSec <= 0 {
t.Fatalf("unexpected capacity pressure counter: %+v", capacityPressure[0])
}
}
func TestFabricSessionOpenFailureReasonClassifiesCapacity(t *testing.T) {