From a5b91113bf327b27b3b1b39a7f666f8a28624333 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:24:44 +0300 Subject: [PATCH] Summarize VPN fabric stream sharding health --- .../cmd/mesh-live-smoke/main.go | 5 +++- .../vpnruntime/fabric_session_transport.go | 26 ++++++++++++++++++- .../vpnruntime/fabric_transport_test.go | 7 +++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 +++ 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 63736e3..ebbae05 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -290,7 +290,10 @@ func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.Fa snapshot := transport.Snapshot() framesByClass, _ := snapshot["send_frames_by_class"].(map[string]uint64) sharded := framesByClass[vpnruntime.FabricTrafficClassInteractive] == 1 && - framesByClass[vpnruntime.FabricTrafficClassBulk] == 1 + framesByClass[vpnruntime.FabricTrafficClassBulk] == 1 && + snapshot["sharding_active"] == true && + snapshot["send_class_count"] == 2 && + snapshot["send_stream_count"] == 2 return true, sharded, nil } } diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index 47856c3..2ff995a 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -281,10 +281,16 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { sendFramesByStream[fmt.Sprintf("%d", streamID)] = count } t.statsMu.Unlock() + streamIDsByClass := copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass) return map[string]any{ "schema_version": "rap.vpn_fabric_session_packet_transport.v1", "stream_id": t.StreamID, - "stream_ids_by_class": copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass), + "stream_ids_by_class": streamIDsByClass, + "stream_class_count": len(streamIDsByClass), + "stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs), + "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), + "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), + "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, "send_frames_by_class": sendFramesByClass, "send_packets_by_class": sendPacketsByClass, "send_frames_by_stream_id": sendFramesByStream, @@ -322,3 +328,21 @@ func copyStreamIDsByTrafficClass(values map[string][]uint64) map[string][]uint64 } return out } + +func countStreamIDs(values map[string][]uint64) int { + total := 0 + for _, ids := range values { + total += len(ids) + } + return total +} + +func countNonZeroStringUint64Values(values map[string]uint64) int { + total := 0 + for _, value := range values { + if value > 0 { + total++ + } + } + return total +} diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index d0bc640..9907368 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -265,6 +265,13 @@ func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) { if snapshot["schema_version"] != "rap.vpn_fabric_session_packet_transport.v1" { t.Fatalf("snapshot schema missing: %+v", snapshot) } + if snapshot["stream_class_count"] != 2 || + snapshot["stream_shard_count"] != 4 || + snapshot["send_class_count"] != 2 || + snapshot["send_stream_count"] != 2 || + snapshot["sharding_active"] != true { + t.Fatalf("unexpected shard summary: %+v", snapshot) + } framesByClass := snapshot["send_frames_by_class"].(map[string]uint64) if framesByClass[FabricTrafficClassBulk] != 1 || framesByClass[FabricTrafficClassInteractive] != 1 { t.Fatalf("send frames by class = %+v", framesByClass) diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index c1d6e69..8063669 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -388,6 +388,9 @@ VPN fabric-session/QUIC tuning flags as install profiles, keeping manual and profile-based rollout paths aligned. Gateway runtime snapshots include the fabric-session packet transport stream layout and send counters by traffic class/stream id for load-test diagnosis. +Those snapshots also summarize configured stream class/shard counts and active +send class/stream counts, making sharding health visible without expanding +per-stream maps. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.