From 8a972ea68f2177932985299a39cc4bc717c06a85 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 00:16:13 +0300 Subject: [PATCH] Add gated fabric session websocket endpoint --- agents/rap-node-agent/internal/mesh/server.go | 85 ++++++++++++++ .../internal/mesh/server_test.go | 105 ++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 5 +- 3 files changed, 193 insertions(+), 2 deletions(-) diff --git a/agents/rap-node-agent/internal/mesh/server.go b/agents/rap-node-agent/internal/mesh/server.go index fe20730..3106729 100644 --- a/agents/rap-node-agent/internal/mesh/server.go +++ b/agents/rap-node-agent/internal/mesh/server.go @@ -19,6 +19,7 @@ import ( "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority" + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/gorilla/websocket" ) @@ -26,6 +27,7 @@ type ProductionEnvelopeObserver func(context.Context, ProductionEnvelopeObservat type ProductionEnvelopeDelivery func(context.Context, ProductionEnvelope) error type ProductionForwardLogger func(ProductionForwardLogEntry) type FabricServiceChannelAccessLogger func(FabricServiceChannelAccessLogEntry) +type FabricSessionEventLogger func(FabricSessionEventLogEntry) type RemoteWorkspaceFrameSink interface { AcceptRemoteWorkspaceFrameBatchProbe(context.Context, RemoteWorkspaceFrameBatchDelivery) (RemoteWorkspaceFrameBatchDeliveryReceipt, error) } @@ -81,6 +83,8 @@ type Server struct { BackendProxyBaseURL string ClusterAuthorityPublicKey string ServiceChannelIntrospection bool + FabricSessionEnabled bool + FabricSessionLogger FabricSessionEventLogger } func (s Server) Handler() http.Handler { @@ -88,6 +92,9 @@ func (s Server) Handler() http.Handler { mux.HandleFunc("/mesh/v1/health", s.handleHealth) mux.HandleFunc("/mesh/v1/forward", s.handleForward) mux.HandleFunc("/mesh/v1/synthetic/probe", s.handleSyntheticProbe) + if s.FabricSessionEnabled { + mux.HandleFunc("/mesh/v1/fabric/session/ws", s.handleFabricSessionWebSocket) + } if s.RemoteWorkspaceFrameSink != nil { mux.HandleFunc("/mesh/v1/remote-workspace/adapter-sessions/", s.handleRemoteWorkspaceAdapterSessionControl) } @@ -187,6 +194,84 @@ func (s Server) handleRemoteWorkspaceAdapterSessionSnapshot(w http.ResponseWrite _ = json.NewEncoder(w).Encode(snapshotter.SnapshotAdapterSessions(includeTerminal, limit, time.Now().UTC())) } +type FabricSessionEventLogEntry struct { + Event string `json:"event"` + ClusterID string `json:"cluster_id,omitempty"` + NodeID string `json:"node_id,omitempty"` + SessionEvent fabricproto.SessionEventType `json:"session_event,omitempty"` + StreamID uint64 `json:"stream_id,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + TrafficClass fabricproto.TrafficClass `json:"traffic_class,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + Reason string `json:"reason,omitempty"` + ObservedAt time.Time `json:"observed_at"` +} + +func (s Server) handleFabricSessionWebSocket(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + upgrader := websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool { return true }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_websocket_opened", + ClusterID: s.Local.ClusterID, + NodeID: s.Local.NodeID, + RemoteAddr: r.RemoteAddr, + ObservedAt: time.Now().UTC(), + }) + loop := fabricproto.TransportLoop{ + Session: fabricproto.NewSession(fabricproto.SessionConfig{}), + OnEvent: func(event fabricproto.SessionEvent) ([]fabricproto.Frame, error) { + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_event", + ClusterID: s.Local.ClusterID, + NodeID: s.Local.NodeID, + SessionEvent: event.Type, + StreamID: event.StreamID, + Sequence: event.Sequence, + TrafficClass: event.TrafficClass, + RemoteAddr: r.RemoteAddr, + ObservedAt: time.Now().UTC(), + }) + return nil, nil + }, + } + err = loop.RunWebSocket(r.Context(), conn, fabricproto.WebSocketTransportConfig{}) + if err != nil && !errors.Is(err, context.Canceled) { + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_websocket_closed", + ClusterID: s.Local.ClusterID, + NodeID: s.Local.NodeID, + RemoteAddr: r.RemoteAddr, + Reason: err.Error(), + ObservedAt: time.Now().UTC(), + }) + return + } + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_websocket_closed", + ClusterID: s.Local.ClusterID, + NodeID: s.Local.NodeID, + RemoteAddr: r.RemoteAddr, + ObservedAt: time.Now().UTC(), + }) +} + +func (s Server) logFabricSession(entry FabricSessionEventLogEntry) { + if s.FabricSessionLogger != nil { + s.FabricSessionLogger(entry) + } +} + func (s Server) handleRemoteWorkspaceAdapterSessionMailbox(w http.ResponseWriter, r *http.Request) { reader, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailbox) if !ok { diff --git a/agents/rap-node-agent/internal/mesh/server_test.go b/agents/rap-node-agent/internal/mesh/server_test.go index eb44e60..fa6d954 100644 --- a/agents/rap-node-agent/internal/mesh/server_test.go +++ b/agents/rap-node-agent/internal/mesh/server_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority" + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/gorilla/websocket" ) @@ -74,6 +75,80 @@ func TestMeshForwardingDisabled(t *testing.T) { } } +func TestFabricSessionWebSocketDisabledByDefault(t *testing.T) { + server := httptest.NewServer(Server{Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}}.Handler()) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/mesh/v1/fabric/session/ws" + _, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err == nil { + t.Fatal("dial fabric session unexpectedly succeeded") + } + if resp == nil || resp.StatusCode != http.StatusNotFound { + t.Fatalf("status = %v err=%v, want 404", resp, err) + } +} + +func TestFabricSessionWebSocketPingPongAndEvents(t *testing.T) { + var events []FabricSessionEventLogEntry + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + FabricSessionLogger: func(entry FabricSessionEventLogEntry) { + events = append(events, entry) + }, + }.Handler()) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/mesh/v1/fabric/session/ws" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial fabric session websocket: %v", err) + } + defer conn.Close() + + writeMeshFabricFrame(t, conn, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 17, Payload: []byte("probe")}) + pong := readMeshFabricFrame(t, conn) + if pong.Type != fabricproto.FramePong || pong.Sequence != 17 || string(pong.Payload) != "probe" { + t.Fatalf("pong = %+v", pong) + } + if len(events) < 2 || events[0].Event != "fabric_session_websocket_opened" || events[1].SessionEvent != fabricproto.SessionEventPing { + t.Fatalf("events = %+v", events) + } +} + +func TestFabricSessionWebSocketOpenStreamDataAck(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/mesh/v1/fabric/session/ws" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial fabric session websocket: %v", err) + } + defer conn.Close() + + writeMeshFabricFrame(t, conn, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + TrafficClass: fabricproto.TrafficClassInteractive, + StreamID: 9, + }) + writeMeshFabricFrame(t, conn, fabricproto.Frame{ + Type: fabricproto.FrameData, + TrafficClass: fabricproto.TrafficClassInteractive, + StreamID: 9, + Sequence: 3, + Payload: []byte("input"), + }) + ack := readMeshFabricFrame(t, conn) + if ack.Type != fabricproto.FrameAck || ack.StreamID != 9 || ack.Sequence != 3 { + t.Fatalf("ack = %+v", ack) + } +} + func TestMeshForwardingGateEnabledStillHasNoProductionRuntime(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-b"} server := httptest.NewServer(Server{ @@ -96,6 +171,36 @@ func TestMeshForwardingGateEnabledStillHasNoProductionRuntime(t *testing.T) { } } +func writeMeshFabricFrame(t *testing.T, conn *websocket.Conn, frame fabricproto.Frame) { + t.Helper() + encoded, err := fabricproto.MarshalFrame(frame) + if err != nil { + t.Fatalf("marshal fabric frame: %v", err) + } + if err := conn.WriteMessage(websocket.BinaryMessage, encoded); err != nil { + t.Fatalf("write fabric websocket frame: %v", err) + } +} + +func readMeshFabricFrame(t *testing.T, conn *websocket.Conn) fabricproto.Frame { + t.Helper() + if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + t.Fatalf("set websocket read deadline: %v", err) + } + messageType, payload, err := conn.ReadMessage() + if err != nil { + t.Fatalf("read fabric websocket frame: %v", err) + } + if messageType != websocket.BinaryMessage { + t.Fatalf("message type = %d, want binary", messageType) + } + frame, err := fabricproto.UnmarshalFrame(payload, fabricproto.DefaultMaxPayload) + if err != nil { + t.Fatalf("unmarshal fabric websocket frame: %v", err) + } + return frame +} + func TestMeshForwardingGateDeliversFabricControlAtDestination(t *testing.T) { local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-c"} var events []ProductionForwardLogEntry diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index b04d340..412cada 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -256,8 +256,9 @@ Deliverables: ### Stage FNP-3: WebSocket/TCP Compatibility Transport -Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop and -WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`. +Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop, +WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`, and a +gated mesh smoke endpoint at `/mesh/v1/fabric/session/ws`. Deliverables: