package mesh

import (
	"context"
	"crypto/tls"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)

// FabricTransportSession is one established session to a fabric peer, as
// returned by FabricTransport.Connect.
type FabricTransportSession interface {
	// Send submits a single frame on the session.
	Send(context.Context, fabricproto.Frame) error
	// Frames returns a receive-only channel of frames (presumably the frames
	// received from the peer; confirm against the implementations).
	Frames() <-chan fabricproto.Frame
	// Errors returns a receive-only channel of session errors.
	Errors() <-chan error
	// Close closes the session.
	Close() error
	// Closed reports whether the session has been closed.
	Closed() bool
}

// FabricTransport opens sessions to fabric peers over one concrete transport.
type FabricTransport interface {
	// Connect returns a session for the given target.
	Connect(context.Context, FabricTransportTarget) (FabricTransportSession, error)
	// Close releases the transport's resources.
	Close() error
}

// FabricTransportTarget describes the peer to connect to and the options to
// use for the connection.
//
// NOTE(review): WebSocketFabricTransport.Connect in this file does not forward
// TLSConfig or PeerCertSHA256; they are presumably consumed by other
// transports. Confirm before relying on them for WebSocket targets.
type FabricTransportTarget struct {
	PeerID    string // Peer identifier.
	Endpoint  string // Peer address; a leading "quic://" selects the QUIC transport.
	Transport string // Transport label; see FabricTransportForTarget for accepted values.
	Token     string
	Header    http.Header
	TLSConfig *tls.Config

	PeerCertSHA256 string
	Timeout        time.Duration
	MaxPayload     int
	OutboundBuffer int
	InboundBuffer  int
	ErrorBuffer    int
}

// FabricTransportForTarget picks the transport to use for target and returns
// it together with the (possibly adjusted) target.
//
// The choice is driven by target.Transport after trimming surrounding
// whitespace and lower-casing it:
//
//   - "quic", "direct_quic", "udp_quic", "quic_udp" select quicTransport;
//   - "", "websocket", "ws", "wss", "direct_http", "direct_https",
//     "direct_tcp_tls" select websocket;
//   - any other label yields an error and a nil transport.
//
// An endpoint that starts with "quic://" (matched case-insensitively, after
// trimming surrounding whitespace) always selects QUIC regardless of the
// label, and the returned target's Endpoint has that scheme removed.
//
// When the selected transport argument is nil, a new one is created by calling
// its constructor with a nil argument.
func FabricTransportForTarget(target FabricTransportTarget, websocket *WebSocketFabricTransport, quicTransport *QUICFabricTransport) (FabricTransport, FabricTransportTarget, error) {
	const quicScheme = "quic://"

	transportLabel := strings.ToLower(strings.TrimSpace(target.Transport))
	endpoint := strings.TrimSpace(target.Endpoint)

	// Strip exactly the bytes that were matched. The previous code matched on
	// a lower-cased copy but trimmed the literal "quic://", so an endpoint such
	// as "QUIC://host" selected QUIC while keeping its scheme.
	if len(endpoint) >= len(quicScheme) && strings.EqualFold(endpoint[:len(quicScheme)], quicScheme) {
		transportLabel = "quic"
		target.Endpoint = endpoint[len(quicScheme):]
	}

	switch transportLabel {
	case "quic", "direct_quic", "udp_quic", "quic_udp":
		if quicTransport == nil {
			quicTransport = NewQUICFabricTransport(nil)
		}
		return quicTransport, target, nil
	case "", "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls":
		if websocket == nil {
			websocket = NewWebSocketFabricTransport(nil)
		}
		return websocket, target, nil
	default:
		// Report the label as the caller supplied it, not the normalized form.
		return nil, target, fmt.Errorf("unsupported fabric transport %q", target.Transport)
	}
}

// WebSocketFabricTransport is the FabricTransport that obtains its sessions
// from a FabricSessionPeerManager.
type WebSocketFabricTransport struct {
	Manager *FabricSessionPeerManager
}

// NewWebSocketFabricTransport returns a transport that uses manager. A nil
// manager is replaced by one created with NewFabricSessionPeerManager.
func NewWebSocketFabricTransport(manager *FabricSessionPeerManager) *WebSocketFabricTransport {
	if manager == nil {
		manager = NewFabricSessionPeerManager()
	}
	return &WebSocketFabricTransport{Manager: manager}
}

// Connect asks the transport's manager for a session to target, mapping the
// target's endpoint, credentials, timeout, payload limit and buffer sizes
// onto the manager's dial and pump options.
//
// A nil receiver yields an error instead of a panic, in line with Close and
// Snapshot, which also tolerate a nil receiver. A nil Manager is created on
// first use; that assignment is not synchronized, so construct the transport
// with NewWebSocketFabricTransport if Connect may be called concurrently.
func (t *WebSocketFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
	if t == nil {
		return nil, fmt.Errorf("websocket fabric transport is nil")
	}
	if t.Manager == nil {
		t.Manager = NewFabricSessionPeerManager()
	}

	return t.Manager.Get(ctx, FabricSessionPeerTarget{
		PeerID:  target.PeerID,
		BaseURL: target.Endpoint,
		Options: FabricSessionDialOptions{
			Token:      target.Token,
			Header:     target.Header,
			Timeout:    target.Timeout,
			MaxPayload: target.MaxPayload,
		},
		Pump: FabricSessionPumpOptions{
			OutboundBuffer: target.OutboundBuffer,
			InboundBuffer:  target.InboundBuffer,
			ErrorBuffer:    target.ErrorBuffer,
		},
	})
}

// Close closes the underlying manager. It is a no-op returning nil when the
// receiver or its Manager is nil.
func (t *WebSocketFabricTransport) Close() error {
	if t == nil || t.Manager == nil {
		return nil
	}
	return t.Manager.Close()
}

// Snapshot returns the manager's snapshot. When the receiver or its Manager is
// nil it returns a snapshot carrying only the schema version.
func (t *WebSocketFabricTransport) Snapshot() FabricSessionPeerManagerSnapshot {
	if t == nil || t.Manager == nil {
		return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"}
	}
	return t.Manager.Snapshot()
}