From 9ea49c83382cb4ae58d9472cca08aa3bfd9fa9d2 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 13:17:35 +0300 Subject: [PATCH] Track VPN route switch reasons --- .../internal/vpnruntime/fabric_transport.go | 35 +++++++++++++++++++ .../vpnruntime/fabric_transport_test.go | 4 ++- .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 ++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index bbeb65c..1a8b5eb 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -187,6 +187,7 @@ type fabricFlowQueue struct { LastRecoveredFromRouteID string LastRecoveredNextHop string LastRouteSwitchAt time.Time + LastRouteSwitchReason string LastRouteRecoveryMillis int64 RouteSwitchCount uint64 LastError string @@ -264,6 +265,7 @@ type FabricFlowSchedulerSnapshot struct { RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"` RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"` + RouteSwitchReasonCounts map[string]int `json:"route_switch_reason_counts,omitempty"` SlowChannelCount int `json:"slow_channel_count"` FailingChannelCount int `json:"failing_channel_count"` QualityWindowSampleCount int `json:"quality_window_sample_count"` @@ -301,6 +303,7 @@ type FabricFlowStat struct { LastRecoveredFromRouteID string `json:"last_recovered_from_route_id,omitempty"` LastRecoveredNextHop string `json:"last_recovered_next_hop,omitempty"` LastRouteSwitchAt string `json:"last_route_switch_at,omitempty"` + LastRouteSwitchReason string `json:"last_route_switch_reason,omitempty"` LastRouteRecoveryMillis int64 `json:"last_route_recovery_ms,omitempty"` RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` LastError string `json:"last_error,omitempty"` @@ -661,6 +664,7 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { QueueDepths: map[string]int{}, TrafficClassCounts: map[string]int{}, RecommendedParallelWindows: map[string]int{}, + RouteSwitchReasonCounts: map[string]int{}, ChannelStats: map[string]FabricFlowStat{}, } if s == nil { @@ -710,6 +714,7 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { LastFailedRouteGeneration: queue.LastFailedRouteGeneration, LastRecoveredFromRouteID: queue.LastRecoveredFromRouteID, LastRecoveredNextHop: queue.LastRecoveredNextHop, + LastRouteSwitchReason: queue.LastRouteSwitchReason, LastRouteRecoveryMillis: queue.LastRouteRecoveryMillis, RouteSwitchCount: queue.RouteSwitchCount, LastError: queue.LastError, @@ -771,6 +776,7 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { LastRecoveredFromRouteID: stat.LastRecoveredFromRouteID, LastRecoveredNextHop: stat.LastRecoveredNextHop, LastRouteSwitchAt: stat.LastRouteSwitchAt, + LastRouteSwitchReason: stat.LastRouteSwitchReason, LastRouteRecoveryMillis: stat.LastRouteRecoveryMillis, RouteSwitchCount: stat.RouteSwitchCount, LastError: stat.LastError, @@ -802,6 +808,9 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot.RouteSwitchCount += queue.RouteSwitchCount if queue.LastRecoveredFromRouteID != "" { snapshot.RouteRecoveredChannelCount++ + if reason := strings.TrimSpace(queue.LastRouteSwitchReason); reason != "" { + snapshot.RouteSwitchReasonCounts[reason]++ + } if queue.LastRouteRecoveryMillis > snapshot.RouteRecoveryMaxMillis { snapshot.RouteRecoveryMaxMillis = queue.LastRouteRecoveryMillis } @@ -827,6 +836,9 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { if routeRecoverySamples > 0 { snapshot.RouteRecoveryAvgMillis = routeRecoveryTotalMillis / routeRecoverySamples } + if len(snapshot.RouteSwitchReasonCounts) == 0 { + snapshot.RouteSwitchReasonCounts = nil + } snapshot.BulkPressureChannelCount = snapshot.TrafficClassCounts[FabricTrafficClassBulk] snapshot.InteractiveOrControlCount = snapshot.TrafficClassCounts[FabricTrafficClassControl] + snapshot.TrafficClassCounts[FabricTrafficClassInteractive] bulkPressureThreshold := s.adaptivePolicy.BulkPressureChannelThreshold @@ -912,6 +924,27 @@ func classWindowLimit(policy FabricServiceChannelAdaptivePolicy, trafficClass st return maxWindow } +func normalizeFabricRouteSwitchReason(reason string) string { + reason = strings.ToLower(strings.TrimSpace(reason)) + if reason == "" { + return "route_failure" + } + replacer := strings.NewReplacer(" ", "_", "\t", "_", "\n", "_", "\r", "_", ":", "_", ";", "_", ",", "_") + reason = replacer.Replace(reason) + for strings.Contains(reason, "__") { + reason = strings.ReplaceAll(reason, "__", "_") + } + reason = strings.Trim(reason, "_") + if reason == "" { + return "route_failure" + } + if len(reason) > 80 { + reason = reason[:80] + reason = strings.TrimRight(reason, "_") + } + return reason +} + func clampFabricWindow(value, minValue, maxValue int) int { if value < minValue { return minValue @@ -1113,11 +1146,13 @@ func (s *FabricFlowScheduler) RecordRouteSuccessWithProvenance(channelID string, queue := s.ensureQueueLocked(channelID) failedRouteID := strings.TrimSpace(queue.LastFailedRouteID) failedNextHop := strings.TrimSpace(queue.LastNextHop) + failedReason := normalizeFabricRouteSwitchReason(queue.LastError) if failedRouteID != "" && strings.TrimSpace(routeID) != "" && failedRouteID != strings.TrimSpace(routeID) { switchedAt := time.Now().UTC() queue.LastRecoveredFromRouteID = failedRouteID queue.LastRecoveredNextHop = failedNextHop queue.LastRouteSwitchAt = switchedAt + queue.LastRouteSwitchReason = failedReason queue.LastRouteRecoveryMillis = 0 if !queue.LastRouteFailureAt.IsZero() { queue.LastRouteRecoveryMillis = switchedAt.Sub(queue.LastRouteFailureAt).Milliseconds() diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 7a7d687..c69dd6c 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -1532,6 +1532,7 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test } if statA.LastRecoveredFromRouteID != "route-primary" || statA.LastRecoveredNextHop != "relay-primary" || + statA.LastRouteSwitchReason != "production_mesh_next_peer_is_unavailable" || statA.RouteSwitchCount != 1 || statA.LastRouteFailureAt == "" || statA.LastRouteSwitchAt == "" || @@ -1539,7 +1540,8 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test snapshot.FlowScheduler.RouteRecoveredChannelCount != 1 || snapshot.FlowScheduler.RouteSwitchCount != 1 || snapshot.FlowScheduler.RouteRecoveryMaxMillis != statA.LastRouteRecoveryMillis || - snapshot.FlowScheduler.RouteRecoveryAvgMillis != statA.LastRouteRecoveryMillis { + snapshot.FlowScheduler.RouteRecoveryAvgMillis != statA.LastRouteRecoveryMillis || + snapshot.FlowScheduler.RouteSwitchReasonCounts["production_mesh_next_peer_is_unavailable"] != 1 { t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler) } if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 0b1b1e1..f26932d 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -453,6 +453,9 @@ Route recovery telemetry includes failure/switch timestamps and recovery duration in milliseconds for each recovered flow channel. Scheduler snapshots also aggregate route recovery max/average milliseconds across recovered channels for quick load-test health checks. +Route recovery telemetry now includes normalized switch reasons and aggregate +reason counts, so load tests can distinguish peer failures, timeouts, and other +route-break causes. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.