From 8e9402580f750beb5b56a885d26aaa8f1f74b493 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:47:42 +0300 Subject: [PATCH] Track VPN pressure history --- .../rap-node-agent/cmd/rap-node-agent/main.go | 13 ++ .../cmd/rap-node-agent/main_test.go | 13 ++ .../internal/vpnruntime/fabric_transport.go | 159 ++++++++++++------ .../vpnruntime/fabric_transport_test.go | 33 ++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 4 + 5 files changed, 171 insertions(+), 51 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 4548f1c..9d07232 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -910,6 +910,7 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot "pressure_score": snapshot.PressureScore, "pressure_reasons": append([]string{}, snapshot.PressureReasons...), "recommended_action": snapshot.RecommendedAction, + "pressure_history": copyFabricFlowPressureHistory(snapshot.PressureHistory), "backpressure_active": snapshot.BackpressureActive, "bulk_pressure_active": snapshot.BulkPressureActive, "bulk_pressure_channel_count": snapshot.BulkPressureChannelCount, @@ -936,6 +937,18 @@ func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot return report } +func copyFabricFlowPressureHistory(in []vpnruntime.FabricFlowPressureHistorySample) []vpnruntime.FabricFlowPressureHistorySample { + if len(in) == 0 { + return []vpnruntime.FabricFlowPressureHistorySample{} + } + out := make([]vpnruntime.FabricFlowPressureHistorySample, 0, len(in)) + for _, sample := range in { + sample.PressureReasons = append([]string{}, sample.PressureReasons...) + out = append(out, sample) + } + return out +} + func copyStringIntMap(in map[string]int) map[string]int { if len(in) == 0 { return map[string]int{} diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index d5180dc..65144d3 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -1260,12 +1260,25 @@ func TestVPNFabricFlowPressureReportIncludesRecommendedAction(t *testing.T) { RouteRecoveredChannelCount: 0, RouteRecoveryMaxMillis: 0, RouteRecoveryAvgMillis: 0, + PressureHistory: []vpnruntime.FabricFlowPressureHistorySample{ + { + ObservedAt: "2026-05-16T12:00:00Z", + PressureLevel: "warning", + PressureScore: 35, + PressureReasons: []string{"bulk_pressure"}, + RecommendedAction: "throttle_bulk", + }, + }, }) if report["recommended_action"] != "throttle_bulk" || report["pressure_score"] != 35 || report["bulk_pressure_channel_count"] != 16 { t.Fatalf("unexpected flow pressure report: %+v", report) } + history, ok := report["pressure_history"].([]vpnruntime.FabricFlowPressureHistorySample) + if !ok || len(history) != 1 || history[0].RecommendedAction != "throttle_bulk" { + t.Fatalf("unexpected flow pressure history: %+v", report["pressure_history"]) + } } func TestMergedEndpointCandidateObservationsKeepsNewest(t *testing.T) { diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index 48d8457..95f7ed2 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -7,6 +7,7 @@ import ( "fmt" "hash/fnv" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -26,6 +27,7 @@ const ( defaultFabricFlowFailureThreshold = 2 defaultFabricFlowSlowSendThreshold = 2 * time.Second defaultFabricRouteQualitySwitchThreshold = 30 + defaultFabricFlowPressureHistoryCapacity = 8 ) type FabricPacketTransport struct { @@ -130,17 +132,19 @@ type FabricServiceChannelRouteQualityPreference struct { } type FabricFlowScheduler struct { - mu sync.Mutex - shardCount int - queueCapacity int - adaptivePolicy FabricServiceChannelAdaptivePolicy - queues map[string]*fabricFlowQueue - enqueued uint64 - dequeued uint64 - dropped uint64 - highWatermark int - inFlight int - maxInFlight int + mu sync.Mutex + shardCount int + queueCapacity int + adaptivePolicy FabricServiceChannelAdaptivePolicy + queues map[string]*fabricFlowQueue + enqueued uint64 + dequeued uint64 + dropped uint64 + highWatermark int + inFlight int + maxInFlight int + pressureHistory []FabricFlowPressureHistorySample + lastPressureFingerprint string } type FabricServiceChannelAdaptivePolicy struct { @@ -239,46 +243,55 @@ type FabricScheduledPacketBatch struct { } type FabricFlowSchedulerSnapshot struct { - SchemaVersion string `json:"schema_version"` - Enabled bool `json:"enabled"` - ServiceNeutral bool `json:"service_neutral"` - Classifier string `json:"classifier"` - ServiceMode string `json:"service_mode"` - ShardCount int `json:"shard_count"` - QueueCapacity int `json:"queue_capacity"` - ChannelCount int `json:"channel_count"` - Enqueued uint64 `json:"enqueued"` - Dequeued uint64 `json:"dequeued"` - Dropped uint64 `json:"dropped"` - HighWatermark int `json:"high_watermark"` - BackpressureActive bool `json:"backpressure_active"` - PressureLevel string `json:"pressure_level,omitempty"` - PressureScore int `json:"pressure_score,omitempty"` - PressureReasons []string `json:"pressure_reasons,omitempty"` - RecommendedAction string `json:"recommended_action,omitempty"` - InFlight int `json:"in_flight"` - MaxInFlight int `json:"max_in_flight"` - AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` - AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"` - RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"` - AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"` - BulkPressureActive bool `json:"bulk_pressure_active,omitempty"` - BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"` - InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` - RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"` - RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` - RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"` - RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"` - RouteSwitchReasonCounts map[string]int `json:"route_switch_reason_counts,omitempty"` - SlowChannelCount int `json:"slow_channel_count"` - FailingChannelCount int `json:"failing_channel_count"` - QualityWindowSampleCount int `json:"quality_window_sample_count"` - QualityWindowFailureCount int `json:"quality_window_failure_count"` - QualityWindowSlowCount int `json:"quality_window_slow_count"` - QualityWindowDropCount int `json:"quality_window_drop_count"` - QueueDepths map[string]int `json:"queue_depths"` - TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"` - ChannelStats map[string]FabricFlowStat `json:"channel_stats"` + SchemaVersion string `json:"schema_version"` + Enabled bool `json:"enabled"` + ServiceNeutral bool `json:"service_neutral"` + Classifier string `json:"classifier"` + ServiceMode string `json:"service_mode"` + ShardCount int `json:"shard_count"` + QueueCapacity int `json:"queue_capacity"` + ChannelCount int `json:"channel_count"` + Enqueued uint64 `json:"enqueued"` + Dequeued uint64 `json:"dequeued"` + Dropped uint64 `json:"dropped"` + HighWatermark int `json:"high_watermark"` + BackpressureActive bool `json:"backpressure_active"` + PressureLevel string `json:"pressure_level,omitempty"` + PressureScore int `json:"pressure_score,omitempty"` + PressureReasons []string `json:"pressure_reasons,omitempty"` + RecommendedAction string `json:"recommended_action,omitempty"` + PressureHistory []FabricFlowPressureHistorySample `json:"pressure_history,omitempty"` + InFlight int `json:"in_flight"` + MaxInFlight int `json:"max_in_flight"` + AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` + AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"` + RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"` + AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"` + BulkPressureActive bool `json:"bulk_pressure_active,omitempty"` + BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"` + InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` + RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"` + RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` + RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"` + RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"` + RouteSwitchReasonCounts map[string]int `json:"route_switch_reason_counts,omitempty"` + SlowChannelCount int `json:"slow_channel_count"` + FailingChannelCount int `json:"failing_channel_count"` + QualityWindowSampleCount int `json:"quality_window_sample_count"` + QualityWindowFailureCount int `json:"quality_window_failure_count"` + QualityWindowSlowCount int `json:"quality_window_slow_count"` + QualityWindowDropCount int `json:"quality_window_drop_count"` + QueueDepths map[string]int `json:"queue_depths"` + TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"` + ChannelStats map[string]FabricFlowStat `json:"channel_stats"` +} + +type FabricFlowPressureHistorySample struct { + ObservedAt string `json:"observed_at"` + PressureLevel string `json:"pressure_level"` + PressureScore int `json:"pressure_score"` + PressureReasons []string `json:"pressure_reasons,omitempty"` + RecommendedAction string `json:"recommended_action"` } type FabricFlowStat struct { @@ -866,9 +879,53 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { } snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons = fabricFlowSchedulerPressure(snapshot) snapshot.RecommendedAction = fabricFlowSchedulerRecommendedAction(snapshot) + s.recordPressureHistoryLocked(&snapshot, time.Now()) return snapshot } +func (s *FabricFlowScheduler) recordPressureHistoryLocked(snapshot *FabricFlowSchedulerSnapshot, observedAt time.Time) { + if s == nil || snapshot == nil { + return + } + fingerprint := fabricFlowPressureFingerprint(*snapshot) + if fingerprint != s.lastPressureFingerprint { + s.pressureHistory = append(s.pressureHistory, FabricFlowPressureHistorySample{ + ObservedAt: observedAt.UTC().Format(time.RFC3339Nano), + PressureLevel: snapshot.PressureLevel, + PressureScore: snapshot.PressureScore, + PressureReasons: append([]string{}, snapshot.PressureReasons...), + RecommendedAction: snapshot.RecommendedAction, + }) + if len(s.pressureHistory) > defaultFabricFlowPressureHistoryCapacity { + start := len(s.pressureHistory) - defaultFabricFlowPressureHistoryCapacity + s.pressureHistory = append([]FabricFlowPressureHistorySample{}, s.pressureHistory[start:]...) + } + s.lastPressureFingerprint = fingerprint + } + snapshot.PressureHistory = copyFabricFlowPressureHistory(s.pressureHistory) +} + +func fabricFlowPressureFingerprint(snapshot FabricFlowSchedulerSnapshot) string { + return strings.Join([]string{ + snapshot.PressureLevel, + strconv.Itoa(snapshot.PressureScore), + snapshot.RecommendedAction, + strings.Join(snapshot.PressureReasons, ","), + }, "|") +} + +func copyFabricFlowPressureHistory(in []FabricFlowPressureHistorySample) []FabricFlowPressureHistorySample { + if len(in) == 0 { + return nil + } + out := make([]FabricFlowPressureHistorySample, 0, len(in)) + for _, sample := range in { + sample.PressureReasons = append([]string{}, sample.PressureReasons...) + out = append(out, sample) + } + return out +} + func fabricFlowSchedulerPressure(snapshot FabricFlowSchedulerSnapshot) (string, int, []string) { level := "nominal" score := 0 diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 89a661f..cb2d4aa 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -805,6 +805,39 @@ func TestFabricFlowSchedulerSnapshotReportsNominalAction(t *testing.T) { snapshot.RecommendedAction != "observe" { t.Fatalf("nominal pressure snapshot = %+v", snapshot) } + if len(snapshot.PressureHistory) != 1 || + snapshot.PressureHistory[0].PressureLevel != "nominal" || + snapshot.PressureHistory[0].RecommendedAction != "observe" { + t.Fatalf("nominal pressure history = %+v", snapshot.PressureHistory) + } +} + +func TestFabricFlowSchedulerRecordsPressureHistoryTransitions(t *testing.T) { + scheduler := NewFabricFlowScheduler(1, 1) + nominal := scheduler.Snapshot() + if len(nominal.PressureHistory) != 1 || nominal.PressureHistory[0].RecommendedAction != "observe" { + t.Fatalf("nominal pressure history = %+v", nominal.PressureHistory) + } + + packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + packetB := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + scheduler.scheduleClientPackets("", "", [][]byte{packetA, packetB}) + + pressure := scheduler.Snapshot() + if len(pressure.PressureHistory) != 2 { + t.Fatalf("pressure history = %+v, want nominal plus critical transition", pressure.PressureHistory) + } + last := pressure.PressureHistory[len(pressure.PressureHistory)-1] + if last.PressureLevel != "critical" || + last.RecommendedAction != "shed_or_reroute" || + !containsString(last.PressureReasons, "drops") { + t.Fatalf("last pressure history sample = %+v", last) + } + + unchanged := scheduler.Snapshot() + if len(unchanged.PressureHistory) != 2 { + t.Fatalf("unchanged pressure history duplicated: %+v", unchanged.PressureHistory) + } } func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index a21cbba..dbd49f7 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -479,6 +479,10 @@ The `flow_pressure` summary includes a `recommended_action` such as contract, so heartbeat reports and smoke diagnostics consume the same runtime decision. The scheduler's nominal snapshot explicitly reports the `observe` action. +Flow-scheduler snapshots keep a bounded pressure transition history with the +observed level, score, reasons, and recommended action. Repeated snapshots do +not duplicate unchanged pressure states, so controllers can distinguish current +state from recent worsening or recovery without unbounded heartbeat growth. `mesh-live-smoke` reports the recommended action for its mixed bulk/interactive load scenario. Nodes advertise the `vpn_fabric_flow_pressure` capability when that heartbeat