Introduce fabric transport abstraction

This commit is contained in:
2026-05-16 10:14:07 +03:00
parent 58c810d2e8
commit ba3522d966
4 changed files with 226 additions and 18 deletions
@@ -353,6 +353,7 @@ type syntheticMeshState struct {
VPNFabricInbox *vpnruntime.FabricPacketInbox
VPNFabricIngress *vpnruntime.FabricClientPacketIngress
VPNFabricSessionPeers *mesh.FabricSessionPeerManager
VPNFabricTransport *mesh.WebSocketFabricTransport
PeerEndpoints map[string]string
VPNGateway *vpnruntime.Gateway
ServiceChannelAccessStats *fabricServiceChannelAccessStats
@@ -776,6 +777,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
dynamicListenerHandler := newDynamicHTTPHandler(serverHandler)
listenerCfg := meshListenerRuntimeConfig(cfg, loadedConfig.MeshListener)
listenerReport, stopListener := startSyntheticMeshHTTPServer(ctx, listenerCfg, identity, dynamicListenerHandler, len(peerEndpoints), len(routes), gateEnabled, runtimeEnabled)
vpnFabricSessionPeers := mesh.NewFabricSessionPeerManager()
return &syntheticMeshState{
Runtime: runtime,
Routes: routes,
@@ -800,7 +802,8 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
ProductionForwardingEnabled: productionForwardingEnabled,
VPNFabricInbox: vpnFabricInbox,
VPNFabricIngress: vpnFabricIngress,
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
VPNFabricSessionPeers: vpnFabricSessionPeers,
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
PeerEndpoints: copyStringMap(peerEndpoints),
VPNGateway: vpnGateway,
ServiceChannelAccessStats: serviceChannelAccessStats,
@@ -1616,6 +1619,13 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil {
_ = meshState.VPNFabricSessionPeers.Close()
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
}
if meshState.VPNFabricSessionPeers == nil {
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
}
if meshState.VPNFabricTransport == nil {
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
}
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
if productionForwardingEnabled {
@@ -2500,7 +2510,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
"gated": true,
"observed_at": observedAt.UTC().Format(time.RFC3339Nano),
}
if meshState != nil && meshState.VPNFabricSessionPeers != nil {
if meshState != nil && meshState.VPNFabricTransport != nil {
report["peer_sessions"] = meshState.VPNFabricTransport.Snapshot()
} else if meshState != nil && meshState.VPNFabricSessionPeers != nil {
report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot()
}
payload.Metadata["vpn_fabric_session_transport_report"] = report
@@ -4419,7 +4431,7 @@ func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config,
}
func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop string) vpnruntime.PacketTransport {
if meshState == nil || meshState.VPNFabricInbox == nil || meshState.VPNFabricSessionPeers == nil || assignment.VPNConnectionID == "" || nextHop == "" {
if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" {
return nil
}
endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/")
@@ -4429,18 +4441,20 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
}
dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
pump, err := meshState.VPNFabricSessionPeers.Get(dialCtx, mesh.FabricSessionPeerTarget{
if meshState.VPNFabricSessionPeers == nil {
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
}
if meshState.VPNFabricTransport == nil {
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
}
session, err := meshState.VPNFabricTransport.Connect(dialCtx, mesh.FabricTransportTarget{
PeerID: nextHop,
BaseURL: endpoint,
Options: mesh.FabricSessionDialOptions{
Endpoint: endpoint,
Token: fabricSessionGatewayToken(identity, assignment, nextHop),
Timeout: 3 * time.Second,
},
Pump: mesh.FabricSessionPumpOptions{
OutboundBuffer: 256,
InboundBuffer: 256,
ErrorBuffer: 16,
},
})
if err != nil {
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err)
@@ -4450,7 +4464,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if streamID == 0 {
streamID = 1
}
if err := pump.Send(dialCtx, fabricproto.Frame{
if err := session.Send(dialCtx, fabricproto.Frame{
Type: fabricproto.FrameOpenStream,
StreamID: streamID,
TrafficClass: fabricproto.TrafficClassInteractive,
@@ -4459,8 +4473,8 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
return nil
}
return &vpnruntime.FabricSessionPacketTransport{
Sender: pump,
Receiver: pump,
Sender: session,
Receiver: session,
Inbox: meshState.VPNFabricInbox,
StreamID: streamID,
VPNConnectionID: assignment.VPNConnectionID,
@@ -0,0 +1,82 @@
package mesh
import (
"context"
"net/http"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
type FabricTransportSession interface {
Send(context.Context, fabricproto.Frame) error
Frames() <-chan fabricproto.Frame
Errors() <-chan error
Close() error
Closed() bool
}
type FabricTransport interface {
Connect(context.Context, FabricTransportTarget) (FabricTransportSession, error)
Close() error
}
type FabricTransportTarget struct {
PeerID string
Endpoint string
Token string
Header http.Header
Timeout time.Duration
MaxPayload int
OutboundBuffer int
InboundBuffer int
ErrorBuffer int
}
type WebSocketFabricTransport struct {
Manager *FabricSessionPeerManager
}
func NewWebSocketFabricTransport(manager *FabricSessionPeerManager) *WebSocketFabricTransport {
if manager == nil {
manager = NewFabricSessionPeerManager()
}
return &WebSocketFabricTransport{Manager: manager}
}
func (t *WebSocketFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
manager := t.Manager
if manager == nil {
manager = NewFabricSessionPeerManager()
t.Manager = manager
}
return manager.Get(ctx, FabricSessionPeerTarget{
PeerID: target.PeerID,
BaseURL: target.Endpoint,
Options: FabricSessionDialOptions{
Token: target.Token,
Header: target.Header,
Timeout: target.Timeout,
MaxPayload: target.MaxPayload,
},
Pump: FabricSessionPumpOptions{
OutboundBuffer: target.OutboundBuffer,
InboundBuffer: target.InboundBuffer,
ErrorBuffer: target.ErrorBuffer,
},
})
}
func (t *WebSocketFabricTransport) Close() error {
if t == nil || t.Manager == nil {
return nil
}
return t.Manager.Close()
}
func (t *WebSocketFabricTransport) Snapshot() FabricSessionPeerManagerSnapshot {
if t == nil || t.Manager == nil {
return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"}
}
return t.Manager.Snapshot()
}
@@ -0,0 +1,109 @@
package mesh
import (
"context"
"net/http/httptest"
"testing"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
func TestWebSocketFabricTransportConnectsAndReusesSession(t *testing.T) {
var opened int
server := httptest.NewServer(Server{
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
FabricSessionEnabled: true,
FabricSessionLogger: func(entry FabricSessionEventLogEntry) {
if entry.Event == "fabric_session_websocket_opened" {
opened++
}
},
}.Handler())
defer server.Close()
transport := NewWebSocketFabricTransport(nil)
defer transport.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
target := FabricTransportTarget{
PeerID: "node-a",
Endpoint: server.URL,
Token: "rap_fsn_transport",
Timeout: time.Second,
OutboundBuffer: 4,
InboundBuffer: 4,
ErrorBuffer: 4,
}
first, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("first connect: %v", err)
}
second, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("second connect: %v", err)
}
if first != second {
t.Fatal("transport did not reuse session")
}
if opened != 1 {
t.Fatalf("opened = %d, want 1", opened)
}
if err := first.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 1, Payload: []byte("transport")}); err != nil {
t.Fatalf("send ping: %v", err)
}
select {
case frame := <-first.Frames():
if frame.Type != fabricproto.FramePong || frame.Sequence != 1 || string(frame.Payload) != "transport" {
t.Fatalf("frame = %+v", frame)
}
case err := <-first.Errors():
t.Fatalf("session error: %v", err)
case <-ctx.Done():
t.Fatal(ctx.Err())
}
}
func TestWebSocketFabricTransportReopensClosedSession(t *testing.T) {
var opened int
server := httptest.NewServer(Server{
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
FabricSessionEnabled: true,
FabricSessionLogger: func(entry FabricSessionEventLogEntry) {
if entry.Event == "fabric_session_websocket_opened" {
opened++
}
},
}.Handler())
defer server.Close()
transport := NewWebSocketFabricTransport(nil)
defer transport.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
target := FabricTransportTarget{
PeerID: "node-a",
Endpoint: server.URL,
Token: "rap_fsn_transport_reopen",
Timeout: time.Second,
}
first, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("first connect: %v", err)
}
if err := first.Close(); err != nil {
t.Fatalf("close first session: %v", err)
}
second, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("second connect: %v", err)
}
if first == second {
t.Fatal("transport reused closed session")
}
if opened != 2 {
t.Fatalf("opened = %d, want 2", opened)
}
}
@@ -291,6 +291,9 @@ Peer session reuse now evicts closed pumps before reuse, so failed WebSocket
sessions can be reopened on the next transport acquisition.
Heartbeat telemetry includes peer session manager counters for active sessions,
reuses, opens, closed-pump evictions, and explicit close operations.
The mesh package now exposes a service-neutral `FabricTransport` abstraction;
the current WebSocket carrier implements it as `WebSocketFabricTransport`, so
future QUIC/UDP transport can be added without changing VPN/RDP/HTTP services.
Deliverables: