From 1687277688fae1e613e74e9ab44594033b635e7d Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:22:23 +0300 Subject: [PATCH] Classify VPN scheduler pressure --- .../internal/vpnruntime/fabric_transport.go | 65 +++++++++++++++++++ .../vpnruntime/fabric_transport_test.go | 11 ++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 4 ++ 3 files changed, 80 insertions(+) diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index 6d62f59..c84fa16 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -252,6 +252,8 @@ type FabricFlowSchedulerSnapshot struct { Dropped uint64 `json:"dropped"` HighWatermark int `json:"high_watermark"` BackpressureActive bool `json:"backpressure_active"` + PressureLevel string `json:"pressure_level,omitempty"` + PressureReasons []string `json:"pressure_reasons,omitempty"` InFlight int `json:"in_flight"` MaxInFlight int `json:"max_in_flight"` AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` @@ -860,9 +862,72 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive" } } + snapshot.PressureLevel, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) return snapshot } +func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, []string) { + level := "nominal" + reasons := []string{} + addReason := func(reason string) { + reason = strings.TrimSpace(reason) + if reason == "" { + return + } + for _, existing := range reasons { + if existing == reason { + return + } + } + reasons = append(reasons, reason) + } + escalate := func(next string) { + if flowPressureRank(next) > flowPressureRank(level) { + level = next + } + } + if snapshot.Dropped > 0 || snapshot.QualityWindowDropCount > 0 { + escalate("critical") + addReason("drops") + } + if snapshot.FailingChannelCount > 0 || snapshot.QualityWindowFailureCount > 0 { + escalate("critical") + addReason("route_failures") + } + if snapshot.RouteRecoveredChannelCount > 0 || snapshot.RouteSwitchCount > 0 { + escalate("warning") + addReason("route_recovery") + } + if snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { + escalate("warning") + addReason("slow_channels") + } + if snapshot.BulkPressureActive { + escalate("warning") + addReason("bulk_pressure") + } + if snapshot.AdaptiveBackpressureActive { + escalate("warning") + addReason(snapshot.AdaptiveBackpressureReason) + } + if snapshot.BackpressureActive { + escalate("warning") + addReason("backpressure") + } + return level, reasons +} + +func flowPressureRank(level string) int { + switch strings.TrimSpace(level) { + case "critical": + return 2 + case "warning": + return 1 + default: + return 0 + } +} + func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int { if maxWindow <= 1 { return 1 diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index a04ff5f..d619e95 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -789,6 +789,9 @@ func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) { if snapshot.Dropped != 1 || !snapshot.BackpressureActive { t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot) } + if snapshot.PressureLevel != "critical" || !containsString(snapshot.PressureReasons, "drops") { + t.Fatalf("pressure = %s/%v, want critical drops", snapshot.PressureLevel, snapshot.PressureReasons) + } } func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { @@ -1544,6 +1547,11 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test snapshot.FlowScheduler.RouteSwitchReasonCounts["peer_unavailable"] != 1 { t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler) } + if snapshot.FlowScheduler.PressureLevel != "critical" || + !containsString(snapshot.FlowScheduler.PressureReasons, "route_recovery") || + !containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") { + t.Fatalf("route recovery pressure = %s/%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureReasons) + } if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB) } @@ -1911,6 +1919,9 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive { t.Fatalf("bulk pressure telemetry = %+v", snapshot) } + if snapshot.PressureLevel != "warning" || !containsString(snapshot.PressureReasons, "bulk_pressure") { + t.Fatalf("pressure = %s/%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureReasons) + } } func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 8d18a0e..84243f6 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -461,6 +461,10 @@ timing and switch count. Common route switch reasons are bucketed into stable labels such as timeout, peer_unavailable, connection_refused, connection_reset, no_route_to_host, and capacity_limited to keep heartbeat cardinality bounded. +Flow-scheduler snapshots now include a machine-readable pressure level +(`nominal`, `warning`, `critical`) and bounded reason list derived from drops, +route failures, route recovery, slow channels, bulk pressure, and adaptive +backpressure. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.