diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index d5aaea8..20d1711 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -909,6 +909,7 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot "pressure_level": snapshot.PressureLevel, "pressure_score": snapshot.PressureScore, "pressure_reasons": append([]string{}, snapshot.PressureReasons...), + "recommended_action": vpnFabricFlowPressureAction(snapshot), "backpressure_active": snapshot.BackpressureActive, "bulk_pressure_active": snapshot.BulkPressureActive, "bulk_pressure_channel_count": snapshot.BulkPressureChannelCount, @@ -932,6 +933,32 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot return report } +func vpnFabricFlowPressureAction(snapshot vpnruntime.FabricFlowSchedulerSnapshot) string { + reasons := map[string]struct{}{} + for _, reason := range snapshot.PressureReasons { + reasons[strings.TrimSpace(reason)] = struct{}{} + } + if _, ok := reasons["drops"]; ok || snapshot.QualityWindowDropCount > 0 { + return "shed_or_reroute" + } + if _, ok := reasons["route_failures"]; ok || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 { + return "rebuild_or_reroute" + } + if _, ok := reasons["route_recovery"]; ok || snapshot.RouteSwitchCount > 0 { + return "observe_recovery" + } + if _, ok := reasons["slow_channels"]; ok || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 { + return "prefer_faster_route" + } + if _, ok := reasons["bulk_pressure"]; ok || snapshot.BulkPressureActive { + return "throttle_bulk" + } + if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive { + return "reduce_parallelism" + } + return "observe" +} + func copyStringIntMap(in map[string]int) map[string]int { if len(in) == 0 { return map[string]int{} diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index d478019..3a98ea2 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -775,7 +775,8 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { } else if pressure, ok := report["flow_pressure"].(map[string]any); !ok || pressure["schema_version"] != "rap.vpn_fabric_flow_pressure.v1" || pressure["pressure_level"] != "nominal" || - pressure["pressure_score"] != 0 { + pressure["pressure_score"] != 0 || + pressure["recommended_action"] != "observe" { t.Fatalf("vpn fabric flow pressure report missing: %+v", report) } if payload.Capabilities["vpn_fabric_session_stream_shards"] != true { @@ -1238,6 +1239,27 @@ func TestVPNFabricQUICPressureReportRanksBusyConnections(t *testing.T) { } } +func TestVPNFabricFlowPressureActionPrioritizesAutomation(t *testing.T) { + cases := []struct { + name string + snapshot vpnruntime.FabricFlowSchedulerSnapshot + want string + }{ + {name: "drops", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"drops"}}, want: "shed_or_reroute"}, + {name: "failures", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"route_failures"}}, want: "rebuild_or_reroute"}, + {name: "recovery", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{RouteSwitchCount: 1}, want: "observe_recovery"}, + {name: "slow", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"slow_channels"}}, want: "prefer_faster_route"}, + {name: "bulk", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BulkPressureActive: true}, want: "throttle_bulk"}, + {name: "backpressure", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BackpressureActive: true}, want: "reduce_parallelism"}, + {name: "nominal", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{}, want: "observe"}, + } + for _, tc := range cases { + if got := vpnFabricFlowPressureAction(tc.snapshot); got != tc.want { + t.Fatalf("%s action = %q, want %q", tc.name, got, tc.want) + } + } +} + func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { now := time.Now().UTC() merged := mergedEndpointCandidateObservations( diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 37e92e2..22ecb55 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -472,6 +472,9 @@ reasons. Heartbeat VPN fabric transport reports now include a compact `flow_pressure` summary with level, score, reasons, bulk pressure, route recovery timing, reason counts, and recommended per-class windows. +The `flow_pressure` summary includes a `recommended_action` such as +`observe`, `throttle_bulk`, `reduce_parallelism`, `prefer_faster_route`, +`observe_recovery`, `rebuild_or_reroute`, or `shed_or_reroute`. Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat summary is available. When the VPN fabric ingress runtime has not been initialized yet, the heartbeat