Select fabric carrier by endpoint

This commit is contained in:
2026-05-16 10:22:44 +03:00
parent 130ff117f3
commit d3e5d540bb
6 changed files with 70 additions and 4 deletions
@@ -4447,7 +4447,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if meshState.VPNFabricTransport == nil {
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
}
session, err := meshState.VPNFabricTransport.Connect(dialCtx, mesh.FabricTransportTarget{
target := mesh.FabricTransportTarget{
PeerID: nextHop,
Endpoint: endpoint,
Token: fabricSessionGatewayToken(identity, assignment, nextHop),
@@ -4455,7 +4455,13 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
OutboundBuffer: 256,
InboundBuffer: 256,
ErrorBuffer: 16,
})
}
carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil)
if err != nil {
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err)
return nil
}
session, err := carrier.Connect(dialCtx, target)
if err != nil {
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err)
return nil
@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"strings"
"sync"
"time"
@@ -37,6 +38,7 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
if target.Endpoint == "" {
return nil, fmt.Errorf("quic fabric endpoint is required")
}
target.Endpoint = strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://")
tlsConfig := target.TLSConfig
if tlsConfig == nil {
tlsConfig = &tls.Config{NextProtos: []string{fabricQUICNextProto}}
@@ -76,7 +78,7 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
maxPayload: maxPayload,
timeout: target.Timeout,
}
go session.readLoop(ctx)
go session.readLoop(context.Background())
return session, nil
}
@@ -66,7 +66,7 @@ func (m *FabricSessionPeerManager) Get(ctx context.Context, target FabricSession
if err != nil {
return nil, err
}
pump := session.StartPump(ctx, target.Pump)
pump := session.StartPump(context.Background(), target.Pump)
m.mu.Lock()
if existing := m.sessions[key]; existing != nil {
@@ -3,7 +3,9 @@ package mesh
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
@@ -25,6 +27,7 @@ type FabricTransport interface {
type FabricTransportTarget struct {
PeerID string
Endpoint string
Transport string
Token string
Header http.Header
TLSConfig *tls.Config
@@ -35,6 +38,29 @@ type FabricTransportTarget struct {
ErrorBuffer int
}
func FabricTransportForTarget(target FabricTransportTarget, websocket *WebSocketFabricTransport, quicTransport *QUICFabricTransport) (FabricTransport, FabricTransportTarget, error) {
transportLabel := strings.ToLower(strings.TrimSpace(target.Transport))
endpoint := strings.TrimSpace(target.Endpoint)
if strings.HasPrefix(strings.ToLower(endpoint), "quic://") {
transportLabel = "quic"
target.Endpoint = strings.TrimPrefix(endpoint, "quic://")
}
switch transportLabel {
case "quic", "direct_quic", "udp_quic", "quic_udp":
if quicTransport == nil {
quicTransport = NewQUICFabricTransport(nil)
}
return quicTransport, target, nil
case "", "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls":
if websocket == nil {
websocket = NewWebSocketFabricTransport(nil)
}
return websocket, target, nil
default:
return nil, target, fmt.Errorf("unsupported fabric transport %q", target.Transport)
}
}
type WebSocketFabricTransport struct {
Manager *FabricSessionPeerManager
}
@@ -107,3 +107,33 @@ func TestWebSocketFabricTransportReopensClosedSession(t *testing.T) {
t.Fatalf("opened = %d, want 2", opened)
}
}
func TestFabricTransportForTargetSelectsQUICByScheme(t *testing.T) {
transport, target, err := FabricTransportForTarget(FabricTransportTarget{
Endpoint: "quic://127.0.0.1:4433",
}, nil, nil)
if err != nil {
t.Fatalf("select transport: %v", err)
}
if _, ok := transport.(*QUICFabricTransport); !ok {
t.Fatalf("transport = %T, want QUIC", transport)
}
if target.Endpoint != "127.0.0.1:4433" {
t.Fatalf("endpoint = %q", target.Endpoint)
}
}
func TestFabricTransportForTargetSelectsWebSocketByDefault(t *testing.T) {
transport, target, err := FabricTransportForTarget(FabricTransportTarget{
Endpoint: "https://node.example",
}, nil, nil)
if err != nil {
t.Fatalf("select transport: %v", err)
}
if _, ok := transport.(*WebSocketFabricTransport); !ok {
t.Fatalf("transport = %T, want websocket", transport)
}
if target.Endpoint != "https://node.example" {
t.Fatalf("endpoint = %q", target.Endpoint)
}
}
@@ -297,6 +297,8 @@ future QUIC/UDP transport can be added without changing VPN/RDP/HTTP services.
`QUICFabricTransport` now implements the same interface and carries the same
binary `fabricproto` frames over a QUIC stream, with local smoke coverage for
`PING`/`PONG` and DATA/ACK.
Carrier selection understands QUIC transport labels and `quic://host:port`
endpoints while preserving WebSocket as the default fallback.
Deliverables: