From 611de5471cc9b73e6f3765d4a6b3b51c4195f279 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:37:06 +0300 Subject: [PATCH] Report VPN fabric batch fanout --- .../vpnruntime/fabric_session_transport.go | 83 +++++++++++++------ .../vpnruntime/fabric_transport_test.go | 10 ++- .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 3 files changed, 71 insertions(+), 25 deletions(-) diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index b1eda41..a862b2c 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -39,17 +39,21 @@ type FabricSessionPacketTransport struct { StreamIDsByTrafficClass map[string][]uint64 StreamIDs []uint64 - sequence uint64 - sequenceMu sync.Mutex - sequenceByStream map[uint64]uint64 - statsMu sync.Mutex - sendFramesByClass map[string]uint64 - sendPacketsByClass map[string]uint64 - sendFramesByStream map[uint64]uint64 - closeStreamFrames uint64 - closeErrors uint64 - closeOnce sync.Once - closeErr error + sequence uint64 + sequenceMu sync.Mutex + sequenceByStream map[uint64]uint64 + statsMu sync.Mutex + sendFramesByClass map[string]uint64 + sendPacketsByClass map[string]uint64 + sendFramesByStream map[uint64]uint64 + sendPacketsByStream map[uint64]uint64 + splitBatchCount uint64 + lastBatchFrameCount uint64 + maxBatchFrameCount uint64 + closeStreamFrames uint64 + closeErrors uint64 + closeOnce sync.Once + closeErr error } func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { @@ -85,6 +89,7 @@ func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Contex } t.recordSend(group.StreamID, group.TrafficClass, len(group.Packets)) } + t.recordBatchFanout(len(groups)) return nil } @@ -362,9 +367,28 @@ func (t *FabricSessionPacketTransport) recordSend(streamID uint64, trafficClass if t.sendFramesByStream == nil { t.sendFramesByStream = map[uint64]uint64{} } + if t.sendPacketsByStream == nil { + t.sendPacketsByStream = map[uint64]uint64{} + } t.sendFramesByClass[trafficClass]++ t.sendPacketsByClass[trafficClass] += uint64(packetCount) t.sendFramesByStream[streamID]++ + t.sendPacketsByStream[streamID] += uint64(packetCount) +} + +func (t *FabricSessionPacketTransport) recordBatchFanout(frameCount int) { + if t == nil || frameCount <= 0 { + return + } + t.statsMu.Lock() + defer t.statsMu.Unlock() + t.lastBatchFrameCount = uint64(frameCount) + if frameCount > 1 { + t.splitBatchCount++ + } + if uint64(frameCount) > t.maxBatchFrameCount { + t.maxBatchFrameCount = uint64(frameCount) + } } func (t *FabricSessionPacketTransport) Snapshot() map[string]any { @@ -374,28 +398,39 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { t.statsMu.Lock() sendFramesByClass := copyStringUint64Map(t.sendFramesByClass) sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass) + lastBatchFrameCount := t.lastBatchFrameCount + maxBatchFrameCount := t.maxBatchFrameCount + splitBatchCount := t.splitBatchCount closeStreamFrames := t.closeStreamFrames closeErrors := t.closeErrors sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream)) for streamID, count := range t.sendFramesByStream { sendFramesByStream[fmt.Sprintf("%d", streamID)] = count } + sendPacketsByStream := make(map[string]uint64, len(t.sendPacketsByStream)) + for streamID, count := range t.sendPacketsByStream { + sendPacketsByStream[fmt.Sprintf("%d", streamID)] = count + } t.statsMu.Unlock() streamIDsByClass := copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass) return map[string]any{ - "schema_version": "rap.vpn_fabric_session_packet_transport.v1", - "stream_id": t.StreamID, - "stream_ids_by_class": streamIDsByClass, - "stream_class_count": len(streamIDsByClass), - "stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs), - "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), - "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), - "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, - "close_stream_frames": closeStreamFrames, - "close_errors": closeErrors, - "send_frames_by_class": sendFramesByClass, - "send_packets_by_class": sendPacketsByClass, - "send_frames_by_stream_id": sendFramesByStream, + "schema_version": "rap.vpn_fabric_session_packet_transport.v1", + "stream_id": t.StreamID, + "stream_ids_by_class": streamIDsByClass, + "stream_class_count": len(streamIDsByClass), + "stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs), + "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), + "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), + "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, + "split_batch_count": splitBatchCount, + "last_batch_frame_count": lastBatchFrameCount, + "max_batch_frame_count": maxBatchFrameCount, + "close_stream_frames": closeStreamFrames, + "close_errors": closeErrors, + "send_frames_by_class": sendFramesByClass, + "send_packets_by_class": sendPacketsByClass, + "send_frames_by_stream_id": sendFramesByStream, + "send_packets_by_stream_id": sendPacketsByStream, } } diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 55bf6e0..2320d55 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -316,9 +316,17 @@ func TestFabricSessionPacketTransportSplitsMixedBatchByStream(t *testing.T) { t.Fatalf("unexpected stream/class split: %+v", sender.frames) } snapshot := transport.Snapshot() - if snapshot["send_stream_count"] != 3 || snapshot["send_class_count"] != 2 { + if snapshot["send_stream_count"] != 3 || + snapshot["send_class_count"] != 2 || + snapshot["split_batch_count"] != uint64(1) || + snapshot["last_batch_frame_count"] != uint64(3) || + snapshot["max_batch_frame_count"] != uint64(3) { t.Fatalf("unexpected mixed-batch shard summary: %+v", snapshot) } + packetsByStream := snapshot["send_packets_by_stream_id"].(map[string]uint64) + if packetsByStream["801"] != 1 || packetsByStream["901"] != 1 || packetsByStream["902"] != 1 { + t.Fatalf("unexpected packets by stream: %+v", packetsByStream) + } } func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 01d90ac..7108101 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -404,6 +404,9 @@ before they are framed, so one gateway batch containing many browser flows does not collapse onto the first packet's logical stream. `mesh-live-smoke` now sends mixed bulk and interactive VPN packets in a single fabric-session batch and requires them to remain sharded. +Fabric-session packet transport snapshots now report packets per stream plus +last/max batch fanout, making real multi-site load distribution measurable from +gateway status. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.