From f9ff0a46316572d06d6139f395c9ebb01f25ff97 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:26:07 +0300 Subject: [PATCH] Score VPN scheduler pressure --- .../internal/vpnruntime/fabric_transport.go | 28 +++++++++++++++++-- .../vpnruntime/fabric_transport_test.go | 11 ++++---- .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 ++ 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index c84fa16..82c03f0 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -253,6 +253,7 @@ type FabricFlowSchedulerSnapshot struct { HighWatermark int `json:"high_watermark"` BackpressureActive bool `json:"backpressure_active"` PressureLevel string `json:"pressure_level,omitempty"` + PressureScore int `json:"pressure_score,omitempty"` PressureReasons []string `json:"pressure_reasons,omitempty"` InFlight int `json:"in_flight"` MaxInFlight int `json:"max_in_flight"` @@ -862,12 +863,13 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive" } } - snapshot.PressureLevel, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) + snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) return snapshot } -func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, []string) { +func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, int, []string) { level := "nominal" + score := 0 reasons := []string{} addReason := func(reason string) { reason = strings.TrimSpace(reason) @@ -888,33 +890,43 @@ func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, } if snapshot.Dropped > 0 || snapshot.QualityWindowDropCount > 0 { escalate("critical") + score += boundedFabricPressureScore(int(snapshot.Dropped)+snapshot.QualityWindowDropCount*10, 20, 40) addReason("drops") } if snapshot.FailingChannelCount > 0 || snapshot.QualityWindowFailureCount > 0 { escalate("critical") + score += boundedFabricPressureScore((snapshot.FailingChannelCount+snapshot.QualityWindowFailureCount)*10, 20, 40) addReason("route_failures") } if snapshot.RouteRecoveredChannelCount > 0 || snapshot.RouteSwitchCount > 0 { escalate("warning") + score += boundedFabricPressureScore(snapshot.RouteRecoveredChannelCount*8+int(snapshot.RouteSwitchCount)*4, 8, 24) addReason("route_recovery") } if snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { escalate("warning") + score += boundedFabricPressureScore((snapshot.SlowChannelCount+snapshot.QualityWindowSlowCount)*6, 6, 24) addReason("slow_channels") } if snapshot.BulkPressureActive { escalate("warning") + score += 15 addReason("bulk_pressure") } if snapshot.AdaptiveBackpressureActive { escalate("warning") + score += 10 addReason(snapshot.AdaptiveBackpressureReason) } if snapshot.BackpressureActive { escalate("warning") + score += 10 addReason("backpressure") } - return level, reasons + if score > 100 { + score = 100 + } + return level, score, reasons } func flowPressureRank(level string) int { @@ -928,6 +940,16 @@ func flowPressureRank(level string) int { } } +func boundedFabricPressureScore(value, minValue, maxValue int) int { + if value < minValue { + return minValue + } + if value > maxValue { + return maxValue + } + return value +} + func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int { if maxWindow <= 1 { return 1 diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index d619e95..3cd40c8 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -789,8 +789,8 @@ func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) { if snapshot.Dropped != 1 || !snapshot.BackpressureActive { t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot) } - if snapshot.PressureLevel != "critical" || !containsString(snapshot.PressureReasons, "drops") { - t.Fatalf("pressure = %s/%v, want critical drops", snapshot.PressureLevel, snapshot.PressureReasons) + if snapshot.PressureLevel != "critical" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "drops") { + t.Fatalf("pressure = %s score=%d reasons=%v, want critical drops", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } } @@ -1548,9 +1548,10 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler) } if snapshot.FlowScheduler.PressureLevel != "critical" || + snapshot.FlowScheduler.PressureScore <= 0 || !containsString(snapshot.FlowScheduler.PressureReasons, "route_recovery") || !containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") { - t.Fatalf("route recovery pressure = %s/%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureReasons) + t.Fatalf("route recovery pressure = %s score=%d reasons=%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureScore, snapshot.FlowScheduler.PressureReasons) } if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB) @@ -1919,8 +1920,8 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive { t.Fatalf("bulk pressure telemetry = %+v", snapshot) } - if snapshot.PressureLevel != "warning" || !containsString(snapshot.PressureReasons, "bulk_pressure") { - t.Fatalf("pressure = %s/%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureReasons) + if snapshot.PressureLevel != "warning" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "bulk_pressure") { + t.Fatalf("pressure = %s score=%d reasons=%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } } diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 163ef2f..5cef969 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -465,6 +465,8 @@ Flow-scheduler snapshots now include a machine-readable pressure level (`nominal`, `warning`, `critical`) and bounded reason list derived from drops, route failures, route recovery, slow channels, bulk pressure, and adaptive backpressure. +The same pressure classification includes a bounded 0-100 score for automated +route, endpoint, and node comparisons. `mesh-live-smoke` reports the mixed-load scheduler pressure level and reasons. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without