Report VPN fabric batch fanout

This commit is contained in:
2026-05-16 12:37:06 +03:00
parent 09fc6ac659
commit 611de5471c
3 changed files with 71 additions and 25 deletions
@@ -39,17 +39,21 @@ type FabricSessionPacketTransport struct {
StreamIDsByTrafficClass map[string][]uint64
StreamIDs []uint64
sequence uint64
sequenceMu sync.Mutex
sequenceByStream map[uint64]uint64
statsMu sync.Mutex
sendFramesByClass map[string]uint64
sendPacketsByClass map[string]uint64
sendFramesByStream map[uint64]uint64
closeStreamFrames uint64
closeErrors uint64
closeOnce sync.Once
closeErr error
sequence uint64
sequenceMu sync.Mutex
sequenceByStream map[uint64]uint64
statsMu sync.Mutex
sendFramesByClass map[string]uint64
sendPacketsByClass map[string]uint64
sendFramesByStream map[uint64]uint64
sendPacketsByStream map[uint64]uint64
splitBatchCount uint64
lastBatchFrameCount uint64
maxBatchFrameCount uint64
closeStreamFrames uint64
closeErrors uint64
closeOnce sync.Once
closeErr error
}
func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
@@ -85,6 +89,7 @@ func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Contex
}
t.recordSend(group.StreamID, group.TrafficClass, len(group.Packets))
}
t.recordBatchFanout(len(groups))
return nil
}
@@ -362,9 +367,28 @@ func (t *FabricSessionPacketTransport) recordSend(streamID uint64, trafficClass
if t.sendFramesByStream == nil {
t.sendFramesByStream = map[uint64]uint64{}
}
if t.sendPacketsByStream == nil {
t.sendPacketsByStream = map[uint64]uint64{}
}
t.sendFramesByClass[trafficClass]++
t.sendPacketsByClass[trafficClass] += uint64(packetCount)
t.sendFramesByStream[streamID]++
t.sendPacketsByStream[streamID] += uint64(packetCount)
}
// recordBatchFanout updates the batch fan-out counters after one batch has
// been sent as frameCount frames.
//
// It stores frameCount as the most recent fan-out, raises the recorded
// maximum when frameCount exceeds it, and counts the batch as split when it
// produced more than one frame. A nil receiver or a non-positive frameCount
// is ignored and leaves every counter untouched. The counters are updated
// while holding statsMu.
func (t *FabricSessionPacketTransport) recordBatchFanout(frameCount int) {
	if t == nil {
		return
	}
	if frameCount < 1 {
		return
	}
	// Convert once; the guard above ensures the value is positive.
	frames := uint64(frameCount)

	t.statsMu.Lock()
	defer t.statsMu.Unlock()

	if frames >= 2 {
		t.splitBatchCount++
	}
	if t.maxBatchFrameCount < frames {
		t.maxBatchFrameCount = frames
	}
	t.lastBatchFrameCount = frames
}
func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
@@ -374,28 +398,39 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
t.statsMu.Lock()
sendFramesByClass := copyStringUint64Map(t.sendFramesByClass)
sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass)
lastBatchFrameCount := t.lastBatchFrameCount
maxBatchFrameCount := t.maxBatchFrameCount
splitBatchCount := t.splitBatchCount
closeStreamFrames := t.closeStreamFrames
closeErrors := t.closeErrors
sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream))
for streamID, count := range t.sendFramesByStream {
sendFramesByStream[fmt.Sprintf("%d", streamID)] = count
}
sendPacketsByStream := make(map[string]uint64, len(t.sendPacketsByStream))
for streamID, count := range t.sendPacketsByStream {
sendPacketsByStream[fmt.Sprintf("%d", streamID)] = count
}
t.statsMu.Unlock()
streamIDsByClass := copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass)
return map[string]any{
"schema_version": "rap.vpn_fabric_session_packet_transport.v1",
"stream_id": t.StreamID,
"stream_ids_by_class": streamIDsByClass,
"stream_class_count": len(streamIDsByClass),
"stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs),
"send_class_count": countNonZeroStringUint64Values(sendFramesByClass),
"send_stream_count": countNonZeroStringUint64Values(sendFramesByStream),
"sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1,
"close_stream_frames": closeStreamFrames,
"close_errors": closeErrors,
"send_frames_by_class": sendFramesByClass,
"send_packets_by_class": sendPacketsByClass,
"send_frames_by_stream_id": sendFramesByStream,
"schema_version": "rap.vpn_fabric_session_packet_transport.v1",
"stream_id": t.StreamID,
"stream_ids_by_class": streamIDsByClass,
"stream_class_count": len(streamIDsByClass),
"stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs),
"send_class_count": countNonZeroStringUint64Values(sendFramesByClass),
"send_stream_count": countNonZeroStringUint64Values(sendFramesByStream),
"sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1,
"split_batch_count": splitBatchCount,
"last_batch_frame_count": lastBatchFrameCount,
"max_batch_frame_count": maxBatchFrameCount,
"close_stream_frames": closeStreamFrames,
"close_errors": closeErrors,
"send_frames_by_class": sendFramesByClass,
"send_packets_by_class": sendPacketsByClass,
"send_frames_by_stream_id": sendFramesByStream,
"send_packets_by_stream_id": sendPacketsByStream,
}
}
@@ -316,9 +316,17 @@ func TestFabricSessionPacketTransportSplitsMixedBatchByStream(t *testing.T) {
t.Fatalf("unexpected stream/class split: %+v", sender.frames)
}
snapshot := transport.Snapshot()
if snapshot["send_stream_count"] != 3 || snapshot["send_class_count"] != 2 {
if snapshot["send_stream_count"] != 3 ||
snapshot["send_class_count"] != 2 ||
snapshot["split_batch_count"] != uint64(1) ||
snapshot["last_batch_frame_count"] != uint64(3) ||
snapshot["max_batch_frame_count"] != uint64(3) {
t.Fatalf("unexpected mixed-batch shard summary: %+v", snapshot)
}
packetsByStream := snapshot["send_packets_by_stream_id"].(map[string]uint64)
if packetsByStream["801"] != 1 || packetsByStream["901"] != 1 || packetsByStream["902"] != 1 {
t.Fatalf("unexpected packets by stream: %+v", packetsByStream)
}
}
func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) {
@@ -404,6 +404,9 @@ before they are framed, so one gateway batch containing many browser flows does
not collapse onto the first packet's logical stream.
`mesh-live-smoke` now sends mixed bulk and interactive VPN packets in a single
fabric-session batch and requires them to remain sharded.
Fabric-session packet transport snapshots now report packets per stream, the
number of split batches, and the last/max batch fanout, making real multi-site
load distribution measurable from gateway status.
Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy.