package main import ( "context" "crypto/rand" "crypto/rsa" "crypto/tls" "crypto/x509" "crypto/x509/pkix" "encoding/json" "fmt" "math/big" "net/http" "net/http/httptest" "os" "path/filepath" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/vpnruntime" ) type smokeNode struct { Local mesh.PeerIdentity Runtime *mesh.SyntheticRuntime URL string server *httptest.Server } type smokeReport struct { Stage string `json:"stage"` ProductionForwarding bool `json:"production_forwarding"` ScopedConfigLoaded bool `json:"scoped_config_loaded"` DirectProbeAccepted bool `json:"direct_probe_accepted"` DirectPath []string `json:"direct_path"` RelayProbeAccepted bool `json:"relay_probe_accepted"` RelayPath []string `json:"relay_path"` TestServiceAccepted bool `json:"test_service_accepted"` TestServiceEchoPayload string `json:"test_service_echo_payload"` FabricSessionAccepted bool `json:"fabric_session_accepted"` FabricSessionRoundTrips int `json:"fabric_session_round_trips"` FabricVPNPacketAccepted bool `json:"fabric_vpn_packet_accepted"` FabricVPNPacketSharded bool `json:"fabric_vpn_packet_sharded"` FabricVPNPacketFanout int `json:"fabric_vpn_packet_fanout"` FabricQUICAccepted bool `json:"fabric_quic_accepted"` FabricQUICEndpoint string `json:"fabric_quic_endpoint"` FabricSessionLatencyMS int64 `json:"fabric_session_latency_ms"` FabricSessionEndpoint string `json:"fabric_session_endpoint"` PeerEndpoints map[string]any `json:"peer_endpoints"` } func main() { report, err := run(context.Background()) if err != nil { fmt.Fprintf(os.Stderr, "mesh live smoke failed: %v\n", err) os.Exit(1) } payload, err := json.MarshalIndent(report, "", " ") if err != nil { fmt.Fprintf(os.Stderr, "marshal report: %v\n", err) os.Exit(1) } fmt.Println(string(payload)) } func run(ctx context.Context) (smokeReport, error) { nodeA := newSmokeNode(mesh.PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}) defer nodeA.Close() nodeR := newSmokeNode(mesh.PeerIdentity{ClusterID: "cluster-1", NodeID: "node-r"}) defer nodeR.Close() nodeB := newSmokeNode(mesh.PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"}) defer nodeB.Close() directRoute := smokeRoute("route-direct", []string{"node-a", "node-b"}) relayRoute := smokeRoute("route-relay", []string{"node-a", "node-r", "node-b"}) routes := []mesh.SyntheticRoute{directRoute, relayRoute} nodeAConfigPath, err := writeSmokeScopedConfig(nodeA.Local, map[string]string{ "node-r": nodeR.URL, "node-b": nodeB.URL, }, routes) if err != nil { return smokeReport{}, err } nodeAConfig, err := mesh.LoadScopedSyntheticConfig(nodeAConfigPath, nodeA.Local) if err != nil { return smokeReport{}, fmt.Errorf("load node-a scoped config: %w", err) } nodeA.Runtime = smokeRuntime(nodeA.Local, nodeAConfig.Routes, nodeAConfig.PeerEndpoints) nodeR.Runtime = smokeRuntime(nodeR.Local, routes, map[string]string{ "node-b": nodeB.URL, }) nodeB.Runtime = smokeRuntime(nodeB.Local, routes, map[string]string{}) directAck, err := nodeA.Runtime.SendProbe(ctx, directRoute.RouteID, mesh.SyntheticChannelFabricControl, "smoke-direct") if err != nil { return smokeReport{}, fmt.Errorf("direct probe: %w", err) } relayAck, err := nodeA.Runtime.SendProbe(ctx, relayRoute.RouteID, mesh.SyntheticChannelFabricControl, "smoke-relay") if err != nil { return smokeReport{}, fmt.Errorf("relay probe: %w", err) } testService, err := nodeA.Runtime.SendTestService(ctx, relayRoute.RouteID, mesh.SyntheticChannelRouteControl, mesh.SyntheticTestServiceRequest{ RequestID: "smoke-test-service", OrganizationID: mesh.SyntheticDefaultTestOrganizationID, ServiceType: mesh.SyntheticTestServiceType, Payload: "hello-c17e", SentAt: time.Now().UTC(), }) if err != nil { return smokeReport{}, fmt.Errorf("test service: %w", err) } fabricSessionStartedAt := time.Now() fabricSession, _, err := mesh.NewClient(nodeB.URL).OpenFabricSession(ctx, mesh.FabricSessionDialOptions{ Token: "rap_fsn_mesh_live_smoke", Timeout: 3 * time.Second, }) if err != nil { return smokeReport{}, fmt.Errorf("fabric session open: %w", err) } defer fabricSession.Close() firstFabricSessionResponse, err := fabricSession.RoundTrip(ctx, fabricproto.Frame{ Type: fabricproto.FramePing, Sequence: uint64(fabricSessionStartedAt.UnixNano()), Payload: []byte("mesh-live-smoke-fabric-session"), }) if err != nil { return smokeReport{}, fmt.Errorf("fabric session first round trip: %w", err) } secondFabricSessionResponse, err := fabricSession.RoundTrip(ctx, fabricproto.Frame{ Type: fabricproto.FramePing, Sequence: uint64(fabricSessionStartedAt.UnixNano()) + 1, Payload: []byte("mesh-live-smoke-fabric-session-2"), }) if err != nil { return smokeReport{}, fmt.Errorf("fabric session second round trip: %w", err) } fabricSessionLatency := time.Since(fabricSessionStartedAt) fabricSessionAccepted := firstFabricSessionResponse.Type == fabricproto.FramePong && string(firstFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session" && secondFabricSessionResponse.Type == fabricproto.FramePong && string(secondFabricSessionResponse.Payload) == "mesh-live-smoke-fabric-session-2" fabricVPNPacketAccepted, fabricVPNPacketSharded, fabricVPNPacketFanout, err := smokeFabricVPNPacketOverSession(ctx, fabricSession) if err != nil { return smokeReport{}, fmt.Errorf("fabric vpn packet session smoke: %w", err) } fabricQUICAccepted, fabricQUICEndpoint, err := smokeQUICFabricSession(ctx) if err != nil { return smokeReport{}, fmt.Errorf("fabric quic smoke: %w", err) } return smokeReport{ Stage: "C17F scoped synthetic config plus live HTTP transport", ProductionForwarding: false, ScopedConfigLoaded: nodeAConfig.ConfigVersion == "smoke-config-v1", DirectProbeAccepted: directAck.MessageType == mesh.SyntheticMessageProbeAck, DirectPath: decodeProbePath(directAck), RelayProbeAccepted: relayAck.MessageType == mesh.SyntheticMessageProbeAck, RelayPath: decodeProbePath(relayAck), TestServiceAccepted: testService.Ack.MessageType == mesh.SyntheticMessageTestServiceAck, TestServiceEchoPayload: testService.Response.EchoPayload, FabricSessionAccepted: fabricSessionAccepted, FabricSessionRoundTrips: 2, FabricVPNPacketAccepted: fabricVPNPacketAccepted, FabricVPNPacketSharded: fabricVPNPacketSharded, FabricVPNPacketFanout: fabricVPNPacketFanout, FabricQUICAccepted: fabricQUICAccepted, FabricQUICEndpoint: fabricQUICEndpoint, FabricSessionLatencyMS: fabricSessionLatency.Milliseconds(), FabricSessionEndpoint: nodeB.URL + "/mesh/v1/fabric/session/ws", PeerEndpoints: map[string]any{ "node-a": nodeA.URL, "node-r": nodeR.URL, "node-b": nodeB.URL, }, }, nil } func smokeQUICFabricSession(ctx context.Context) (bool, string, error) { server, err := mesh.StartQUICFabricServer(ctx, mesh.QUICFabricServerConfig{ ListenAddr: "127.0.0.1:0", TLSConfig: smokeQUICTLSConfig(), }) if err != nil { return false, "", err } defer server.Close() endpoint := server.Addr().String() session, err := mesh.NewQUICFabricTransport(nil).Connect(ctx, mesh.FabricTransportTarget{ Endpoint: endpoint, TLSConfig: &tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"rap-fabric-data-session-v1"}, }, Timeout: 3 * time.Second, InboundBuffer: 4, ErrorBuffer: 4, }) if err != nil { return false, endpoint, err } defer session.Close() if err := session.Send(ctx, fabricproto.Frame{ Type: fabricproto.FramePing, Sequence: uint64(time.Now().UnixNano()), Payload: []byte("mesh-live-smoke-quic"), }); err != nil { return false, endpoint, err } timer := time.NewTimer(3 * time.Second) defer timer.Stop() for { select { case frame := <-session.Frames(): return frame.Type == fabricproto.FramePong && string(frame.Payload) == "mesh-live-smoke-quic", endpoint, nil case err := <-session.Errors(): return false, endpoint, err case <-timer.C: return false, endpoint, fmt.Errorf("timed out waiting for quic pong") case <-ctx.Done(): return false, endpoint, ctx.Err() } } } func smokeQUICTLSConfig() *tls.Config { key, _ := rsa.GenerateKey(rand.Reader, 2048) template := x509.Certificate{ SerialNumber: big.NewInt(time.Now().UnixNano()), Subject: pkix.Name{CommonName: "mesh-live-smoke"}, NotBefore: time.Now().Add(-time.Minute), NotAfter: time.Now().Add(time.Hour), KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, DNSNames: []string{"localhost"}, } certDER, _ := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) return &tls.Config{ Certificates: []tls.Certificate{{ Certificate: [][]byte{certDER}, PrivateKey: key, }}, NextProtos: []string{"rap-fabric-data-session-v1"}, } } func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.FabricSessionClient) (bool, bool, int, error) { const interactiveStreamID uint64 = 4400 const bulkStreamID uint64 = 4401 pump := fabricSession.StartPump(ctx, mesh.FabricSessionPumpOptions{ OutboundBuffer: 4, InboundBuffer: 4, ErrorBuffer: 4, }) defer pump.Close() for _, frame := range []fabricproto.Frame{ {Type: fabricproto.FrameOpenStream, StreamID: interactiveStreamID, TrafficClass: fabricproto.TrafficClassInteractive}, {Type: fabricproto.FrameOpenStream, StreamID: bulkStreamID, TrafficClass: fabricproto.TrafficClassBulk}, } { if err := pump.Send(ctx, frame); err != nil { return false, false, 0, err } } transport := &vpnruntime.FabricSessionPacketTransport{ Sender: pump, StreamID: interactiveStreamID, VPNConnectionID: "vpn-smoke", SendDirection: vpnruntime.FabricDirectionGatewayToClient, StreamIDsByTrafficClass: map[string][]uint64{ vpnruntime.FabricTrafficClassInteractive: []uint64{interactiveStreamID}, vpnruntime.FabricTrafficClassBulk: []uint64{bulkStreamID}, }, } bulkPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443, 0) controlPacket := smokeIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389, 0x02) if err := transport.SendGatewayPacketBatch(ctx, [][]byte{bulkPacket, controlPacket}); err != nil { return false, false, 0, err } timer := time.NewTimer(3 * time.Second) defer timer.Stop() acked := map[uint64]bool{} for { select { case frame := <-pump.Frames(): if frame.Type == fabricproto.FrameAck && frame.Sequence == 1 { acked[frame.StreamID] = true if acked[interactiveStreamID] && acked[bulkStreamID] { snapshot := transport.Snapshot() framesByClass, _ := snapshot["send_frames_by_class"].(map[string]uint64) sharded := framesByClass[vpnruntime.FabricTrafficClassInteractive] == 1 && framesByClass[vpnruntime.FabricTrafficClassBulk] == 1 && snapshot["sharding_active"] == true && snapshot["send_class_count"] == 2 && snapshot["send_stream_count"] == 2 fanout, _ := snapshot["last_batch_frame_count"].(uint64) return true, sharded, int(fanout), nil } } case err := <-pump.Errors(): return false, false, 0, err case <-timer.C: return false, false, 0, fmt.Errorf("timed out waiting for fabric vpn packet ack") case <-ctx.Done(): return false, false, 0, ctx.Err() } } } func smokeIPv4TCPPacket(src [4]byte, dst [4]byte, srcPort uint16, dstPort uint16, flags byte) []byte { packet := make([]byte, 40) packet[0] = 0x45 packet[2] = 0 packet[3] = 40 packet[8] = 64 packet[9] = 6 copy(packet[12:16], src[:]) copy(packet[16:20], dst[:]) packet[20] = byte(srcPort >> 8) packet[21] = byte(srcPort) packet[22] = byte(dstPort >> 8) packet[23] = byte(dstPort) packet[32] = 0x50 packet[33] = flags return packet } func writeSmokeScopedConfig(local mesh.PeerIdentity, peers map[string]string, routes []mesh.SyntheticRoute) (string, error) { path := filepath.Join(os.TempDir(), "rap-c17e-node-a-scoped-mesh.json") payload, err := json.Marshal(mesh.ScopedSyntheticConfig{ SchemaVersion: "c17f.synthetic.v1", ClusterID: local.ClusterID, LocalNodeID: local.NodeID, ConfigVersion: "smoke-config-v1", PeerDirectoryVersion: "smoke-peers-v1", PolicyVersion: "smoke-policy-v1", PeerEndpoints: peers, Routes: routes, }) if err != nil { return "", err } if err := os.WriteFile(path, payload, 0o600); err != nil { return "", err } return path, nil } func newSmokeNode(local mesh.PeerIdentity) *smokeNode { node := &smokeNode{Local: local} node.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mesh.Server{Local: node.Local, SyntheticRuntime: node.Runtime, FabricSessionEnabled: true}.Handler().ServeHTTP(w, r) })) node.URL = node.server.URL return node } func (n *smokeNode) Close() { if n.server != nil { n.server.Close() } } func smokeRuntime(local mesh.PeerIdentity, routes []mesh.SyntheticRoute, peers map[string]string) *mesh.SyntheticRuntime { return mesh.NewSyntheticRuntime(mesh.SyntheticRuntimeConfig{ Enabled: true, Local: local, Routes: routes, AllowedChannels: []string{ mesh.SyntheticChannelFabricControl, mesh.SyntheticChannelRouteControl, }, Transport: mesh.NewHTTPPeerTransport(peers), }) } func smokeRoute(routeID string, hops []string) mesh.SyntheticRoute { return mesh.SyntheticRoute{ RouteID: routeID, ClusterID: "cluster-1", SourceNodeID: hops[0], DestinationNodeID: hops[len(hops)-1], Hops: hops, AllowedChannels: []string{mesh.SyntheticChannelFabricControl, mesh.SyntheticChannelRouteControl}, MaxTTL: 8, MaxHops: 8, ExpiresAt: time.Now().UTC().Add(time.Hour), RouteVersion: "route-v1", PolicyVersion: "policy-v1", PeerDirectoryVersion: "peers-v1", } } func decodeProbePath(envelope mesh.SyntheticEnvelope) []string { var payload mesh.SyntheticProbeAckPayload _ = json.Unmarshal(envelope.Payload, &payload) return payload.Path }