Track VPN pressure history

This commit is contained in:
2026-05-16 13:47:42 +03:00
parent 6a46063565
commit 8e9402580f
5 changed files with 171 additions and 51 deletions
@@ -910,6 +910,7 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot
"pressure_score": snapshot.PressureScore, "pressure_score": snapshot.PressureScore,
"pressure_reasons": append([]string{}, snapshot.PressureReasons...), "pressure_reasons": append([]string{}, snapshot.PressureReasons...),
"recommended_action": snapshot.RecommendedAction, "recommended_action": snapshot.RecommendedAction,
"pressure_history": copyFabricFlowPressureHistory(snapshot.PressureHistory),
"backpressure_active": snapshot.BackpressureActive, "backpressure_active": snapshot.BackpressureActive,
"bulk_pressure_active": snapshot.BulkPressureActive, "bulk_pressure_active": snapshot.BulkPressureActive,
"bulk_pressure_channel_count": snapshot.BulkPressureChannelCount, "bulk_pressure_channel_count": snapshot.BulkPressureChannelCount,
@@ -936,6 +937,18 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot
return report return report
} }
func copyFabricFlowPressureHistory(in []vpnruntime.FabricFlowPressureHistorySample) []vpnruntime.FabricFlowPressureHistorySample {
if len(in) == 0 {
return []vpnruntime.FabricFlowPressureHistorySample{}
}
out := make([]vpnruntime.FabricFlowPressureHistorySample, 0, len(in))
for _, sample := range in {
sample.PressureReasons = append([]string{}, sample.PressureReasons...)
out = append(out, sample)
}
return out
}
func copyStringIntMap(in map[string]int) map[string]int { func copyStringIntMap(in map[string]int) map[string]int {
if len(in) == 0 { if len(in) == 0 {
return map[string]int{} return map[string]int{}
@@ -1260,12 +1260,25 @@ func TestVPNFabricFlowPressureReportIncludesRecommendedAction(t *testing.T) {
RouteRecoveredChannelCount: 0, RouteRecoveredChannelCount: 0,
RouteRecoveryMaxMillis: 0, RouteRecoveryMaxMillis: 0,
RouteRecoveryAvgMillis: 0, RouteRecoveryAvgMillis: 0,
PressureHistory: []vpnruntime.FabricFlowPressureHistorySample{
{
ObservedAt: "2026-05-16T12:00:00Z",
PressureLevel: "warning",
PressureScore: 35,
PressureReasons: []string{"bulk_pressure"},
RecommendedAction: "throttle_bulk",
},
},
}) })
if report["recommended_action"] != "throttle_bulk" || if report["recommended_action"] != "throttle_bulk" ||
report["pressure_score"] != 35 || report["pressure_score"] != 35 ||
report["bulk_pressure_channel_count"] != 16 { report["bulk_pressure_channel_count"] != 16 {
t.Fatalf("unexpected flow pressure report: %+v", report) t.Fatalf("unexpected flow pressure report: %+v", report)
} }
history, ok := report["pressure_history"].([]vpnruntime.FabricFlowPressureHistorySample)
if !ok || len(history) != 1 || history[0].RecommendedAction != "throttle_bulk" {
t.Fatalf("unexpected flow pressure history: %+v", report["pressure_history"])
}
} }
func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) {
@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -26,6 +27,7 @@ const (
defaultFabricFlowFailureThreshold = 2 defaultFabricFlowFailureThreshold = 2
defaultFabricFlowSlowSendThreshold = 2 * time.Second defaultFabricFlowSlowSendThreshold = 2 * time.Second
defaultFabricRouteQualitySwitchThreshold = 30 defaultFabricRouteQualitySwitchThreshold = 30
defaultFabricFlowPressureHistoryCapacity = 8
) )
type FabricPacketTransport struct { type FabricPacketTransport struct {
@@ -130,17 +132,19 @@ type FabricServiceChannelRouteQualityPreference struct {
} }
type FabricFlowScheduler struct { type FabricFlowScheduler struct {
mu sync.Mutex mu sync.Mutex
shardCount int shardCount int
queueCapacity int queueCapacity int
adaptivePolicy FabricServiceChannelAdaptivePolicy adaptivePolicy FabricServiceChannelAdaptivePolicy
queues map[string]*fabricFlowQueue queues map[string]*fabricFlowQueue
enqueued uint64 enqueued uint64
dequeued uint64 dequeued uint64
dropped uint64 dropped uint64
highWatermark int highWatermark int
inFlight int inFlight int
maxInFlight int maxInFlight int
pressureHistory []FabricFlowPressureHistorySample
lastPressureFingerprint string
} }
type FabricServiceChannelAdaptivePolicy struct { type FabricServiceChannelAdaptivePolicy struct {
@@ -239,46 +243,55 @@ type FabricScheduledPacketBatch struct {
} }
type FabricFlowSchedulerSnapshot struct { type FabricFlowSchedulerSnapshot struct {
SchemaVersion string `json:"schema_version"` SchemaVersion string `json:"schema_version"`
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
ServiceNeutral bool `json:"service_neutral"` ServiceNeutral bool `json:"service_neutral"`
Classifier string `json:"classifier"` Classifier string `json:"classifier"`
ServiceMode string `json:"service_mode"` ServiceMode string `json:"service_mode"`
ShardCount int `json:"shard_count"` ShardCount int `json:"shard_count"`
QueueCapacity int `json:"queue_capacity"` QueueCapacity int `json:"queue_capacity"`
ChannelCount int `json:"channel_count"` ChannelCount int `json:"channel_count"`
Enqueued uint64 `json:"enqueued"` Enqueued uint64 `json:"enqueued"`
Dequeued uint64 `json:"dequeued"` Dequeued uint64 `json:"dequeued"`
Dropped uint64 `json:"dropped"` Dropped uint64 `json:"dropped"`
HighWatermark int `json:"high_watermark"` HighWatermark int `json:"high_watermark"`
BackpressureActive bool `json:"backpressure_active"` BackpressureActive bool `json:"backpressure_active"`
PressureLevel string `json:"pressure_level,omitempty"` PressureLevel string `json:"pressure_level,omitempty"`
PressureScore int `json:"pressure_score,omitempty"` PressureScore int `json:"pressure_score,omitempty"`
PressureReasons []string `json:"pressure_reasons,omitempty"` PressureReasons []string `json:"pressure_reasons,omitempty"`
RecommendedAction string `json:"recommended_action,omitempty"` RecommendedAction string `json:"recommended_action,omitempty"`
InFlight int `json:"in_flight"` PressureHistory []FabricFlowPressureHistorySample `json:"pressure_history,omitempty"`
MaxInFlight int `json:"max_in_flight"` InFlight int `json:"in_flight"`
AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` MaxInFlight int `json:"max_in_flight"`
AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"` AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"`
RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"` AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"`
AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"` RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"`
BulkPressureActive bool `json:"bulk_pressure_active,omitempty"` AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"`
BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"` BulkPressureActive bool `json:"bulk_pressure_active,omitempty"`
InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"`
RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"` InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"`
RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"`
RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"` RouteSwitchCount uint64 `json:"route_switch_count,omitempty"`
RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"` RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"`
RouteSwitchReasonCounts map[string]int `json:"route_switch_reason_counts,omitempty"` RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"`
SlowChannelCount int `json:"slow_channel_count"` RouteSwitchReasonCounts map[string]int `json:"route_switch_reason_counts,omitempty"`
FailingChannelCount int `json:"failing_channel_count"` SlowChannelCount int `json:"slow_channel_count"`
QualityWindowSampleCount int `json:"quality_window_sample_count"` FailingChannelCount int `json:"failing_channel_count"`
QualityWindowFailureCount int `json:"quality_window_failure_count"` QualityWindowSampleCount int `json:"quality_window_sample_count"`
QualityWindowSlowCount int `json:"quality_window_slow_count"` QualityWindowFailureCount int `json:"quality_window_failure_count"`
QualityWindowDropCount int `json:"quality_window_drop_count"` QualityWindowSlowCount int `json:"quality_window_slow_count"`
QueueDepths map[string]int `json:"queue_depths"` QualityWindowDropCount int `json:"quality_window_drop_count"`
TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"` QueueDepths map[string]int `json:"queue_depths"`
ChannelStats map[string]FabricFlowStat `json:"channel_stats"` TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"`
ChannelStats map[string]FabricFlowStat `json:"channel_stats"`
}
type FabricFlowPressureHistorySample struct {
ObservedAt string `json:"observed_at"`
PressureLevel string `json:"pressure_level"`
PressureScore int `json:"pressure_score"`
PressureReasons []string `json:"pressure_reasons,omitempty"`
RecommendedAction string `json:"recommended_action"`
} }
type FabricFlowStat struct { type FabricFlowStat struct {
@@ -866,9 +879,53 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot {
} }
snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot)
snapshot.RecommendedAction = fabricFlowSchedulerRecommendedAction(snapshot) snapshot.RecommendedAction = fabricFlowSchedulerRecommendedAction(snapshot)
s.recordPressureHistoryLocked(&snapshot, time.Now())
return snapshot return snapshot
} }
func (s *FabricFlowScheduler) recordPressureHistoryLocked(snapshot *FabricFlowSchedulerSnapshot, observedAt time.Time) {
if s == nil || snapshot == nil {
return
}
fingerprint := fabricFlowPressureFingerprint(*snapshot)
if fingerprint != s.lastPressureFingerprint {
s.pressureHistory = append(s.pressureHistory, FabricFlowPressureHistorySample{
ObservedAt: observedAt.UTC().Format(time.RFC3339Nano),
PressureLevel: snapshot.PressureLevel,
PressureScore: snapshot.PressureScore,
PressureReasons: append([]string{}, snapshot.PressureReasons...),
RecommendedAction: snapshot.RecommendedAction,
})
if len(s.pressureHistory) > defaultFabricFlowPressureHistoryCapacity {
start := len(s.pressureHistory) - defaultFabricFlowPressureHistoryCapacity
s.pressureHistory = append([]FabricFlowPressureHistorySample{}, s.pressureHistory[start:]...)
}
s.lastPressureFingerprint = fingerprint
}
snapshot.PressureHistory = copyFabricFlowPressureHistory(s.pressureHistory)
}
func fabricFlowPressureFingerprint(snapshot FabricFlowSchedulerSnapshot) string {
return strings.Join([]string{
snapshot.PressureLevel,
strconv.Itoa(snapshot.PressureScore),
snapshot.RecommendedAction,
strings.Join(snapshot.PressureReasons, ","),
}, "|")
}
func copyFabricFlowPressureHistory(in []FabricFlowPressureHistorySample) []FabricFlowPressureHistorySample {
if len(in) == 0 {
return nil
}
out := make([]FabricFlowPressureHistorySample, 0, len(in))
for _, sample := range in {
sample.PressureReasons = append([]string{}, sample.PressureReasons...)
out = append(out, sample)
}
return out
}
func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, int, []string) { func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, int, []string) {
level := "nominal" level := "nominal"
score := 0 score := 0
@@ -805,6 +805,39 @@ func TestFabricFlowSchedulerSnapshotReportsNominalAction(t *testing.T) {
snapshot.RecommendedAction != "observe" { snapshot.RecommendedAction != "observe" {
t.Fatalf("nominal pressure snapshot = %+v", snapshot) t.Fatalf("nominal pressure snapshot = %+v", snapshot)
} }
if len(snapshot.PressureHistory) != 1 ||
snapshot.PressureHistory[0].PressureLevel != "nominal" ||
snapshot.PressureHistory[0].RecommendedAction != "observe" {
t.Fatalf("nominal pressure history = %+v", snapshot.PressureHistory)
}
}
func TestFabricFlowSchedulerRecordsPressureHistoryTransitions(t *testing.T) {
scheduler := NewFabricFlowScheduler(1, 1)
nominal := scheduler.Snapshot()
if len(nominal.PressureHistory) != 1 || nominal.PressureHistory[0].RecommendedAction != "observe" {
t.Fatalf("nominal pressure history = %+v", nominal.PressureHistory)
}
packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
packetB := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
scheduler.scheduleClientPackets("", "", [][]byte{packetA, packetB})
pressure := scheduler.Snapshot()
if len(pressure.PressureHistory) != 2 {
t.Fatalf("pressure history = %+v, want nominal plus critical transition", pressure.PressureHistory)
}
last := pressure.PressureHistory[len(pressure.PressureHistory)-1]
if last.PressureLevel != "critical" ||
last.RecommendedAction != "shed_or_reroute" ||
!containsString(last.PressureReasons, "drops") {
t.Fatalf("last pressure history sample = %+v", last)
}
unchanged := scheduler.Snapshot()
if len(unchanged.PressureHistory) != 2 {
t.Fatalf("unchanged pressure history duplicated: %+v", unchanged.PressureHistory)
}
} }
func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) {
@@ -479,6 +479,10 @@ The `flow_pressure` summary includes a `recommended_action` such as
contract, so heartbeat reports and smoke diagnostics consume the same runtime contract, so heartbeat reports and smoke diagnostics consume the same runtime
decision. decision.
The scheduler's nominal snapshot explicitly reports the `observe` action. The scheduler's nominal snapshot explicitly reports the `observe` action.
Flow-scheduler snapshots keep a bounded pressure transition history with the
observed level, score, reasons, and recommended action. Repeated snapshots do
not duplicate unchanged pressure states, so controllers can distinguish current
state from recent worsening or recovery without unbounded heartbeat growth.
`mesh-live-smoke` reports the recommended action for its mixed bulk/interactive `mesh-live-smoke` reports the recommended action for its mixed bulk/interactive
load scenario. load scenario.
Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat