diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index 993fb9c..e1be09b 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -251,6 +251,9 @@ type FabricFlowSchedulerSnapshot struct { AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"` RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"` AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"` + BulkPressureActive bool `json:"bulk_pressure_active,omitempty"` + BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"` + InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` SlowChannelCount int `json:"slow_channel_count"` FailingChannelCount int `json:"failing_channel_count"` QualityWindowSampleCount int `json:"quality_window_sample_count"` @@ -778,6 +781,16 @@ func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { if snapshot.QualityWindowDropCount > 0 { snapshot.BackpressureActive = true } + snapshot.BulkPressureChannelCount = snapshot.TrafficClassCounts[FabricTrafficClassBulk] + snapshot.InteractiveOrControlCount = snapshot.TrafficClassCounts[FabricTrafficClassControl] + snapshot.TrafficClassCounts[FabricTrafficClassInteractive] + bulkPressureThreshold := s.adaptivePolicy.BulkPressureChannelThreshold + if bulkPressureThreshold <= 0 { + bulkPressureThreshold = defaultFabricServiceChannelAdaptivePolicy().BulkPressureChannelThreshold + } + if snapshot.BulkPressureChannelCount >= bulkPressureThreshold && snapshot.InteractiveOrControlCount > 0 { + snapshot.BulkPressureActive = true + snapshot.BackpressureActive = true + } for _, trafficClass := range []string{FabricTrafficClassControl, FabricTrafficClassInteractive, FabricTrafficClassReliable, FabricTrafficClassBulk, FabricTrafficClassDroppable} { snapshot.RecommendedParallelWindows[trafficClass] = s.recommendedParallelSendWindowForTrafficClassLocked(trafficClass, s.adaptivePolicy.MaxParallelWindow) } diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 17635d8..f4fec25 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -1879,6 +1879,9 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts) } + if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive { + t.Fatalf("bulk pressure telemetry = %+v", snapshot) + } } func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index b5e8c81..dbc32c8 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -435,6 +435,9 @@ fresh sample to hide a saturated endpoint. Heartbeat VPN fabric reports now include a bounded `quic_capacity_pressure` summary sorted by busiest cached QUIC connection, making overload diagnosis visible without digging through the full carrier snapshot. +VPN fabric flow-scheduler snapshots now expose bulk pressure activation plus +bulk and interactive/control channel counts, making mixed browser/RDP load +diagnosis explicit when bulk windows are reduced to protect interactive traffic. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.