diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go
index 8cebc85..47856c3 100644
--- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go
+++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go
@@ -3,6 +3,7 @@ package vpnruntime
 import (
 	"context"
 	"errors"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -34,9 +35,13 @@ type FabricSessionPacketTransport struct {
 	StreamIDsByTrafficClass map[string][]uint64
 	StreamIDs               []uint64
 
-	sequence         uint64
-	sequenceMu       sync.Mutex
-	sequenceByStream map[uint64]uint64
+	sequence           uint64
+	sequenceMu         sync.Mutex
+	sequenceByStream   map[uint64]uint64
+	statsMu            sync.Mutex // guards the send* counter maps below
+	sendFramesByClass  map[string]uint64
+	sendPacketsByClass map[string]uint64
+	sendFramesByStream map[uint64]uint64
 }
 
 func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
@@ -66,7 +71,11 @@ func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Contex
 	if err != nil {
 		return err
 	}
-	return t.Sender.Send(ctx, frame)
+	if err := t.Sender.Send(ctx, frame); err != nil {
+		return err
+	}
+	t.recordSend(streamID, trafficClass, len(packets))
+	return nil
 }
 
 func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
@@ -239,6 +248,55 @@ func (t *FabricSessionPacketTransport) nextSequence(streamID uint64) uint64 {
 	return t.sequenceByStream[streamID]
 }
 
+// recordSend bumps the per-class and per-stream send counters for one frame
+// that carried packetCount packets. It is a no-op on a nil receiver.
+func (t *FabricSessionPacketTransport) recordSend(streamID uint64, trafficClass string, packetCount int) {
+	if t == nil {
+		return
+	}
+	trafficClass = normalizeFabricTrafficClass(trafficClass)
+	t.statsMu.Lock()
+	defer t.statsMu.Unlock()
+	if t.sendFramesByClass == nil {
+		t.sendFramesByClass = map[string]uint64{}
+	}
+	if t.sendPacketsByClass == nil {
+		t.sendPacketsByClass = map[string]uint64{}
+	}
+	if t.sendFramesByStream == nil {
+		t.sendFramesByStream = map[uint64]uint64{}
+	}
+	t.sendFramesByClass[trafficClass]++
+	t.sendPacketsByClass[trafficClass] += uint64(packetCount)
+	t.sendFramesByStream[streamID]++
+}
+
+// Snapshot returns the configured stream layout together with copies of the
+// send counters recorded so far. It returns nil on a nil receiver.
+func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
+	if t == nil {
+		return nil
+	}
+	// Copy the counters while holding statsMu; the result map is assembled
+	// after the lock is released.
+	t.statsMu.Lock()
+	sendFramesByClass := copyStringUint64Map(t.sendFramesByClass)
+	sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass)
+	sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream))
+	for streamID, count := range t.sendFramesByStream {
+		sendFramesByStream[strconv.FormatUint(streamID, 10)] = count
+	}
+	t.statsMu.Unlock()
+	return map[string]any{
+		"schema_version":           "rap.vpn_fabric_session_packet_transport.v1",
+		"stream_id":                t.StreamID,
+		"stream_ids_by_class":      copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass),
+		"send_frames_by_class":     sendFramesByClass,
+		"send_packets_by_class":    sendPacketsByClass,
+		"send_frames_by_stream_id": sendFramesByStream,
+	}
+}
+
 func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string {
 	if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk {
 		return fallback
@@ -248,3 +306,29 @@ func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) stri
 	}
 	return FabricTrafficClassBulk
 }
+
+// copyStringUint64Map returns a copy of values. A nil or empty input yields
+// an empty, non-nil map.
+func copyStringUint64Map(values map[string]uint64) map[string]uint64 {
+	if len(values) == 0 {
+		return map[string]uint64{}
+	}
+	out := make(map[string]uint64, len(values))
+	for key, value := range values {
+		out[key] = value
+	}
+	return out
+}
+
+// copyStreamIDsByTrafficClass returns a copy of values in which every ID
+// slice is cloned. A nil or empty input yields an empty, non-nil map.
+func copyStreamIDsByTrafficClass(values map[string][]uint64) map[string][]uint64 {
+	if len(values) == 0 {
+		return map[string][]uint64{}
+	}
+	out := make(map[string][]uint64, len(values))
+	for key, ids := range values {
+		out[key] = append([]uint64(nil), ids...)
+	}
+	return out
+}
diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go
index a185444..d0bc640 100644
--- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go
+++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go
@@ -261,6 +261,17 @@ func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) {
 	if sender.frames[0].Sequence != 1 || sender.frames[1].Sequence != 1 {
 		t.Fatalf("per-stream sequences = %d/%d, want 1/1", sender.frames[0].Sequence, sender.frames[1].Sequence)
 	}
+	snapshot := transport.Snapshot()
+	if snapshot["schema_version"] != "rap.vpn_fabric_session_packet_transport.v1" {
+		t.Fatalf("snapshot schema missing: %+v", snapshot)
+	}
+	framesByClass, ok := snapshot["send_frames_by_class"].(map[string]uint64)
+	if !ok {
+		t.Fatalf("send_frames_by_class has type %T, want map[string]uint64", snapshot["send_frames_by_class"])
+	}
+	if framesByClass[FabricTrafficClassBulk] != 1 || framesByClass[FabricTrafficClassInteractive] != 1 {
+		t.Fatalf("send frames by class = %+v", framesByClass)
+	}
 }
 
 func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) {
diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway.go b/agents/rap-node-agent/internal/vpnruntime/gateway.go
index 5d7c0ff..2bb3c20 100644
--- a/agents/rap-node-agent/internal/vpnruntime/gateway.go
+++ b/agents/rap-node-agent/internal/vpnruntime/gateway.go
@@ -65,6 +65,12 @@ type PacketTransport interface {
 	ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error)
 }
 
+// packetTransportSnapshotter is implemented by packet transports that can
+// report a diagnostic snapshot of their own state.
+type packetTransportSnapshotter interface {
+	Snapshot() map[string]any
+}
+
 type BackendPacketTransport struct {
 	API       *client.Client
 	ClusterID string
@@ -188,6 +194,12 @@ func (g *Gateway) Snapshot() map[string]any {
 	if platform := gatewayPlatformSnapshot(g.InterfaceName, g.RouteCIDR); len(platform) > 0 {
 		out["platform"] = platform
 	}
+	// Only transports that implement the optional snapshot interface contribute.
+	if snapshotter, ok := g.Transport.(packetTransportSnapshotter); ok {
+		if snapshot := snapshotter.Snapshot(); len(snapshot) > 0 {
+			out["transport_snapshot"] = snapshot
+		}
+	}
 	return out
 }
 
diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
index 9679fe4..27b1590 100644
--- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
+++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
@@ -383,6 +383,8 @@ declaring that carrier failed.
 VPN fabric-session transport now opens configurable per-class stream shards for
 interactive and bulk packet traffic, so heavy browser flows do not share a
 single logical stream with latency-sensitive RDP/control packets.
+Gateway runtime snapshots include the fabric-session packet transport stream
+layout and send counters by traffic class/stream id for load-test diagnosis.
 Endpoint ranking treats `capacity_limited` observations as a soft pressure
 penalty instead of a hard recent failure, enabling load spreading without
 marking the carrier unhealthy.