111 lines
3.0 KiB
Go
111 lines
3.0 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
)
|
|
|
|
type FabricTransportSession interface {
|
|
Send(context.Context, fabricproto.Frame) error
|
|
Frames() <-chan fabricproto.Frame
|
|
Errors() <-chan error
|
|
Close() error
|
|
Closed() bool
|
|
}
|
|
|
|
type FabricTransport interface {
|
|
Connect(context.Context, FabricTransportTarget) (FabricTransportSession, error)
|
|
Close() error
|
|
}
|
|
|
|
// FabricTransportTarget describes a peer to connect to and the connection
// parameters handed to a FabricTransport.
type FabricTransportTarget struct {
	// PeerID identifies the remote peer.
	PeerID string
	// Endpoint is the peer address. FabricTransportForTarget treats a
	// "quic://" prefix as a request for the QUIC transport; the WebSocket
	// transport passes the value on as the session base URL.
	Endpoint string
	// Transport is the transport label interpreted by
	// FabricTransportForTarget; an empty label selects WebSocket.
	Transport string
	// Token and Header are forwarded as dial options by the WebSocket
	// transport.
	Token  string
	Header http.Header
	// TLSConfig is carried on the target; the WebSocket transport in this
	// file does not read it.
	TLSConfig *tls.Config
	// Timeout and MaxPayload are forwarded as dial options by the WebSocket
	// transport.
	Timeout    time.Duration
	MaxPayload int
	// OutboundBuffer, InboundBuffer and ErrorBuffer are forwarded as pump
	// options by the WebSocket transport.
	OutboundBuffer int
	InboundBuffer  int
	ErrorBuffer    int
}
|
|
|
|
func FabricTransportForTarget(target FabricTransportTarget, websocket *WebSocketFabricTransport, quicTransport *QUICFabricTransport) (FabricTransport, FabricTransportTarget, error) {
|
|
transportLabel := strings.ToLower(strings.TrimSpace(target.Transport))
|
|
endpoint := strings.TrimSpace(target.Endpoint)
|
|
if strings.HasPrefix(strings.ToLower(endpoint), "quic://") {
|
|
transportLabel = "quic"
|
|
target.Endpoint = strings.TrimPrefix(endpoint, "quic://")
|
|
}
|
|
switch transportLabel {
|
|
case "quic", "direct_quic", "udp_quic", "quic_udp":
|
|
if quicTransport == nil {
|
|
quicTransport = NewQUICFabricTransport(nil)
|
|
}
|
|
return quicTransport, target, nil
|
|
case "", "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls":
|
|
if websocket == nil {
|
|
websocket = NewWebSocketFabricTransport(nil)
|
|
}
|
|
return websocket, target, nil
|
|
default:
|
|
return nil, target, fmt.Errorf("unsupported fabric transport %q", target.Transport)
|
|
}
|
|
}
|
|
|
|
type WebSocketFabricTransport struct {
|
|
Manager *FabricSessionPeerManager
|
|
}
|
|
|
|
func NewWebSocketFabricTransport(manager *FabricSessionPeerManager) *WebSocketFabricTransport {
|
|
if manager == nil {
|
|
manager = NewFabricSessionPeerManager()
|
|
}
|
|
return &WebSocketFabricTransport{Manager: manager}
|
|
}
|
|
|
|
func (t *WebSocketFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
|
|
manager := t.Manager
|
|
if manager == nil {
|
|
manager = NewFabricSessionPeerManager()
|
|
t.Manager = manager
|
|
}
|
|
return manager.Get(ctx, FabricSessionPeerTarget{
|
|
PeerID: target.PeerID,
|
|
BaseURL: target.Endpoint,
|
|
Options: FabricSessionDialOptions{
|
|
Token: target.Token,
|
|
Header: target.Header,
|
|
Timeout: target.Timeout,
|
|
MaxPayload: target.MaxPayload,
|
|
},
|
|
Pump: FabricSessionPumpOptions{
|
|
OutboundBuffer: target.OutboundBuffer,
|
|
InboundBuffer: target.InboundBuffer,
|
|
ErrorBuffer: target.ErrorBuffer,
|
|
},
|
|
})
|
|
}
|
|
|
|
func (t *WebSocketFabricTransport) Close() error {
|
|
if t == nil || t.Manager == nil {
|
|
return nil
|
|
}
|
|
return t.Manager.Close()
|
|
}
|
|
|
|
func (t *WebSocketFabricTransport) Snapshot() FabricSessionPeerManagerSnapshot {
|
|
if t == nil || t.Manager == nil {
|
|
return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"}
|
|
}
|
|
return t.Manager.Snapshot()
|
|
}
|