// Source: rdp-proxy/agents/rap-node-agent/internal/mesh/fabric_transport.go (Go, 112 lines, 3.0 KiB)

package mesh
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
// FabricTransportSession is the session handle returned by
// FabricTransport.Connect. Only the method signatures are visible here; the
// behavioral notes below are inferred from them and should be confirmed
// against the concrete implementations.
type FabricTransportSession interface {
	// Send takes a context and a single fabricproto.Frame and reports an
	// error. Presumably it queues or writes the frame to the peer.
	Send(ctx context.Context, frame fabricproto.Frame) error

	// Frames returns a receive-only channel of fabricproto.Frame values.
	Frames() <-chan fabricproto.Frame

	// Errors returns a receive-only channel of errors.
	Errors() <-chan error

	// Close returns an error; presumably it tears the session down.
	Close() error

	// Closed returns a bool; presumably true once the session is closed.
	Closed() bool
}
// FabricTransport is the abstraction returned by FabricTransportForTarget,
// which hands back either a *WebSocketFabricTransport or a
// *QUICFabricTransport as a FabricTransport.
type FabricTransport interface {
	// Connect takes a context and a FabricTransportTarget and returns a
	// FabricTransportSession or an error.
	Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error)

	// Close returns an error; presumably it releases the transport's
	// resources (confirm against the implementations).
	Close() error
}
// FabricTransportTarget carries the parameters passed to
// FabricTransport.Connect and inspected by FabricTransportForTarget.
// Field order is part of the type's contract for unkeyed literals and is
// kept as declared.
type FabricTransportTarget struct {
	// PeerID is forwarded as the peer identifier by the WebSocket transport.
	PeerID string
	// Endpoint is the address to connect to. FabricTransportForTarget treats
	// a "quic://" scheme prefix as a request for QUIC; the WebSocket
	// transport forwards the value as the session's BaseURL.
	Endpoint string
	// Transport is the transport label matched by FabricTransportForTarget
	// (for example "quic" or "websocket"); empty selects WebSocket.
	Transport string
	// Token is forwarded as a dial option by the WebSocket transport.
	Token string
	// Header is forwarded as a dial option by the WebSocket transport.
	Header http.Header
	// TLSConfig is not read by the code in this file; presumably consumed
	// by the QUIC transport (confirm).
	TLSConfig *tls.Config
	// PeerCertSHA256 is not read by the code in this file; presumably a
	// certificate fingerprint used for peer pinning (confirm).
	PeerCertSHA256 string
	// Timeout is forwarded as a dial option by the WebSocket transport.
	Timeout time.Duration
	// MaxPayload is forwarded as a dial option by the WebSocket transport;
	// units are not established here (presumably bytes).
	MaxPayload int
	// OutboundBuffer is forwarded as a pump option by the WebSocket transport.
	OutboundBuffer int
	// InboundBuffer is forwarded as a pump option by the WebSocket transport.
	InboundBuffer int
	// ErrorBuffer is forwarded as a pump option by the WebSocket transport.
	ErrorBuffer int
}
// FabricTransportForTarget selects the transport implementation for target
// and returns it together with the (possibly rewritten) target.
//
// The selection is driven by target.Transport after trimming whitespace and
// lower-casing it. An endpoint that begins with the "quic://" scheme (matched
// case-insensitively, after trimming surrounding whitespace) forces QUIC
// regardless of target.Transport, and the scheme is removed from the returned
// target's Endpoint. In every other case the returned target is the input
// unchanged.
//
// If the pointer for the selected transport is nil, a new transport is
// created with NewQUICFabricTransport(nil) or NewWebSocketFabricTransport(nil).
// An unrecognized label yields a nil transport and an error that quotes the
// original target.Transport value.
func FabricTransportForTarget(target FabricTransportTarget, websocket *WebSocketFabricTransport, quicTransport *QUICFabricTransport) (FabricTransport, FabricTransportTarget, error) {
	const quicScheme = "quic://"

	transportLabel := strings.ToLower(strings.TrimSpace(target.Transport))
	endpoint := strings.TrimSpace(target.Endpoint)
	// Match the scheme case-insensitively and strip it by length. Using a
	// case-sensitive TrimPrefix here would leave e.g. "QUIC://host" intact
	// even though it was recognized as a QUIC endpoint.
	if len(endpoint) >= len(quicScheme) && strings.EqualFold(endpoint[:len(quicScheme)], quicScheme) {
		transportLabel = "quic"
		target.Endpoint = endpoint[len(quicScheme):]
	}

	switch transportLabel {
	case "quic", "direct_quic", "udp_quic", "quic_udp":
		if quicTransport == nil {
			quicTransport = NewQUICFabricTransport(nil)
		}
		return quicTransport, target, nil
	case "", "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls":
		// An empty label defaults to the WebSocket transport.
		if websocket == nil {
			websocket = NewWebSocketFabricTransport(nil)
		}
		return websocket, target, nil
	default:
		return nil, target, fmt.Errorf("unsupported fabric transport %q", target.Transport)
	}
}
// WebSocketFabricTransport is the transport that FabricTransportForTarget
// returns for WebSocket-style labels. Its methods delegate to Manager.
type WebSocketFabricTransport struct {
	// Manager supplies sessions for Connect and is closed by Close. It may
	// be nil; Connect then creates one on first use.
	Manager *FabricSessionPeerManager
}
// NewWebSocketFabricTransport returns a WebSocketFabricTransport backed by
// manager. When manager is nil, a new one is created with
// NewFabricSessionPeerManager.
func NewWebSocketFabricTransport(manager *FabricSessionPeerManager) *WebSocketFabricTransport {
	transport := &WebSocketFabricTransport{Manager: manager}
	if transport.Manager == nil {
		transport.Manager = NewFabricSessionPeerManager()
	}
	return transport
}
// Connect obtains a session for target from the transport's peer manager.
//
// A nil receiver returns an error rather than panicking, matching the
// nil-tolerance of Close and Snapshot. If t.Manager is nil, a manager is
// created with NewFabricSessionPeerManager and stored on t before use.
//
// target.PeerID and target.Endpoint (as BaseURL) identify the peer; Token,
// Header, Timeout and MaxPayload are passed as dial options; OutboundBuffer,
// InboundBuffer and ErrorBuffer are passed as pump options.
//
// NOTE(review): target.Transport, target.TLSConfig and target.PeerCertSHA256
// are not forwarded by this method — confirm that is intended.
func (t *WebSocketFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
	if t == nil {
		return nil, fmt.Errorf("websocket fabric transport is nil")
	}
	manager := t.Manager
	if manager == nil {
		// Lazily create the manager so a zero-value transport is usable.
		manager = NewFabricSessionPeerManager()
		t.Manager = manager
	}
	return manager.Get(ctx, FabricSessionPeerTarget{
		PeerID:  target.PeerID,
		BaseURL: target.Endpoint,
		Options: FabricSessionDialOptions{
			Token:      target.Token,
			Header:     target.Header,
			Timeout:    target.Timeout,
			MaxPayload: target.MaxPayload,
		},
		Pump: FabricSessionPumpOptions{
			OutboundBuffer: target.OutboundBuffer,
			InboundBuffer:  target.InboundBuffer,
			ErrorBuffer:    target.ErrorBuffer,
		},
	})
}
// Close closes the underlying manager and returns its error. It is a no-op
// returning nil when the receiver or its Manager is nil.
func (t *WebSocketFabricTransport) Close() error {
	if t != nil && t.Manager != nil {
		return t.Manager.Close()
	}
	return nil
}
// Snapshot returns the underlying manager's snapshot. When the receiver or
// its Manager is nil, it returns a snapshot with only SchemaVersion set.
func (t *WebSocketFabricTransport) Snapshot() FabricSessionPeerManagerSnapshot {
	if t != nil && t.Manager != nil {
		return t.Manager.Snapshot()
	}
	return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"}
}