From d3e5d540bb9d9de3852e4ee91391ca719027c393 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 10:22:44 +0300 Subject: [PATCH] Select fabric carrier by endpoint --- .../rap-node-agent/cmd/rap-node-agent/main.go | 10 +++++-- .../internal/mesh/fabric_quic_transport.go | 4 ++- .../internal/mesh/fabric_session_manager.go | 2 +- .../internal/mesh/fabric_transport.go | 26 ++++++++++++++++ .../internal/mesh/fabric_transport_test.go | 30 +++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 ++ 6 files changed, 70 insertions(+), 4 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 05eb55f..b4fcbe1 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -4447,7 +4447,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if meshState.VPNFabricTransport == nil { meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } - session, err := meshState.VPNFabricTransport.Connect(dialCtx, mesh.FabricTransportTarget{ + target := mesh.FabricTransportTarget{ PeerID: nextHop, Endpoint: endpoint, Token: fabricSessionGatewayToken(identity, assignment, nextHop), @@ -4455,7 +4455,13 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st OutboundBuffer: 256, InboundBuffer: 256, ErrorBuffer: 16, - }) + } + carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) + if err != nil { + log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err) + return nil + } + session, err := carrier.Connect(dialCtx, target) if err != nil { log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) return nil diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index 667cb3b..96a30f5 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "strings" "sync" "time" @@ -37,6 +38,7 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor if target.Endpoint == "" { return nil, fmt.Errorf("quic fabric endpoint is required") } + target.Endpoint = strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://") tlsConfig := target.TLSConfig if tlsConfig == nil { tlsConfig = &tls.Config{NextProtos: []string{fabricQUICNextProto}} @@ -76,7 +78,7 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor maxPayload: maxPayload, timeout: target.Timeout, } - go session.readLoop(ctx) + go session.readLoop(context.Background()) return session, nil } diff --git a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go index 177f063..b1ed653 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_session_manager.go +++ b/agents/rap-node-agent/internal/mesh/fabric_session_manager.go @@ -66,7 +66,7 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession if err != nil { return nil, err } - pump := session.StartPump(ctx, target.Pump) + pump := session.StartPump(context.Background(), target.Pump) m.mu.Lock() if existing := m.sessions[key]; existing != nil { diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport.go b/agents/rap-node-agent/internal/mesh/fabric_transport.go index 0067b1b..7791a31 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_transport.go @@ -3,7 +3,9 @@ package mesh import ( "context" "crypto/tls" + "fmt" "net/http" + "strings" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" @@ -25,6 +27,7 @@ type FabricTransport interface { type FabricTransportTarget struct { PeerID string Endpoint string + Transport string Token string Header http.Header TLSConfig *tls.Config @@ -35,6 +38,29 @@ type FabricTransportTarget struct { ErrorBuffer int } +func FabricTransportForTarget(target FabricTransportTarget, websocket *WebSocketFabricTransport, quicTransport *QUICFabricTransport) (FabricTransport, FabricTransportTarget, error) { + transportLabel := strings.ToLower(strings.TrimSpace(target.Transport)) + endpoint := strings.TrimSpace(target.Endpoint) + if strings.HasPrefix(strings.ToLower(endpoint), "quic://") { + transportLabel = "quic" + target.Endpoint = strings.TrimPrefix(endpoint, "quic://") + } + switch transportLabel { + case "quic", "direct_quic", "udp_quic", "quic_udp": + if quicTransport == nil { + quicTransport = NewQUICFabricTransport(nil) + } + return quicTransport, target, nil + case "", "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls": + if websocket == nil { + websocket = NewWebSocketFabricTransport(nil) + } + return websocket, target, nil + default: + return nil, target, fmt.Errorf("unsupported fabric transport %q", target.Transport) + } +} + type WebSocketFabricTransport struct { Manager *FabricSessionPeerManager } diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_transport_test.go index d2b79fb..7510c52 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_transport_test.go @@ -107,3 +107,33 @@ func TestWebSocketFabricTransportReopensClosedSession(t *testing.T) { t.Fatalf("opened = %d, want 2", opened) } } + +func TestFabricTransportForTargetSelectsQUICByScheme(t *testing.T) { + transport, target, err := FabricTransportForTarget(FabricTransportTarget{ + Endpoint: "quic://127.0.0.1:4433", + }, nil, nil) + if err != nil { + t.Fatalf("select transport: %v", err) + } + if _, ok := transport.(*QUICFabricTransport); !ok { + t.Fatalf("transport = %T, want QUIC", transport) + } + if target.Endpoint != "127.0.0.1:4433" { + t.Fatalf("endpoint = %q", target.Endpoint) + } +} + +func TestFabricTransportForTargetSelectsWebSocketByDefault(t *testing.T) { + transport, target, err := FabricTransportForTarget(FabricTransportTarget{ + Endpoint: "https://node.example", + }, nil, nil) + if err != nil { + t.Fatalf("select transport: %v", err) + } + if _, ok := transport.(*WebSocketFabricTransport); !ok { + t.Fatalf("transport = %T, want websocket", transport) + } + if target.Endpoint != "https://node.example" { + t.Fatalf("endpoint = %q", target.Endpoint) + } +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 23d140f..30f3118 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -297,6 +297,8 @@ future QUIC/UDP transport can be added without changing VPN/RDP/HTTP services. `QUICFabricTransport` now implements the same interface and carries the same binary `fabricproto` frames over a QUIC stream, with local smoke coverage for `PING`/`PONG` and DATA/ACK. +Carrier selection understands QUIC transport labels and `quic://host:port` +endpoints while preserving WebSocket as the default fallback. Deliverables: