Promote VPN pressure action to snapshot
This commit is contained in:
@@ -236,38 +236,7 @@ func smokeVPNFlowSchedulerBulkPressure() (bool, int, int, int, int, string, int,
|
|||||||
snapshot.PressureLevel,
|
snapshot.PressureLevel,
|
||||||
snapshot.PressureScore,
|
snapshot.PressureScore,
|
||||||
snapshot.PressureReasons,
|
snapshot.PressureReasons,
|
||||||
smokeVPNPressureAction(snapshot)
|
snapshot.RecommendedAction
|
||||||
}
|
|
||||||
|
|
||||||
func smokeVPNPressureAction(snapshot vpnruntime.FabricFlowSchedulerSnapshot) string {
|
|
||||||
if containsSmokeString(snapshot.PressureReasons, "drops") || snapshot.QualityWindowDropCount > 0 {
|
|
||||||
return "shed_or_reroute"
|
|
||||||
}
|
|
||||||
if containsSmokeString(snapshot.PressureReasons, "route_failures") || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 {
|
|
||||||
return "rebuild_or_reroute"
|
|
||||||
}
|
|
||||||
if containsSmokeString(snapshot.PressureReasons, "route_recovery") || snapshot.RouteSwitchCount > 0 {
|
|
||||||
return "observe_recovery"
|
|
||||||
}
|
|
||||||
if containsSmokeString(snapshot.PressureReasons, "slow_channels") || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 {
|
|
||||||
return "prefer_faster_route"
|
|
||||||
}
|
|
||||||
if containsSmokeString(snapshot.PressureReasons, "bulk_pressure") || snapshot.BulkPressureActive {
|
|
||||||
return "throttle_bulk"
|
|
||||||
}
|
|
||||||
if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive {
|
|
||||||
return "reduce_parallelism"
|
|
||||||
}
|
|
||||||
return "observe"
|
|
||||||
}
|
|
||||||
|
|
||||||
func containsSmokeString(values []string, needle string) bool {
|
|
||||||
for _, value := range values {
|
|
||||||
if value == needle {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func smokeVPNFlowSchedulerRouteRecovery() (bool, uint64, int64, int64, int64, string) {
|
func smokeVPNFlowSchedulerRouteRecovery() (bool, uint64, int64, int64, int64, string) {
|
||||||
|
|||||||
@@ -909,7 +909,7 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot
|
|||||||
"pressure_level": snapshot.PressureLevel,
|
"pressure_level": snapshot.PressureLevel,
|
||||||
"pressure_score": snapshot.PressureScore,
|
"pressure_score": snapshot.PressureScore,
|
||||||
"pressure_reasons": append([]string{}, snapshot.PressureReasons...),
|
"pressure_reasons": append([]string{}, snapshot.PressureReasons...),
|
||||||
"recommended_action": vpnFabricFlowPressureAction(snapshot),
|
"recommended_action": snapshot.RecommendedAction,
|
||||||
"backpressure_active": snapshot.BackpressureActive,
|
"backpressure_active": snapshot.BackpressureActive,
|
||||||
"bulk_pressure_active": snapshot.BulkPressureActive,
|
"bulk_pressure_active": snapshot.BulkPressureActive,
|
||||||
"bulk_pressure_channel_count": snapshot.BulkPressureChannelCount,
|
"bulk_pressure_channel_count": snapshot.BulkPressureChannelCount,
|
||||||
@@ -930,35 +930,12 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot
|
|||||||
if report["pressure_level"] == "" {
|
if report["pressure_level"] == "" {
|
||||||
report["pressure_level"] = "nominal"
|
report["pressure_level"] = "nominal"
|
||||||
}
|
}
|
||||||
|
if report["recommended_action"] == "" {
|
||||||
|
report["recommended_action"] = "observe"
|
||||||
|
}
|
||||||
return report
|
return report
|
||||||
}
|
}
|
||||||
|
|
||||||
func vpnFabricFlowPressureAction(snapshot vpnruntime.FabricFlowSchedulerSnapshot) string {
|
|
||||||
reasons := map[string]struct{}{}
|
|
||||||
for _, reason := range snapshot.PressureReasons {
|
|
||||||
reasons[strings.TrimSpace(reason)] = struct{}{}
|
|
||||||
}
|
|
||||||
if _, ok := reasons["drops"]; ok || snapshot.QualityWindowDropCount > 0 {
|
|
||||||
return "shed_or_reroute"
|
|
||||||
}
|
|
||||||
if _, ok := reasons["route_failures"]; ok || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 {
|
|
||||||
return "rebuild_or_reroute"
|
|
||||||
}
|
|
||||||
if _, ok := reasons["route_recovery"]; ok || snapshot.RouteSwitchCount > 0 {
|
|
||||||
return "observe_recovery"
|
|
||||||
}
|
|
||||||
if _, ok := reasons["slow_channels"]; ok || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 {
|
|
||||||
return "prefer_faster_route"
|
|
||||||
}
|
|
||||||
if _, ok := reasons["bulk_pressure"]; ok || snapshot.BulkPressureActive {
|
|
||||||
return "throttle_bulk"
|
|
||||||
}
|
|
||||||
if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive {
|
|
||||||
return "reduce_parallelism"
|
|
||||||
}
|
|
||||||
return "observe"
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyStringIntMap(in map[string]int) map[string]int {
|
func copyStringIntMap(in map[string]int) map[string]int {
|
||||||
if len(in) == 0 {
|
if len(in) == 0 {
|
||||||
return map[string]int{}
|
return map[string]int{}
|
||||||
|
|||||||
@@ -1239,32 +1239,12 @@ func TestVPNFabricQUICPressureReportRanksBusyConnections(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVPNFabricFlowPressureActionPrioritizesAutomation(t *testing.T) {
|
|
||||||
cases := []struct {
|
|
||||||
name string
|
|
||||||
snapshot vpnruntime.FabricFlowSchedulerSnapshot
|
|
||||||
want string
|
|
||||||
}{
|
|
||||||
{name: "drops", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"drops"}}, want: "shed_or_reroute"},
|
|
||||||
{name: "failures", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"route_failures"}}, want: "rebuild_or_reroute"},
|
|
||||||
{name: "recovery", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{RouteSwitchCount: 1}, want: "observe_recovery"},
|
|
||||||
{name: "slow", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{PressureReasons: []string{"slow_channels"}}, want: "prefer_faster_route"},
|
|
||||||
{name: "bulk", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BulkPressureActive: true}, want: "throttle_bulk"},
|
|
||||||
{name: "backpressure", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{BackpressureActive: true}, want: "reduce_parallelism"},
|
|
||||||
{name: "nominal", snapshot: vpnruntime.FabricFlowSchedulerSnapshot{}, want: "observe"},
|
|
||||||
}
|
|
||||||
for _, tc := range cases {
|
|
||||||
if got := vpnFabricFlowPressureAction(tc.snapshot); got != tc.want {
|
|
||||||
t.Fatalf("%s action = %q, want %q", tc.name, got, tc.want)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestVPNFabricFlowPressureReportIncludesRecommendedAction(t *testing.T) {
|
func TestVPNFabricFlowPressureReportIncludesRecommendedAction(t *testing.T) {
|
||||||
report := vpnFabricFlowPressureReport(vpnruntime.FabricFlowSchedulerSnapshot{
|
report := vpnFabricFlowPressureReport(vpnruntime.FabricFlowSchedulerSnapshot{
|
||||||
PressureLevel: "warning",
|
PressureLevel: "warning",
|
||||||
PressureScore: 35,
|
PressureScore: 35,
|
||||||
PressureReasons: []string{"bulk_pressure", "backpressure"},
|
PressureReasons: []string{"bulk_pressure", "backpressure"},
|
||||||
|
RecommendedAction: "throttle_bulk",
|
||||||
BackpressureActive: true,
|
BackpressureActive: true,
|
||||||
BulkPressureActive: true,
|
BulkPressureActive: true,
|
||||||
BulkPressureChannelCount: 16,
|
BulkPressureChannelCount: 16,
|
||||||
|
|||||||
@@ -255,6 +255,7 @@ type FabricFlowSchedulerSnapshot struct {
|
|||||||
PressureLevel string `json:"pressure_level,omitempty"`
|
PressureLevel string `json:"pressure_level,omitempty"`
|
||||||
PressureScore int `json:"pressure_score,omitempty"`
|
PressureScore int `json:"pressure_score,omitempty"`
|
||||||
PressureReasons []string `json:"pressure_reasons,omitempty"`
|
PressureReasons []string `json:"pressure_reasons,omitempty"`
|
||||||
|
RecommendedAction string `json:"recommended_action,omitempty"`
|
||||||
InFlight int `json:"in_flight"`
|
InFlight int `json:"in_flight"`
|
||||||
MaxInFlight int `json:"max_in_flight"`
|
MaxInFlight int `json:"max_in_flight"`
|
||||||
AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"`
|
AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"`
|
||||||
@@ -864,6 +865,7 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot)
|
snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot)
|
||||||
|
snapshot.RecommendedAction = fabricFlowSchedulerRecommendedAction(snapshot)
|
||||||
return snapshot
|
return snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -940,6 +942,32 @@ func flowPressureRank(level string) int {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fabricFlowSchedulerRecommendedAction(snapshot FabricFlowSchedulerSnapshot) string {
|
||||||
|
reasons := map[string]struct{}{}
|
||||||
|
for _, reason := range snapshot.PressureReasons {
|
||||||
|
reasons[strings.TrimSpace(reason)] = struct{}{}
|
||||||
|
}
|
||||||
|
if _, ok := reasons["drops"]; ok || snapshot.QualityWindowDropCount > 0 {
|
||||||
|
return "shed_or_reroute"
|
||||||
|
}
|
||||||
|
if _, ok := reasons["route_failures"]; ok || snapshot.QualityWindowFailureCount > 0 || snapshot.FailingChannelCount > 0 {
|
||||||
|
return "rebuild_or_reroute"
|
||||||
|
}
|
||||||
|
if _, ok := reasons["route_recovery"]; ok || snapshot.RouteSwitchCount > 0 {
|
||||||
|
return "observe_recovery"
|
||||||
|
}
|
||||||
|
if _, ok := reasons["slow_channels"]; ok || snapshot.SlowChannelCount > 0 || snapshot.QualityWindowSlowCount > 0 {
|
||||||
|
return "prefer_faster_route"
|
||||||
|
}
|
||||||
|
if _, ok := reasons["bulk_pressure"]; ok || snapshot.BulkPressureActive {
|
||||||
|
return "throttle_bulk"
|
||||||
|
}
|
||||||
|
if snapshot.AdaptiveBackpressureActive || snapshot.BackpressureActive {
|
||||||
|
return "reduce_parallelism"
|
||||||
|
}
|
||||||
|
return "observe"
|
||||||
|
}
|
||||||
|
|
||||||
func boundedFabricPressureScore(value, minValue, maxValue int) int {
|
func boundedFabricPressureScore(value, minValue, maxValue int) int {
|
||||||
if value < minValue {
|
if value < minValue {
|
||||||
return minValue
|
return minValue
|
||||||
|
|||||||
@@ -792,6 +792,9 @@ func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) {
|
|||||||
if snapshot.PressureLevel != "critical" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "drops") {
|
if snapshot.PressureLevel != "critical" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "drops") {
|
||||||
t.Fatalf("pressure = %s score=%d reasons=%v, want critical drops", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons)
|
t.Fatalf("pressure = %s score=%d reasons=%v, want critical drops", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons)
|
||||||
}
|
}
|
||||||
|
if snapshot.RecommendedAction != "shed_or_reroute" {
|
||||||
|
t.Fatalf("recommended action = %q, want shed_or_reroute", snapshot.RecommendedAction)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) {
|
func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) {
|
||||||
@@ -1553,6 +1556,9 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test
|
|||||||
!containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") {
|
!containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") {
|
||||||
t.Fatalf("route recovery pressure = %s score=%d reasons=%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureScore, snapshot.FlowScheduler.PressureReasons)
|
t.Fatalf("route recovery pressure = %s score=%d reasons=%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureScore, snapshot.FlowScheduler.PressureReasons)
|
||||||
}
|
}
|
||||||
|
if snapshot.FlowScheduler.RecommendedAction != "rebuild_or_reroute" {
|
||||||
|
t.Fatalf("route recovery action = %q, want rebuild_or_reroute", snapshot.FlowScheduler.RecommendedAction)
|
||||||
|
}
|
||||||
if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 {
|
if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 {
|
||||||
t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB)
|
t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB)
|
||||||
}
|
}
|
||||||
@@ -1923,6 +1929,9 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi
|
|||||||
if snapshot.PressureLevel != "warning" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "bulk_pressure") {
|
if snapshot.PressureLevel != "warning" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "bulk_pressure") {
|
||||||
t.Fatalf("pressure = %s score=%d reasons=%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons)
|
t.Fatalf("pressure = %s score=%d reasons=%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons)
|
||||||
}
|
}
|
||||||
|
if snapshot.RecommendedAction != "throttle_bulk" {
|
||||||
|
t.Fatalf("recommended action = %q, want throttle_bulk", snapshot.RecommendedAction)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) {
|
func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) {
|
||||||
|
|||||||
@@ -475,6 +475,9 @@ recovery timing, reason counts, and recommended per-class windows.
|
|||||||
The `flow_pressure` summary includes a `recommended_action` such as
|
The `flow_pressure` summary includes a `recommended_action` such as
|
||||||
`observe`, `throttle_bulk`, `reduce_parallelism`, `prefer_faster_route`,
|
`observe`, `throttle_bulk`, `reduce_parallelism`, `prefer_faster_route`,
|
||||||
`observe_recovery`, `rebuild_or_reroute`, or `shed_or_reroute`.
|
`observe_recovery`, `rebuild_or_reroute`, or `shed_or_reroute`.
|
||||||
|
`recommended_action` is now part of the shared `FabricFlowSchedulerSnapshot`
|
||||||
|
contract, so heartbeat reports and smoke diagnostics consume the same runtime
|
||||||
|
decision.
|
||||||
`mesh-live-smoke` reports the recommended action for its mixed bulk/interactive
|
`mesh-live-smoke` reports the recommended action for its mixed bulk/interactive
|
||||||
load scenario.
|
load scenario.
|
||||||
Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat
|
Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat
|
||||||
|
|||||||
Reference in New Issue
Block a user