Classify VPN scheduler pressure

This commit is contained in:
2026-05-16 13:22:23 +03:00
parent 5c3b19cff7
commit 1687277688
3 changed files with 80 additions and 0 deletions
@@ -252,6 +252,8 @@ type FabricFlowSchedulerSnapshot struct {
Dropped uint64 `json:"dropped"` Dropped uint64 `json:"dropped"`
HighWatermark int `json:"high_watermark"` HighWatermark int `json:"high_watermark"`
BackpressureActive bool `json:"backpressure_active"` BackpressureActive bool `json:"backpressure_active"`
PressureLevel string `json:"pressure_level,omitempty"`
PressureReasons []string `json:"pressure_reasons,omitempty"`
InFlight int `json:"in_flight"` InFlight int `json:"in_flight"`
MaxInFlight int `json:"max_in_flight"` MaxInFlight int `json:"max_in_flight"`
AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"`
@@ -860,9 +862,72 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot {
snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive" snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive"
} }
} }
snapshot.PressureLevel, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot)
return snapshot return snapshot
} }
func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, []string) {
level := "nominal"
reasons := []string{}
addReason := func(reason string) {
reason = strings.TrimSpace(reason)
if reason == "" {
return
}
for _, existing := range reasons {
if existing == reason {
return
}
}
reasons = append(reasons, reason)
}
escalate := func(next string) {
if flowPressureRank(next) > flowPressureRank(level) {
level = next
}
}
if snapshot.Dropped > 0 || snapshot.QualityWindowDropCount > 0 {
escalate("critical")
addReason("drops")
}
if snapshot.FailingChannelCount > 0 || snapshot.QualityWindowFailureCount > 0 {
escalate("critical")
addReason("route_failures")
}
if snapshot.RouteRecoveredChannelCount > 0 || snapshot.RouteSwitchCount > 0 {
escalate("warning")
addReason("route_recovery")
}
if snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 {
escalate("warning")
addReason("slow_channels")
}
if snapshot.BulkPressureActive {
escalate("warning")
addReason("bulk_pressure")
}
if snapshot.AdaptiveBackpressureActive {
escalate("warning")
addReason(snapshot.AdaptiveBackpressureReason)
}
if snapshot.BackpressureActive {
escalate("warning")
addReason("backpressure")
}
return level, reasons
}
func flowPressureRank(level string) int {
switch strings.TrimSpace(level) {
case "critical":
return 2
case "warning":
return 1
default:
return 0
}
}
func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int { func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int {
if maxWindow <= 1 { if maxWindow <= 1 {
return 1 return 1
@@ -789,6 +789,9 @@ func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) {
if snapshot.Dropped != 1 || !snapshot.BackpressureActive { if snapshot.Dropped != 1 || !snapshot.BackpressureActive {
t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot) t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot)
} }
if snapshot.PressureLevel != "critical" || !containsString(snapshot.PressureReasons, "drops") {
t.Fatalf("pressure = %s/%v, want critical drops", snapshot.PressureLevel, snapshot.PressureReasons)
}
} }
func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) {
@@ -1544,6 +1547,11 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test
snapshot.FlowScheduler.RouteSwitchReasonCounts["peer_unavailable"] != 1 { snapshot.FlowScheduler.RouteSwitchReasonCounts["peer_unavailable"] != 1 {
t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler) t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler)
} }
if snapshot.FlowScheduler.PressureLevel != "critical" ||
!containsString(snapshot.FlowScheduler.PressureReasons, "route_recovery") ||
!containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") {
t.Fatalf("route recovery pressure = %s/%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureReasons)
}
if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 {
t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB) t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB)
} }
@@ -1911,6 +1919,9 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi
if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive { if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive {
t.Fatalf("bulk pressure telemetry = %+v", snapshot) t.Fatalf("bulk pressure telemetry = %+v", snapshot)
} }
if snapshot.PressureLevel != "warning" || !containsString(snapshot.PressureReasons, "bulk_pressure") {
t.Fatalf("pressure = %s/%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureReasons)
}
} }
func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) {
@@ -461,6 +461,10 @@ timing and switch count.
Common route switch reasons are bucketed into stable labels such as timeout, Common route switch reasons are bucketed into stable labels such as timeout,
peer_unavailable, connection_refused, connection_reset, no_route_to_host, and peer_unavailable, connection_refused, connection_reset, no_route_to_host, and
capacity_limited to keep heartbeat cardinality bounded. capacity_limited to keep heartbeat cardinality bounded.
Flow-scheduler snapshots now include a machine-readable pressure level
(`nominal`, `warning`, `critical`) and bounded reason list derived from drops,
route failures, route recovery, slow channels, bulk pressure, and adaptive
backpressure.
Endpoint ranking treats `capacity_limited` observations as a soft pressure Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy. marking the carrier unhealthy.