From 5e4c0d596b5c85b061cd7f5624adde0eedc10042 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:39:09 +0300 Subject: [PATCH] Promote VPN pressure action to snapshot --- .../cmd/mesh-live-smoke/main.go | 33 +------------------ .../rap-node-agent/cmd/rap-node-agent/main.go | 31 +++-------------- .../cmd/rap-node-agent/main_test.go | 22 +------------ .../internal/vpnruntime/fabric_transport.go | 28 ++++++++++++++++ .../vpnruntime/fabric_transport_test.go | 9 +++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 ++ 6 files changed, 46 insertions(+), 80 deletions(-) diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 56e0393..7c49d93 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -236,38 +236,7 @@ func smokeVPNFlowSchedulerBulkPressure() (bool, int, int, int, int, string, int, snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons, - smokeVPNPressureAction(snapshot) -} - -func smokeVPNPressureAction(snapshot vpnruntime.FabricFlowSchedulerSnapshot) string { - if containsSmokeString(snapshot.PressureReasons, "drops") || snapshot.QualityWindowDropCount > 0 { - return "shed_or_reroute" - } - if containsSmokeString(snapshot.PressureReasons, "route_failures") || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 { - return "rebuild_or_reroute" - } - if containsSmokeString(snapshot.PressureReasons, "route_recovery") || snapshot.RouteSwitchCount > 0 { - return "observe_recovery" - } - if containsSmokeString(snapshot.PressureReasons, "slow_channels") || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { - return "prefer_faster_route" - } - if containsSmokeString(snapshot.PressureReasons, "bulk_pressure") || snapshot.BulkPressureActive { - return "throttle_bulk" - } - if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive { - return "reduce_parallelism" - } - return "observe" -} - -func containsSmokeString(values []string, needle string) bool { - for _, value := range values { - if value == needle { - return true - } - } - return false + snapshot.RecommendedAction } func smokeVPNFlowSchedulerRouteRecovery() (bool, uint64, int64, int64, int64, string) { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 20d1711..4548f1c 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -909,7 +909,7 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot "pressure_level": snapshot.PressureLevel, "pressure_score": snapshot.PressureScore, "pressure_reasons": append([]string{}, snapshot.PressureReasons...), - "recommended_action": vpnFabricFlowPressureAction(snapshot), + "recommended_action": snapshot.RecommendedAction, "backpressure_active": snapshot.BackpressureActive, "bulk_pressure_active": snapshot.BulkPressureActive, "bulk_pressure_channel_count": snapshot.BulkPressureChannelCount, @@ -930,35 +930,12 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot if report["pressure_level"] == "" { report["pressure_level"] = "nominal" } + if report["recommended_action"] == "" { + report["recommended_action"] = "observe" + } return report } -func vpnFabricFlowPressureAction(snapshot vpnruntime.FabricFlowSchedulerSnapshot) string { - reasons := map[string]struct{}{} - for _, reason := range snapshot.PressureReasons { - reasons[strings.TrimSpace(reason)] = struct{}{} - } - if _, ok := reasons["drops"]; ok || snapshot.QualityWindowDropCount > 0 { - return "shed_or_reroute" - } - if _, ok := reasons["route_failures"]; ok || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 { - return "rebuild_or_reroute" - } - if _, ok := reasons["route_recovery"]; ok || snapshot.RouteSwitchCount > 0 { - return "observe_recovery" - } - if _, ok := reasons["slow_channels"]; ok || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { - return "prefer_faster_route" - } - if _, ok := reasons["bulk_pressure"]; ok || snapshot.BulkPressureActive { - return "throttle_bulk" - } - if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive { - return "reduce_parallelism" - } - return "observe" -} - func copyStringIntMap(in map[string]int) map[string]int { if len(in) == 0 { return map[string]int{} diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index e3511c3..d5180dc 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -1239,32 +1239,12 @@ func TestVPNFabricQUICPressureReportRanksBusyConnections(t *testing.T) { } } -func TestVPNFabricFlowPressureActionPrioritizesAutomation(t *testing.T) { - cases := []struct { - name string - snapshot vpnruntime.FabricFlowSchedulerSnapshot - want string - }{ - {name: "drops", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"drops"}}, want: "shed_or_reroute"}, - {name: "failures", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"route_failures"}}, want: "rebuild_or_reroute"}, - {name: "recovery", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{RouteSwitchCount: 1}, want: "observe_recovery"}, - {name: "slow", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"slow_channels"}}, want: "prefer_faster_route"}, - {name: "bulk", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BulkPressureActive: true}, want: "throttle_bulk"}, - {name: "backpressure", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BackpressureActive: true}, want: "reduce_parallelism"}, - {name: "nominal", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{}, want: "observe"}, - } - for _, tc := range cases { - if got := vpnFabricFlowPressureAction(tc.snapshot); got != tc.want { - t.Fatalf("%s action = %q, want %q", tc.name, got, tc.want) - } - } -} - func TestVPNFabricFlowPressureReportIncludesRecommendedAction(t *testing.T) { report := vpnFabricFlowPressureReport(vpnruntime.FabricFlowSchedulerSnapshot{ PressureLevel: "warning", PressureScore: 35, PressureReasons: []string{"bulk_pressure", "backpressure"}, + RecommendedAction: "throttle_bulk", BackpressureActive: true, BulkPressureActive: true, BulkPressureChannelCount: 16, diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index 82c03f0..48d8457 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -255,6 +255,7 @@ type FabricFlowSchedulerSnapshot struct { PressureLevel string `json:"pressure_level,omitempty"` PressureScore int `json:"pressure_score,omitempty"` PressureReasons []string `json:"pressure_reasons,omitempty"` + RecommendedAction string `json:"recommended_action,omitempty"` InFlight int `json:"in_flight"` MaxInFlight int `json:"max_in_flight"` AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` @@ -864,6 +865,7 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { } } snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) + snapshot.RecommendedAction = fabricFlowSchedulerRecommendedAction(snapshot) return snapshot } @@ -940,6 +942,32 @@ func flowPressureRank(level string) int { } } +func fabricFlowSchedulerRecommendedAction(snapshot FabricFlowSchedulerSnapshot) string { + reasons := map[string]struct{}{} + for _, reason := range snapshot.PressureReasons { + reasons[strings.TrimSpace(reason)] = struct{}{} + } + if _, ok := reasons["drops"]; ok || snapshot.QualityWindowDropCount > 0 { + return "shed_or_reroute" + } + if _, ok := reasons["route_failures"]; ok || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 { + return "rebuild_or_reroute" + } + if _, ok := reasons["route_recovery"]; ok || snapshot.RouteSwitchCount > 0 { + return "observe_recovery" + } + if _, ok := reasons["slow_channels"]; ok || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { + return "prefer_faster_route" + } + if _, ok := reasons["bulk_pressure"]; ok || snapshot.BulkPressureActive { + return "throttle_bulk" + } + if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive { + return "reduce_parallelism" + } + return "observe" +} + func boundedFabricPressureScore(value, minValue, maxValue int) int { if value < minValue { return minValue diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 3cd40c8..478e824 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -792,6 +792,9 @@ func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) { if snapshot.PressureLevel != "critical" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "drops") { t.Fatalf("pressure = %s score=%d reasons=%v, want critical drops", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } + if snapshot.RecommendedAction != "shed_or_reroute" { + t.Fatalf("recommended action = %q, want shed_or_reroute", snapshot.RecommendedAction) + } } func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { @@ -1553,6 +1556,9 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test !containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") { t.Fatalf("route recovery pressure = %s score=%d reasons=%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureScore, snapshot.FlowScheduler.PressureReasons) } + if snapshot.FlowScheduler.RecommendedAction != "rebuild_or_reroute" { + t.Fatalf("route recovery action = %q, want rebuild_or_reroute", snapshot.FlowScheduler.RecommendedAction) + } if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB) } @@ -1923,6 +1929,9 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi if snapshot.PressureLevel != "warning" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "bulk_pressure") { t.Fatalf("pressure = %s score=%d reasons=%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } + if snapshot.RecommendedAction != "throttle_bulk" { + t.Fatalf("recommended action = %q, want throttle_bulk", snapshot.RecommendedAction) + } } func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 19df906..1df6413 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -475,6 +475,9 @@ recovery timing, reason counts, and recommended per-class windows. The `flow_pressure` summary includes a `recommended_action` such as `observe`, `throttle_bulk`, `reduce_parallelism`, `prefer_faster_route`, `observe_recovery`, `rebuild_or_reroute`, or `shed_or_reroute`. +`recommended_action` is now part of the shared `FabricFlowSchedulerSnapshot` +contract, so heartbeat reports and smoke diagnostics consume the same runtime +decision. `mesh-live-smoke` reports the recommended action for its mixed bulk/interactive load scenario. Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat