diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index dc78edf..8f7d0c7 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -903,6 +903,46 @@ func vpnFabricQUICPressureReport(snapshot mesh.QUICFabricTransportSnapshot, maxE return entries[:maxEntries] } +func vpnFabricFlowPressureReport(snapshot vpnruntime.FabricFlowSchedulerSnapshot) map[string]any { + report := map[string]any{ + "schema_version": "rap.vpn_fabric_flow_pressure.v1", + "pressure_level": snapshot.PressureLevel, + "pressure_score": snapshot.PressureScore, + "pressure_reasons": append([]string{}, snapshot.PressureReasons...), + "backpressure_active": snapshot.BackpressureActive, + "bulk_pressure_active": snapshot.BulkPressureActive, + "bulk_pressure_channel_count": snapshot.BulkPressureChannelCount, + "interactive_or_control_channels": snapshot.InteractiveOrControlCount, + "route_recovered_channel_count": snapshot.RouteRecoveredChannelCount, + "route_switch_count": snapshot.RouteSwitchCount, + "route_recovery_max_ms": snapshot.RouteRecoveryMaxMillis, + "route_recovery_avg_ms": snapshot.RouteRecoveryAvgMillis, + "route_switch_reason_counts": copyStringIntMap(snapshot.RouteSwitchReasonCounts), + "adaptive_backpressure_active": snapshot.AdaptiveBackpressureActive, + "adaptive_backpressure_reason": snapshot.AdaptiveBackpressureReason, + "quality_window_failure_count": snapshot.QualityWindowFailureCount, + "quality_window_slow_count": snapshot.QualityWindowSlowCount, + "quality_window_drop_count": snapshot.QualityWindowDropCount, + "recommended_parallel_windows": copyStringIntMap(snapshot.RecommendedParallelWindows), + "adaptive_policy_fingerprint": snapshot.AdaptivePolicyFingerprint, + } + if report["pressure_level"] == "" { + report["pressure_level"] = "nominal" + } + return report +} + +func copyStringIntMap(in map[string]int) map[string]int { + if len(in) == 0 { + return map[string]int{} + } + out := make(map[string]int, len(in)) + for key, value := range in { + out[key] = value + } + return out +} + func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats { return &fabricServiceChannelAccessStats{} } @@ -3225,6 +3265,10 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn if meshState != nil && meshState.VPNFabricSessionDialStats != nil { report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) } + if meshState != nil && meshState.VPNFabricIngress != nil { + ingressSnapshot := meshState.VPNFabricIngress.Snapshot(identity.ClusterID) + report["flow_pressure"] = vpnFabricFlowPressureReport(ingressSnapshot.FlowScheduler) + } payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 6cdd6b6..6845d21 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -722,6 +722,11 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { transport.MaxStreamsPerConn = 24 return transport }(), + VPNFabricIngress: &vpnruntime.FabricClientPacketIngress{ + FlowScheduler: vpnruntime.NewFabricFlowScheduler(0, 0), + ClusterID: "cluster-1", + LocalNodeID: "node-a", + }, QUICFabricListenAddr: "127.0.0.1:19443", }, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)) @@ -767,6 +772,11 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { t.Fatalf("vpn fabric quic session report missing: %+v", report) } else if report["quic_capacity_pressure"] == nil { t.Fatalf("vpn fabric quic pressure report missing: %+v", report) + } else if pressure, ok := report["flow_pressure"].(map[string]any); !ok || + pressure["schema_version"] != "rap.vpn_fabric_flow_pressure.v1" || + pressure["pressure_level"] != "nominal" || + pressure["pressure_score"] != 0 { + t.Fatalf("vpn fabric flow pressure report missing: %+v", report) } if payload.Capabilities["vpn_fabric_session_stream_shards"] != true { t.Fatalf("vpn fabric stream shard capability missing: %+v", payload.Capabilities) diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 1c51a6a..02f0358 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -469,6 +469,9 @@ The same pressure classification includes a bounded 0-100 score for automated route, endpoint, and node comparisons. `mesh-live-smoke` reports the mixed-load scheduler pressure level, score, and reasons. +Heartbeat VPN fabric transport reports now include a compact +`flow_pressure` summary with level, score, reasons, bulk pressure, route +recovery timing, reason counts, and recommended per-class windows. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.