Summarize QUIC pressure in heartbeat
This commit is contained in:
@@ -49,6 +49,7 @@ const (
|
||||
maxVPNFabricEndpointHealthReportEntries = 32
|
||||
maxVPNFabricEndpointObservationEntries = 256
|
||||
maxVPNFabricCapacityCounterEntries = 32
|
||||
maxVPNFabricQUICPressureReportEntries = 16
|
||||
vpnFabricEndpointObservationMaxAge = 30 * time.Minute
|
||||
meshRendezvousLeaseReportSchema = "c17z18.mesh_rendezvous_lease_report.v1"
|
||||
meshRendezvousLeaseTelemetryCapability = "mesh_rendezvous_lease_telemetry"
|
||||
@@ -442,6 +443,16 @@ type vpnFabricCapacityCounter struct {
|
||||
LastSeenUnixSec int64 `json:"last_seen_unix_sec"`
|
||||
}
|
||||
|
||||
type vpnFabricQUICPressureEntry struct {
|
||||
PeerID string `json:"peer_id,omitempty"`
|
||||
Endpoint string `json:"endpoint,omitempty"`
|
||||
ActiveStreams int `json:"active_streams"`
|
||||
MaxStreams int `json:"max_streams"`
|
||||
CapacityPressurePercent int `json:"capacity_pressure_percent"`
|
||||
Saturated bool `json:"saturated,omitempty"`
|
||||
LastUsedUnixSec int64 `json:"last_used_unix_sec,omitempty"`
|
||||
}
|
||||
|
||||
type vpnFabricEndpointObservationStore struct {
|
||||
reporterNodeID string
|
||||
mu sync.Mutex
|
||||
@@ -855,6 +866,43 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
|
||||
return report
|
||||
}
|
||||
|
||||
func vpnFabricQUICPressureReport(snapshot mesh.QUICFabricTransportSnapshot, maxEntries int) []vpnFabricQUICPressureEntry {
|
||||
if len(snapshot.Connections) == 0 {
|
||||
return []vpnFabricQUICPressureEntry{}
|
||||
}
|
||||
entries := make([]vpnFabricQUICPressureEntry, 0, len(snapshot.Connections))
|
||||
for _, conn := range snapshot.Connections {
|
||||
if conn.CapacityPressurePercent <= 0 && conn.ActiveStreams <= 0 && !conn.Saturated {
|
||||
continue
|
||||
}
|
||||
entries = append(entries, vpnFabricQUICPressureEntry{
|
||||
PeerID: conn.PeerID,
|
||||
Endpoint: conn.Endpoint,
|
||||
ActiveStreams: conn.ActiveStreams,
|
||||
MaxStreams: conn.MaxStreams,
|
||||
CapacityPressurePercent: conn.CapacityPressurePercent,
|
||||
Saturated: conn.Saturated,
|
||||
LastUsedUnixSec: conn.LastUsedUnixSec,
|
||||
})
|
||||
}
|
||||
sort.SliceStable(entries, func(i, j int) bool {
|
||||
if entries[i].CapacityPressurePercent != entries[j].CapacityPressurePercent {
|
||||
return entries[i].CapacityPressurePercent > entries[j].CapacityPressurePercent
|
||||
}
|
||||
if entries[i].ActiveStreams != entries[j].ActiveStreams {
|
||||
return entries[i].ActiveStreams > entries[j].ActiveStreams
|
||||
}
|
||||
if entries[i].PeerID != entries[j].PeerID {
|
||||
return entries[i].PeerID < entries[j].PeerID
|
||||
}
|
||||
return entries[i].Endpoint < entries[j].Endpoint
|
||||
})
|
||||
if maxEntries <= 0 || maxEntries > len(entries) {
|
||||
maxEntries = len(entries)
|
||||
}
|
||||
return entries[:maxEntries]
|
||||
}
|
||||
|
||||
func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats {
|
||||
return &fabricServiceChannelAccessStats{}
|
||||
}
|
||||
@@ -3170,6 +3218,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
|
||||
if meshState != nil && meshState.VPNFabricQUICTransport != nil {
|
||||
quicSnapshot := meshState.VPNFabricQUICTransport.Snapshot()
|
||||
report["quic_sessions"] = quicSnapshot
|
||||
report["quic_capacity_pressure"] = vpnFabricQUICPressureReport(quicSnapshot, maxVPNFabricQUICPressureReportEntries)
|
||||
report["quic_max_streams_per_conn"] = meshState.VPNFabricQUICTransport.MaxStreamsPerConn
|
||||
report["quic_idle_ttl_seconds"] = int(meshState.VPNFabricQUICTransport.IdleTTL.Seconds())
|
||||
}
|
||||
|
||||
@@ -765,6 +765,8 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
|
||||
t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata)
|
||||
} else if report["quic_sessions"] == nil || report["quic_max_streams_per_conn"] != 24 {
|
||||
t.Fatalf("vpn fabric quic session report missing: %+v", report)
|
||||
} else if report["quic_capacity_pressure"] == nil {
|
||||
t.Fatalf("vpn fabric quic pressure report missing: %+v", report)
|
||||
}
|
||||
if payload.Capabilities["vpn_fabric_session_stream_shards"] != true {
|
||||
t.Fatalf("vpn fabric stream shard capability missing: %+v", payload.Capabilities)
|
||||
@@ -1185,6 +1187,44 @@ func TestMergeEndpointCapacityPressureKeepsStrongerSignal(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestVPNFabricQUICPressureReportRanksBusyConnections(t *testing.T) {
|
||||
report := vpnFabricQUICPressureReport(mesh.QUICFabricTransportSnapshot{
|
||||
Connections: []mesh.QUICFabricConnSnapshot{
|
||||
{
|
||||
PeerID: "node-c",
|
||||
Endpoint: "node-c.example.test:19443",
|
||||
ActiveStreams: 1,
|
||||
MaxStreams: 10,
|
||||
CapacityPressurePercent: 10,
|
||||
},
|
||||
{
|
||||
PeerID: "node-b",
|
||||
Endpoint: "node-b.example.test:19443",
|
||||
ActiveStreams: 9,
|
||||
MaxStreams: 10,
|
||||
CapacityPressurePercent: 90,
|
||||
Saturated: true,
|
||||
LastUsedUnixSec: 100,
|
||||
},
|
||||
{
|
||||
PeerID: "idle",
|
||||
Endpoint: "idle.example.test:19443",
|
||||
ActiveStreams: 0,
|
||||
MaxStreams: 10,
|
||||
},
|
||||
},
|
||||
}, 1)
|
||||
if len(report) != 1 {
|
||||
t.Fatalf("report count = %d, want 1: %+v", len(report), report)
|
||||
}
|
||||
if report[0].PeerID != "node-b" ||
|
||||
report[0].CapacityPressurePercent != 90 ||
|
||||
!report[0].Saturated ||
|
||||
report[0].LastUsedUnixSec != 100 {
|
||||
t.Fatalf("unexpected pressure report: %+v", report[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
merged := mergedEndpointCandidateObservations(
|
||||
|
||||
Reference in New Issue
Block a user