From a6ee9ba26f6152c3754d38ca8f6507350c418a5c Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:41:53 +0300 Subject: [PATCH] Expose VPN fabric smoke fanout --- .../cmd/mesh-live-smoke/main.go | 19 +++++++++++-------- .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 4b0263d..5bb811a 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -42,6 +42,7 @@ type smokeReport struct { FabricSessionRoundTrips int `json:"fabric_session_round_trips"` FabricVPNPacketAccepted bool `json:"fabric_vpn_packet_accepted"` FabricVPNPacketSharded bool `json:"fabric_vpn_packet_sharded"` + FabricVPNPacketFanout int `json:"fabric_vpn_packet_fanout"` FabricQUICAccepted bool `json:"fabric_quic_accepted"` FabricQUICEndpoint string `json:"fabric_quic_endpoint"` FabricSessionLatencyMS int64 `json:"fabric_session_latency_ms"` @@ -140,7 +141,7 @@ func run(ctx context.Context) (smokeReport, error) { string(firstFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session" && secondFabricSessionResponse.Type == fabricproto.FramePong && string(secondFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session-2" - fabricVPNPacketAccepted, fabricVPNPacketSharded, err := smokeFabricVPNPacketOverSession(ctx, fabricSession) + fabricVPNPacketAccepted, fabricVPNPacketSharded, fabricVPNPacketFanout, err := smokeFabricVPNPacketOverSession(ctx, fabricSession) if err != nil { return smokeReport{}, fmt.Errorf("fabric vpn packet session smoke: %w", err) } @@ -163,6 +164,7 @@ func run(ctx context.Context) (smokeReport, error) { FabricSessionRoundTrips: 2, FabricVPNPacketAccepted: fabricVPNPacketAccepted, FabricVPNPacketSharded: fabricVPNPacketSharded, + FabricVPNPacketFanout: fabricVPNPacketFanout, FabricQUICAccepted: fabricQUICAccepted, FabricQUICEndpoint: fabricQUICEndpoint, FabricSessionLatencyMS: fabricSessionLatency.Milliseconds(), @@ -243,7 +245,7 @@ func smokeQUICTLSConfig() *tls.Config { } } -func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.FabricSessionClient) (bool, bool, error) { +func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.FabricSessionClient) (bool, bool, int, error) { const interactiveStreamID uint64 = 4400 const bulkStreamID uint64 = 4401 pump := fabricSession.StartPump(ctx, mesh.FabricSessionPumpOptions{ @@ -257,7 +259,7 @@ func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.Fa {Type: fabricproto.FrameOpenStream, StreamID: bulkStreamID, TrafficClass: fabricproto.TrafficClassBulk}, } { if err := pump.Send(ctx, frame); err != nil { - return false, false, err + return false, false, 0, err } } transport := &vpnruntime.FabricSessionPacketTransport{ @@ -273,7 +275,7 @@ func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.Fa bulkPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443, 0) controlPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389, 0x02) if err := transport.SendGatewayPacketBatch(ctx, [][]byte{bulkPacket, controlPacket}); err != nil { - return false, false, err + return false, false, 0, err } timer := time.NewTimer(3 * time.Second) defer timer.Stop() @@ -291,15 +293,16 @@ func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.Fa snapshot["sharding_active"] == true && snapshot["send_class_count"] == 2 && snapshot["send_stream_count"] == 2 - return true, sharded, nil + fanout, _ := snapshot["last_batch_frame_count"].(uint64) + return true, sharded, int(fanout), nil } } case err := <-pump.Errors(): - return false, false, err + return false, false, 0, err case <-timer.C: - return false, false, fmt.Errorf("timed out waiting for fabric vpn packet ack") + return false, false, 0, fmt.Errorf("timed out waiting for fabric vpn packet ack") case <-ctx.Done(): - return false, false, ctx.Err() + return false, false, 0, ctx.Err() } } } diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 142ea53..f24d7fd 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -404,6 +404,8 @@ before they are framed, so one gateway batch containing many browser flows does not collapse onto the first packet's logical stream. `mesh-live-smoke` now sends mixed bulk and interactive VPN packets in a single fabric-session batch and requires them to remain sharded. +The smoke report also exposes the mixed-batch frame fanout so regressions show +up as a concrete fanout drop, not just a failed boolean. Fabric-session packet transport snapshots now report packets per stream plus last/max batch fanout, making real multi-site load distribution measurable from gateway status.