From a614029d4aeb9b9e07a7740428a38654208c86fd Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:18:52 +0300 Subject: [PATCH] Verify sharded VPN fabric session smoke --- .../cmd/mesh-live-smoke/main.go | 74 ++++++++++++++----- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go index 551314c..63736e3 100644 --- a/agents/rap-node-agent/cmd/mesh-live-smoke/main.go +++ b/agents/rap-node-agent/cmd/mesh-live-smoke/main.go @@ -41,6 +41,7 @@ type smokeReport struct { FabricSessionAccepted bool `json:"fabric_session_accepted"` FabricSessionRoundTrips int `json:"fabric_session_round_trips"` FabricVPNPacketAccepted bool `json:"fabric_vpn_packet_accepted"` + FabricVPNPacketSharded bool `json:"fabric_vpn_packet_sharded"` FabricQUICAccepted bool `json:"fabric_quic_accepted"` FabricQUICEndpoint string `json:"fabric_quic_endpoint"` FabricSessionLatencyMS int64 `json:"fabric_session_latency_ms"` @@ -139,7 +140,7 @@ func run(ctx context.Context) (smokeReport, error) { string(firstFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session" && secondFabricSessionResponse.Type == fabricproto.FramePong && string(secondFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session-2" - fabricVPNPacketAccepted, err := smokeFabricVPNPacketOverSession(ctx, fabricSession) + fabricVPNPacketAccepted, fabricVPNPacketSharded, err := smokeFabricVPNPacketOverSession(ctx, fabricSession) if err != nil { return smokeReport{}, fmt.Errorf("fabric vpn packet session smoke: %w", err) } @@ -161,6 +162,7 @@ func run(ctx context.Context) (smokeReport, error) { FabricSessionAccepted: fabricSessionAccepted, FabricSessionRoundTrips: 2, FabricVPNPacketAccepted: fabricVPNPacketAccepted, + FabricVPNPacketSharded: fabricVPNPacketSharded, FabricQUICAccepted: fabricQUICAccepted, FabricQUICEndpoint: fabricQUICEndpoint, FabricSessionLatencyMS: fabricSessionLatency.Milliseconds(), @@ -241,49 +243,85 @@ func smokeQUICTLSConfig() *tls.Config { } } -func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.FabricSessionClient) (bool, error) { - const streamID uint64 = 4400 +func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.FabricSessionClient) (bool, bool, error) { + const interactiveStreamID uint64 = 4400 + const bulkStreamID uint64 = 4401 pump := fabricSession.StartPump(ctx, mesh.FabricSessionPumpOptions{ OutboundBuffer: 4, InboundBuffer: 4, ErrorBuffer: 4, }) defer pump.Close() - if err := pump.Send(ctx, fabricproto.Frame{ - Type: fabricproto.FrameOpenStream, - StreamID: streamID, - TrafficClass: fabricproto.TrafficClassInteractive, - }); err != nil { - return false, err + for _, frame := range []fabricproto.Frame{ + {Type: fabricproto.FrameOpenStream, StreamID: interactiveStreamID, TrafficClass: fabricproto.TrafficClassInteractive}, + {Type: fabricproto.FrameOpenStream, StreamID: bulkStreamID, TrafficClass: fabricproto.TrafficClassBulk}, + } { + if err := pump.Send(ctx, frame); err != nil { + return false, false, err + } } transport := &vpnruntime.FabricSessionPacketTransport{ Sender: pump, - StreamID: streamID, + StreamID: interactiveStreamID, VPNConnectionID: "vpn-smoke", SendDirection: vpnruntime.FabricDirectionGatewayToClient, - TrafficClass: vpnruntime.FabricTrafficClassInteractive, + StreamIDsByTrafficClass: map[string][]uint64{ + vpnruntime.FabricTrafficClassInteractive: []uint64{interactiveStreamID}, + vpnruntime.FabricTrafficClassBulk: []uint64{bulkStreamID}, + }, } - if err := transport.SendGatewayPacketBatch(ctx, [][]byte{[]byte("fabric-vpn-packet-smoke")}); err != nil { - return false, err + bulkPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443, 0) + controlPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389, 0x02) + if err := transport.SendGatewayPacketBatch(ctx, [][]byte{bulkPacket}); err != nil { + return false, false, err + } + if err := transport.SendGatewayPacketBatch(ctx, [][]byte{controlPacket}); err != nil { + return false, false, err } timer := time.NewTimer(3 * time.Second) defer timer.Stop() + acked := map[uint64]bool{} for { select { case frame := <-pump.Frames(): - if frame.Type == fabricproto.FrameAck && frame.StreamID == streamID && frame.Sequence == 1 { - return true, nil + if frame.Type == fabricproto.FrameAck && frame.Sequence == 1 { + acked[frame.StreamID] = true + if acked[interactiveStreamID] && acked[bulkStreamID] { + snapshot := transport.Snapshot() + framesByClass, _ := snapshot["send_frames_by_class"].(map[string]uint64) + sharded := framesByClass[vpnruntime.FabricTrafficClassInteractive] == 1 && + framesByClass[vpnruntime.FabricTrafficClassBulk] == 1 + return true, sharded, nil + } } case err := <-pump.Errors(): - return false, err + return false, false, err case <-timer.C: - return false, fmt.Errorf("timed out waiting for fabric vpn packet ack") + return false, false, fmt.Errorf("timed out waiting for fabric vpn packet ack") case <-ctx.Done(): - return false, ctx.Err() + return false, false, ctx.Err() } } } +func smokeIPv4TCPPacket(src [4]byte, dst [4]byte, srcPort uint16, dstPort uint16, flags byte) []byte { + packet := make([]byte, 40) + packet[0] = 0x45 + packet[2] = 0 + packet[3] = 40 + packet[8] = 64 + packet[9] = 6 + copy(packet[12:16], src[:]) + copy(packet[16:20], dst[:]) + packet[20] = byte(srcPort >> 8) + packet[21] = byte(srcPort) + packet[22] = byte(dstPort >> 8) + packet[23] = byte(dstPort) + packet[32] = 0x50 + packet[33] = flags + return packet +} + func writeSmokeScopedConfig(local mesh.PeerIdentity, peers map[string]string, routes []mesh.SyntheticRoute) (string, error) { path := filepath.Join(os.TempDir(), "rap-c17e-node-a-scoped-mesh.json") payload, err := json.Marshal(mesh.ScopedSyntheticConfig{