Report VPN flow pressure in heartbeat
This commit is contained in:
@@ -903,6 +903,46 @@ func vpnFabricQUICPressureReport(snapshot mesh.QUICFabricTransportSnapshot, maxE
|
|||||||
return entries[:maxEntries]
|
return entries[:maxEntries]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot) map[string]any {
|
||||||
|
report := map[string]any{
|
||||||
|
"schema_version": "rap.vpn_fabric_flow_pressure.v1",
|
||||||
|
"pressure_level": snapshot.PressureLevel,
|
||||||
|
"pressure_score": snapshot.PressureScore,
|
||||||
|
"pressure_reasons": append([]string{}, snapshot.PressureReasons...),
|
||||||
|
"backpressure_active": snapshot.BackpressureActive,
|
||||||
|
"bulk_pressure_active": snapshot.BulkPressureActive,
|
||||||
|
"bulk_pressure_channel_count": snapshot.BulkPressureChannelCount,
|
||||||
|
"interactive_or_control_channels": snapshot.InteractiveOrControlCount,
|
||||||
|
"route_recovered_channel_count": snapshot.RouteRecoveredChannelCount,
|
||||||
|
"route_switch_count": snapshot.RouteSwitchCount,
|
||||||
|
"route_recovery_max_ms": snapshot.RouteRecoveryMaxMillis,
|
||||||
|
"route_recovery_avg_ms": snapshot.RouteRecoveryAvgMillis,
|
||||||
|
"route_switch_reason_counts": copyStringIntMap(snapshot.RouteSwitchReasonCounts),
|
||||||
|
"adaptive_backpressure_active": snapshot.AdaptiveBackpressureActive,
|
||||||
|
"adaptive_backpressure_reason": snapshot.AdaptiveBackpressureReason,
|
||||||
|
"quality_window_failure_count": snapshot.QualityWindowFailureCount,
|
||||||
|
"quality_window_slow_count": snapshot.QualityWindowSlowCount,
|
||||||
|
"quality_window_drop_count": snapshot.QualityWindowDropCount,
|
||||||
|
"recommended_parallel_windows": copyStringIntMap(snapshot.RecommendedParallelWindows),
|
||||||
|
"adaptive_policy_fingerprint": snapshot.AdaptivePolicyFingerprint,
|
||||||
|
}
|
||||||
|
if report["pressure_level"] == "" {
|
||||||
|
report["pressure_level"] = "nominal"
|
||||||
|
}
|
||||||
|
return report
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyStringIntMap(in map[string]int) map[string]int {
|
||||||
|
if len(in) == 0 {
|
||||||
|
return map[string]int{}
|
||||||
|
}
|
||||||
|
out := make(map[string]int, len(in))
|
||||||
|
for key, value := range in {
|
||||||
|
out[key] = value
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats {
|
func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats {
|
||||||
return &fabricServiceChannelAccessStats{}
|
return &fabricServiceChannelAccessStats{}
|
||||||
}
|
}
|
||||||
@@ -3225,6 +3265,10 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
|
|||||||
if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
|
if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
|
||||||
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
|
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
|
||||||
}
|
}
|
||||||
|
if meshState != nil && meshState.VPNFabricIngress != nil {
|
||||||
|
ingressSnapshot := meshState.VPNFabricIngress.Snapshot(identity.ClusterID)
|
||||||
|
report["flow_pressure"] = vpnFabricFlowPressureReport(ingressSnapshot.FlowScheduler)
|
||||||
|
}
|
||||||
payload.Metadata["vpn_fabric_session_transport_report"] = report
|
payload.Metadata["vpn_fabric_session_transport_report"] = report
|
||||||
payload.Capabilities["vpn_fabric_session_transport"] = true
|
payload.Capabilities["vpn_fabric_session_transport"] = true
|
||||||
payload.Capabilities["vpn_packet_batch_binary_frames"] = true
|
payload.Capabilities["vpn_packet_batch_binary_frames"] = true
|
||||||
|
|||||||
@@ -722,6 +722,11 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
|
|||||||
transport.MaxStreamsPerConn = 24
|
transport.MaxStreamsPerConn = 24
|
||||||
return transport
|
return transport
|
||||||
}(),
|
}(),
|
||||||
|
VPNFabricIngress: &vpnruntime.FabricClientPacketIngress{
|
||||||
|
FlowScheduler: vpnruntime.NewFabricFlowScheduler(0, 0),
|
||||||
|
ClusterID: "cluster-1",
|
||||||
|
LocalNodeID: "node-a",
|
||||||
|
},
|
||||||
QUICFabricListenAddr: "127.0.0.1:19443",
|
QUICFabricListenAddr: "127.0.0.1:19443",
|
||||||
}, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC))
|
}, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
@@ -767,6 +772,11 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
|
|||||||
t.Fatalf("vpn fabric quic session report missing: %+v", report)
|
t.Fatalf("vpn fabric quic session report missing: %+v", report)
|
||||||
} else if report["quic_capacity_pressure"] == nil {
|
} else if report["quic_capacity_pressure"] == nil {
|
||||||
t.Fatalf("vpn fabric quic pressure report missing: %+v", report)
|
t.Fatalf("vpn fabric quic pressure report missing: %+v", report)
|
||||||
|
} else if pressure, ok := report["flow_pressure"].(map[string]any); !ok ||
|
||||||
|
pressure["schema_version"] != "rap.vpn_fabric_flow_pressure.v1" ||
|
||||||
|
pressure["pressure_level"] != "nominal" ||
|
||||||
|
pressure["pressure_score"] != 0 {
|
||||||
|
t.Fatalf("vpn fabric flow pressure report missing: %+v", report)
|
||||||
}
|
}
|
||||||
if payload.Capabilities["vpn_fabric_session_stream_shards"] != true {
|
if payload.Capabilities["vpn_fabric_session_stream_shards"] != true {
|
||||||
t.Fatalf("vpn fabric stream shard capability missing: %+v", payload.Capabilities)
|
t.Fatalf("vpn fabric stream shard capability missing: %+v", payload.Capabilities)
|
||||||
|
|||||||
@@ -469,6 +469,9 @@ The same pressure classification includes a bounded 0-100 score for automated
|
|||||||
route, endpoint, and node comparisons.
|
route, endpoint, and node comparisons.
|
||||||
`mesh-live-smoke` reports the mixed-load scheduler pressure level, score, and
|
`mesh-live-smoke` reports the mixed-load scheduler pressure level, score, and
|
||||||
reasons.
|
reasons.
|
||||||
|
Heartbeat VPN fabric transport reports now include a compact
|
||||||
|
`flow_pressure` summary with level, score, reasons, bulk pressure, route
|
||||||
|
recovery timing, reason counts, and recommended per-class windows.
|
||||||
Endpoint ranking treats `capacity_limited` observations as a soft pressure
|
Endpoint ranking treats `capacity_limited` observations as a soft pressure
|
||||||
penalty instead of a hard recent failure, enabling load spreading without
|
penalty instead of a hard recent failure, enabling load spreading without
|
||||||
marking the carrier unhealthy.
|
marking the carrier unhealthy.
|
||||||
|
|||||||
Reference in New Issue
Block a user