From ba3522d966c094bb6238deb368e77f3afc287cdb Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 10:14:07 +0300 Subject: [PATCH] Introduce fabric transport abstraction --- .../rap-node-agent/cmd/rap-node-agent/main.go | 50 +++++--- .../internal/mesh/fabric_transport.go | 82 +++++++++++++ .../internal/mesh/fabric_transport_test.go | 109 ++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 4 files changed, 226 insertions(+), 18 deletions(-) create mode 100644 agents/rap-node-agent/internal/mesh/fabric_transport.go create mode 100644 agents/rap-node-agent/internal/mesh/fabric_transport_test.go diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index ba84f2f..05eb55f 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -353,6 +353,7 @@ type syntheticMeshState struct { VPNFabricInbox *vpnruntime.FabricPacketInbox VPNFabricIngress *vpnruntime.FabricClientPacketIngress VPNFabricSessionPeers *mesh.FabricSessionPeerManager + VPNFabricTransport *mesh.WebSocketFabricTransport PeerEndpoints map[string]string VPNGateway *vpnruntime.Gateway ServiceChannelAccessStats *fabricServiceChannelAccessStats @@ -776,6 +777,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c dynamicListenerHandler := newDynamicHTTPHandler(serverHandler) listenerCfg := meshListenerRuntimeConfig(cfg, loadedConfig.MeshListener) listenerReport, stopListener := startSyntheticMeshHTTPServer(ctx, listenerCfg, identity, dynamicListenerHandler, len(peerEndpoints), len(routes), gateEnabled, runtimeEnabled) + vpnFabricSessionPeers := mesh.NewFabricSessionPeerManager() return &syntheticMeshState{ Runtime: runtime, Routes: routes, @@ -800,7 +802,8 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c ProductionForwardingEnabled: productionForwardingEnabled, VPNFabricInbox: vpnFabricInbox, VPNFabricIngress: vpnFabricIngress, - VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), + VPNFabricSessionPeers: vpnFabricSessionPeers, + VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), PeerEndpoints: copyStringMap(peerEndpoints), VPNGateway: vpnGateway, ServiceChannelAccessStats: serviceChannelAccessStats, @@ -1616,6 +1619,13 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil { _ = meshState.VPNFabricSessionPeers.Close() meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() + meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) + } + if meshState.VPNFabricSessionPeers == nil { + meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() + } + if meshState.VPNFabricTransport == nil { + meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) if productionForwardingEnabled { @@ -2500,7 +2510,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn "gated": true, "observed_at": observedAt.UTC().Format(time.RFC3339Nano), } - if meshState != nil && meshState.VPNFabricSessionPeers != nil { + if meshState != nil && meshState.VPNFabricTransport != nil { + report["peer_sessions"] = meshState.VPNFabricTransport.Snapshot() + } else if meshState != nil && meshState.VPNFabricSessionPeers != nil { report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() } payload.Metadata["vpn_fabric_session_transport_report"] = report @@ -4419,7 +4431,7 @@ func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config, } func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop string) vpnruntime.PacketTransport { - if meshState == nil || meshState.VPNFabricInbox == nil || meshState.VPNFabricSessionPeers == nil || assignment.VPNConnectionID == "" || nextHop == "" { + if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" { return nil } endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/") @@ -4429,18 +4441,20 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st } dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - pump, err := meshState.VPNFabricSessionPeers.Get(dialCtx, mesh.FabricSessionPeerTarget{ - PeerID: nextHop, - BaseURL: endpoint, - Options: mesh.FabricSessionDialOptions{ - Token: fabricSessionGatewayToken(identity, assignment, nextHop), - Timeout: 3 * time.Second, - }, - Pump: mesh.FabricSessionPumpOptions{ - OutboundBuffer: 256, - InboundBuffer: 256, - ErrorBuffer: 16, - }, + if meshState.VPNFabricSessionPeers == nil { + meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() + } + if meshState.VPNFabricTransport == nil { + meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) + } + session, err := meshState.VPNFabricTransport.Connect(dialCtx, mesh.FabricTransportTarget{ + PeerID: nextHop, + Endpoint: endpoint, + Token: fabricSessionGatewayToken(identity, assignment, nextHop), + Timeout: 3 * time.Second, + OutboundBuffer: 256, + InboundBuffer: 256, + ErrorBuffer: 16, }) if err != nil { log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) @@ -4450,7 +4464,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if streamID == 0 { streamID = 1 } - if err := pump.Send(dialCtx, fabricproto.Frame{ + if err := session.Send(dialCtx, fabricproto.Frame{ Type: fabricproto.FrameOpenStream, StreamID: streamID, TrafficClass: fabricproto.TrafficClassInteractive, @@ -4459,8 +4473,8 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st return nil } return &vpnruntime.FabricSessionPacketTransport{ - Sender: pump, - Receiver: pump, + Sender: session, + Receiver: session, Inbox: meshState.VPNFabricInbox, StreamID: streamID, VPNConnectionID: assignment.VPNConnectionID, diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport.go b/agents/rap-node-agent/internal/mesh/fabric_transport.go new file mode 100644 index 0000000..71f577b --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/fabric_transport.go @@ -0,0 +1,82 @@ +package mesh + +import ( + "context" + "net/http" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" +) + +type FabricTransportSession interface { + Send(context.Context, fabricproto.Frame) error + Frames() <-chan fabricproto.Frame + Errors() <-chan error + Close() error + Closed() bool +} + +type FabricTransport interface { + Connect(context.Context, FabricTransportTarget) (FabricTransportSession, error) + Close() error +} + +type FabricTransportTarget struct { + PeerID string + Endpoint string + Token string + Header http.Header + Timeout time.Duration + MaxPayload int + OutboundBuffer int + InboundBuffer int + ErrorBuffer int +} + +type WebSocketFabricTransport struct { + Manager *FabricSessionPeerManager +} + +func NewWebSocketFabricTransport(manager *FabricSessionPeerManager) *WebSocketFabricTransport { + if manager == nil { + manager = NewFabricSessionPeerManager() + } + return &WebSocketFabricTransport{Manager: manager} +} + +func (t *WebSocketFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) { + manager := t.Manager + if manager == nil { + manager = NewFabricSessionPeerManager() + t.Manager = manager + } + return manager.Get(ctx, FabricSessionPeerTarget{ + PeerID: target.PeerID, + BaseURL: target.Endpoint, + Options: FabricSessionDialOptions{ + Token: target.Token, + Header: target.Header, + Timeout: target.Timeout, + MaxPayload: target.MaxPayload, + }, + Pump: FabricSessionPumpOptions{ + OutboundBuffer: target.OutboundBuffer, + InboundBuffer: target.InboundBuffer, + ErrorBuffer: target.ErrorBuffer, + }, + }) +} + +func (t *WebSocketFabricTransport) Close() error { + if t == nil || t.Manager == nil { + return nil + } + return t.Manager.Close() +} + +func (t *WebSocketFabricTransport) Snapshot() FabricSessionPeerManagerSnapshot { + if t == nil || t.Manager == nil { + return FabricSessionPeerManagerSnapshot{SchemaVersion: "rap.fabric_session_peer_manager.v1"} + } + return t.Manager.Snapshot() +} diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_transport_test.go new file mode 100644 index 0000000..d2b79fb --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/fabric_transport_test.go @@ -0,0 +1,109 @@ +package mesh + +import ( + "context" + "net/http/httptest" + "testing" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" +) + +func TestWebSocketFabricTransportConnectsAndReusesSession(t *testing.T) { + var opened int + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + FabricSessionLogger: func(entry FabricSessionEventLogEntry) { + if entry.Event == "fabric_session_websocket_opened" { + opened++ + } + }, + }.Handler()) + defer server.Close() + + transport := NewWebSocketFabricTransport(nil) + defer transport.Close() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + target := FabricTransportTarget{ + PeerID: "node-a", + Endpoint: server.URL, + Token: "rap_fsn_transport", + Timeout: time.Second, + OutboundBuffer: 4, + InboundBuffer: 4, + ErrorBuffer: 4, + } + + first, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("first connect: %v", err) + } + second, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("second connect: %v", err) + } + if first != second { + t.Fatal("transport did not reuse session") + } + if opened != 1 { + t.Fatalf("opened = %d, want 1", opened) + } + if err := first.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 1, Payload: []byte("transport")}); err != nil { + t.Fatalf("send ping: %v", err) + } + select { + case frame := <-first.Frames(): + if frame.Type != fabricproto.FramePong || frame.Sequence != 1 || string(frame.Payload) != "transport" { + t.Fatalf("frame = %+v", frame) + } + case err := <-first.Errors(): + t.Fatalf("session error: %v", err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +func TestWebSocketFabricTransportReopensClosedSession(t *testing.T) { + var opened int + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + FabricSessionLogger: func(entry FabricSessionEventLogEntry) { + if entry.Event == "fabric_session_websocket_opened" { + opened++ + } + }, + }.Handler()) + defer server.Close() + + transport := NewWebSocketFabricTransport(nil) + defer transport.Close() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + target := FabricTransportTarget{ + PeerID: "node-a", + Endpoint: server.URL, + Token: "rap_fsn_transport_reopen", + Timeout: time.Second, + } + + first, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("first connect: %v", err) + } + if err := first.Close(); err != nil { + t.Fatalf("close first session: %v", err) + } + second, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("second connect: %v", err) + } + if first == second { + t.Fatal("transport reused closed session") + } + if opened != 2 { + t.Fatalf("opened = %d, want 2", opened) + } +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 354d47e..bc11aa8 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -291,6 +291,9 @@ Peer session reuse now evicts closed pumps before reuse, so failed WebSocket sessions can be reopened on the next transport acquisition. Heartbeat telemetry includes peer session manager counters for active sessions, reuses, opens, closed-pump evictions, and explicit close operations. +The mesh package now exposes a service-neutral `FabricTransport` abstraction; +the current WebSocket carrier implements it as `WebSocketFabricTransport`, so +future QUIC/UDP transport can be added without changing VPN/RDP/HTTP services. Deliverables: