diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index fe4bc6e..bbeb65c 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -262,6 +262,8 @@ type FabricFlowSchedulerSnapshot struct { InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"` RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` + RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"` + RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"` SlowChannelCount int `json:"slow_channel_count"` FailingChannelCount int `json:"failing_channel_count"` QualityWindowSampleCount int `json:"quality_window_sample_count"` @@ -678,6 +680,8 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot.HighWatermark = s.highWatermark snapshot.InFlight = s.inFlight snapshot.MaxInFlight = s.maxInFlight + var routeRecoveryTotalMillis int64 + var routeRecoverySamples int64 for channelID, queue := range s.queues { qualityStats := queue.qualityWindowStats() snapshot.QueueDepths[channelID] = queue.Depth @@ -798,6 +802,11 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot.RouteSwitchCount += queue.RouteSwitchCount if queue.LastRecoveredFromRouteID != "" { snapshot.RouteRecoveredChannelCount++ + if queue.LastRouteRecoveryMillis > snapshot.RouteRecoveryMaxMillis { + snapshot.RouteRecoveryMaxMillis = queue.LastRouteRecoveryMillis + } + routeRecoveryTotalMillis += queue.LastRouteRecoveryMillis + routeRecoverySamples++ } if queue.Depth >= s.queueCapacity || qualityStats.DropCount > 0 { snapshot.BackpressureActive = true @@ -815,6 +824,9 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { if snapshot.QualityWindowDropCount > 0 { snapshot.BackpressureActive = true } + if routeRecoverySamples > 0 { + snapshot.RouteRecoveryAvgMillis = routeRecoveryTotalMillis / routeRecoverySamples + } snapshot.BulkPressureChannelCount = snapshot.TrafficClassCounts[FabricTrafficClassBulk] snapshot.InteractiveOrControlCount = snapshot.TrafficClassCounts[FabricTrafficClassControl] + snapshot.TrafficClassCounts[FabricTrafficClassInteractive] bulkPressureThreshold := s.adaptivePolicy.BulkPressureChannelThreshold diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 8a86dcc..7a7d687 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -1537,7 +1537,9 @@ func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *test statA.LastRouteSwitchAt == "" || statA.LastRouteRecoveryMillis < 0 || snapshot.FlowScheduler.RouteRecoveredChannelCount != 1 || - snapshot.FlowScheduler.RouteSwitchCount != 1 { + snapshot.FlowScheduler.RouteSwitchCount != 1 || + snapshot.FlowScheduler.RouteRecoveryMaxMillis != statA.LastRouteRecoveryMillis || + snapshot.FlowScheduler.RouteRecoveryAvgMillis != statA.LastRouteRecoveryMillis { t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler) } if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index f1956a5..54abc80 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -449,6 +449,8 @@ The same smoke output reports measured route recovery milliseconds for the synthetic failover path. Route recovery telemetry includes failure/switch timestamps and recovery duration in milliseconds for each recovered flow channel. +Scheduler snapshots also aggregate route recovery max/average milliseconds +across recovered channels for quick load-test health checks. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.