From db9ea53e6cb487f2ae0fe274326a4f0814c7d11e Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:40:31 +0300 Subject: [PATCH] Report VPN fabric receive distribution --- .../vpnruntime/fabric_session_transport.go | 143 +++++++++++++----- .../vpnruntime/fabric_transport_test.go | 6 + .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 3 files changed, 118 insertions(+), 34 deletions(-) diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index a862b2c..04ef41a 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -39,21 +39,25 @@ type FabricSessionPacketTransport struct { StreamIDsByTrafficClass map[string][]uint64 StreamIDs []uint64 - sequence uint64 - sequenceMu sync.Mutex - sequenceByStream map[uint64]uint64 - statsMu sync.Mutex - sendFramesByClass map[string]uint64 - sendPacketsByClass map[string]uint64 - sendFramesByStream map[uint64]uint64 - sendPacketsByStream map[uint64]uint64 - splitBatchCount uint64 - lastBatchFrameCount uint64 - maxBatchFrameCount uint64 - closeStreamFrames uint64 - closeErrors uint64 - closeOnce sync.Once - closeErr error + sequence uint64 + sequenceMu sync.Mutex + sequenceByStream map[uint64]uint64 + statsMu sync.Mutex + sendFramesByClass map[string]uint64 + sendPacketsByClass map[string]uint64 + sendFramesByStream map[uint64]uint64 + sendPacketsByStream map[uint64]uint64 + splitBatchCount uint64 + lastBatchFrameCount uint64 + maxBatchFrameCount uint64 + receiveFramesByClass map[string]uint64 + receivePacketsByClass map[string]uint64 + receiveFramesByStream map[uint64]uint64 + receivePacketsByStream map[uint64]uint64 + closeStreamFrames uint64 + closeErrors uint64 + closeOnce sync.Once + closeErr error } func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { @@ -140,9 +144,10 @@ func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Con return nil, err } if payload.VPNConnectionID == t.VPNConnectionID && payload.Direction == direction { + t.recordReceive(frame.StreamID, fabricSessionTrafficClassName(frame.TrafficClass), len(payload.Packets)) return cleanPacketBatch(payload.Packets), nil } - if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil { + if err := t.deliverDecodedFabricSessionFrame(frame, payload); err != nil { return nil, err } } @@ -174,13 +179,29 @@ func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) erro if frame.Type != fabricproto.FrameData || !t.acceptsStream(frame.StreamID) { continue } - if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil { + payload, err := DecodeFabricVPNPacketDataFrame(frame) + if err != nil { + return err + } + if err := t.deliverDecodedFabricSessionFrame(frame, payload); err != nil { return err } } } } +func (t *FabricSessionPacketTransport) deliverDecodedFabricSessionFrame(frame fabricproto.Frame, payload mesh.VPNPacketBatchPayload) error { + if t == nil || t.Inbox == nil { + return mesh.ErrForwardRuntimeUnavailable + } + payload.Packets = cleanPacketBatch(payload.Packets) + if len(payload.Packets) == 0 { + return nil + } + t.recordReceive(frame.StreamID, fabricSessionTrafficClassName(frame.TrafficClass), len(payload.Packets)) + return t.Inbox.enqueue(payload) +} + func (t *FabricSessionPacketTransport) Close() error { if t == nil { return nil @@ -391,6 +412,31 @@ func (t *FabricSessionPacketTransport) recordBatchFanout(frameCount int) { } } +func (t *FabricSessionPacketTransport) recordReceive(streamID uint64, trafficClass string, packetCount int) { + if t == nil { + return + } + trafficClass = normalizeFabricTrafficClass(trafficClass) + t.statsMu.Lock() + defer t.statsMu.Unlock() + if t.receiveFramesByClass == nil { + t.receiveFramesByClass = map[string]uint64{} + } + if t.receivePacketsByClass == nil { + t.receivePacketsByClass = map[string]uint64{} + } + if t.receiveFramesByStream == nil { + t.receiveFramesByStream = map[uint64]uint64{} + } + if t.receivePacketsByStream == nil { + t.receivePacketsByStream = map[uint64]uint64{} + } + t.receiveFramesByClass[trafficClass]++ + t.receivePacketsByClass[trafficClass] += uint64(packetCount) + t.receiveFramesByStream[streamID]++ + t.receivePacketsByStream[streamID] += uint64(packetCount) +} + func (t *FabricSessionPacketTransport) Snapshot() map[string]any { if t == nil { return nil @@ -398,6 +444,8 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { t.statsMu.Lock() sendFramesByClass := copyStringUint64Map(t.sendFramesByClass) sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass) + receiveFramesByClass := copyStringUint64Map(t.receiveFramesByClass) + receivePacketsByClass := copyStringUint64Map(t.receivePacketsByClass) lastBatchFrameCount := t.lastBatchFrameCount maxBatchFrameCount := t.maxBatchFrameCount splitBatchCount := t.splitBatchCount @@ -411,26 +459,38 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { for streamID, count := range t.sendPacketsByStream { sendPacketsByStream[fmt.Sprintf("%d", streamID)] = count } + receiveFramesByStream := make(map[string]uint64, len(t.receiveFramesByStream)) + for streamID, count := range t.receiveFramesByStream { + receiveFramesByStream[fmt.Sprintf("%d", streamID)] = count + } + receivePacketsByStream := make(map[string]uint64, len(t.receivePacketsByStream)) + for streamID, count := range t.receivePacketsByStream { + receivePacketsByStream[fmt.Sprintf("%d", streamID)] = count + } t.statsMu.Unlock() streamIDsByClass := copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass) return map[string]any{ - "schema_version": "rap.vpn_fabric_session_packet_transport.v1", - "stream_id": t.StreamID, - "stream_ids_by_class": streamIDsByClass, - "stream_class_count": len(streamIDsByClass), - "stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs), - "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), - "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), - "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, - "split_batch_count": splitBatchCount, - "last_batch_frame_count": lastBatchFrameCount, - "max_batch_frame_count": maxBatchFrameCount, - "close_stream_frames": closeStreamFrames, - "close_errors": closeErrors, - "send_frames_by_class": sendFramesByClass, - "send_packets_by_class": sendPacketsByClass, - "send_frames_by_stream_id": sendFramesByStream, - "send_packets_by_stream_id": sendPacketsByStream, + "schema_version": "rap.vpn_fabric_session_packet_transport.v1", + "stream_id": t.StreamID, + "stream_ids_by_class": streamIDsByClass, + "stream_class_count": len(streamIDsByClass), + "stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs), + "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), + "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), + "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, + "split_batch_count": splitBatchCount, + "last_batch_frame_count": lastBatchFrameCount, + "max_batch_frame_count": maxBatchFrameCount, + "close_stream_frames": closeStreamFrames, + "close_errors": closeErrors, + "send_frames_by_class": sendFramesByClass, + "send_packets_by_class": sendPacketsByClass, + "send_frames_by_stream_id": sendFramesByStream, + "send_packets_by_stream_id": sendPacketsByStream, + "receive_frames_by_class": receiveFramesByClass, + "receive_packets_by_class": receivePacketsByClass, + "receive_frames_by_stream_id": receiveFramesByStream, + "receive_packets_by_stream_id": receivePacketsByStream, } } @@ -462,6 +522,21 @@ func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) stri return FabricTrafficClassBulk } +func fabricSessionTrafficClassName(value fabricproto.TrafficClass) string { + switch value { + case fabricproto.TrafficClassControl: + return FabricTrafficClassControl + case fabricproto.TrafficClassInteractive: + return FabricTrafficClassInteractive + case fabricproto.TrafficClassReliable: + return FabricTrafficClassReliable + case fabricproto.TrafficClassDroppable: + return FabricTrafficClassDroppable + default: + return FabricTrafficClassBulk + } +} + func copyStringUint64Map(values map[string]uint64) map[string]uint64 { if len(values) == 0 { return map[string]uint64{} diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 2320d55..82399c2 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -432,6 +432,12 @@ func TestFabricSessionPacketTransportReceiveReadsPumpFrames(t *testing.T) { if len(packets) != 1 || string(packets[0]) != "request" { t.Fatalf("packets = %#v", packets) } + snapshot := transport.Snapshot() + framesByClass := snapshot["receive_frames_by_class"].(map[string]uint64) + packetsByStream := snapshot["receive_packets_by_stream_id"].(map[string]uint64) + if framesByClass[FabricTrafficClassBulk] != 1 || packetsByStream["711"] != 1 { + t.Fatalf("unexpected receive counters: %+v", snapshot) + } } func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 7108101..142ea53 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -407,6 +407,9 @@ fabric-session batch and requires them to remain sharded. Fabric-session packet transport snapshots now report packets per stream plus last/max batch fanout, making real multi-site load distribution measurable from gateway status. +Receive-side fabric-session packet counters are reported by traffic class and +stream id as well, so gateway status can compare TX and RX distribution under +browser/RDP load. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.