diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 6ed3319..411b400 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -43,6 +43,11 @@ type smokeReport struct { FabricVPNPacketAccepted bool `json:"fabric_vpn_packet_accepted"` FabricVPNPacketSharded bool `json:"fabric_vpn_packet_sharded"` FabricVPNPacketFanout int `json:"fabric_vpn_packet_fanout"` + FabricVPNBulkPressure bool `json:"fabric_vpn_bulk_pressure_active"` + FabricVPNBulkChannels int `json:"fabric_vpn_bulk_pressure_channels"` + FabricVPNInteractive int `json:"fabric_vpn_interactive_or_control_channels"` + FabricVPNBulkWindow int `json:"fabric_vpn_bulk_parallel_window"` + FabricVPNInteractiveWin int `json:"fabric_vpn_interactive_parallel_window"` FabricQUICAccepted bool `json:"fabric_quic_accepted"` FabricQUICEndpoint string `json:"fabric_quic_endpoint"` FabricQUICPressure int `json:"fabric_quic_capacity_pressure_percent"` @@ -146,6 +151,7 @@ func run(ctx context.Context) (smokeReport, error) { if err != nil { return smokeReport{}, fmt.Errorf("fabric vpn packet session smoke: %w", err) } + fabricVPNBulkPressure, fabricVPNBulkChannels, fabricVPNInteractiveChannels, fabricVPNBulkWindow, fabricVPNInteractiveWindow := smokeVPNFlowSchedulerBulkPressure() fabricQUICAccepted, fabricQUICEndpoint, fabricQUICPressure, err := smokeQUICFabricSession(ctx) if err != nil { return smokeReport{}, fmt.Errorf("fabric quic smoke: %w", err) @@ -166,6 +172,11 @@ func run(ctx context.Context) (smokeReport, error) { FabricVPNPacketAccepted: fabricVPNPacketAccepted, FabricVPNPacketSharded: fabricVPNPacketSharded, FabricVPNPacketFanout: fabricVPNPacketFanout, + FabricVPNBulkPressure: fabricVPNBulkPressure, + FabricVPNBulkChannels: fabricVPNBulkChannels, + FabricVPNInteractive: fabricVPNInteractiveChannels, + FabricVPNBulkWindow: fabricVPNBulkWindow, + FabricVPNInteractiveWin: fabricVPNInteractiveWindow, FabricQUICAccepted: fabricQUICAccepted, FabricQUICEndpoint: fabricQUICEndpoint, FabricQUICPressure: fabricQUICPressure, @@ -179,6 +190,30 @@ func run(ctx context.Context) (smokeReport, error) { }, nil } +func smokeVPNFlowSchedulerBulkPressure() (bool, int, int, int, int) { + scheduler := vpnruntime.NewFabricFlowScheduler(32, 16) + bulkPacket := []byte("bulk") + interactivePacket := []byte("interactive-rdp-like") + for i := 0; i < 16; i++ { + scheduler.ScheduleClientPacketsForConnectionClass( + fmt.Sprintf("vpn-bulk-%02d", i), + vpnruntime.FabricTrafficClassBulk, + [][]byte{bulkPacket}, + ) + } + scheduler.ScheduleClientPacketsForConnectionClass( + "vpn-interactive", + vpnruntime.FabricTrafficClassInteractive, + [][]byte{interactivePacket}, + ) + snapshot := scheduler.Snapshot() + return snapshot.BulkPressureActive, + snapshot.BulkPressureChannelCount, + snapshot.InteractiveOrControlCount, + snapshot.RecommendedParallelWindows[vpnruntime.FabricTrafficClassBulk], + snapshot.RecommendedParallelWindows[vpnruntime.FabricTrafficClassInteractive] +} + func smokeQUICFabricSession(ctx context.Context) (bool, string, int, error) { server, err := mesh.StartQUICFabricServer(ctx, mesh.QUICFabricServerConfig{ ListenAddr: "127.0.0.1:0", diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index dbc32c8..7c7ae4a 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -438,6 +438,8 @@ visible without digging through the full carrier snapshot. VPN fabric flow-scheduler snapshots now expose bulk pressure activation plus bulk and interactive/control channel counts, making mixed browser/RDP load diagnosis explicit when bulk windows are reduced to protect interactive traffic. +`mesh-live-smoke` now exercises that mixed-load scheduler path and reports bulk +pressure activation plus bulk/interactive window recommendations. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.