diff --git a/agents/rap-node-agent/internal/fabricproto/transport.go b/agents/rap-node-agent/internal/fabricproto/transport.go index e5544bb..03d5d37 100644 --- a/agents/rap-node-agent/internal/fabricproto/transport.go +++ b/agents/rap-node-agent/internal/fabricproto/transport.go @@ -43,20 +43,10 @@ func (l TransportLoop) Run(ctx context.Context, r io.Reader, w io.Writer) error } func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) error { - if l.Session == nil { - return ErrSessionNotConfigured - } - event, responses, err := l.Session.HandleFrame(frame) + responses, err := l.HandleFrame(ctx, frame) if err != nil { return err } - if l.OnEvent != nil && event.Type != SessionEventNone { - extra, err := l.OnEvent(event) - if err != nil { - return err - } - responses = append(responses, extra...) - } for _, response := range responses { select { case <-ctx.Done(): @@ -70,4 +60,27 @@ func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) err return nil } +func (l TransportLoop) HandleFrame(ctx context.Context, frame Frame) ([]Frame, error) { + if l.Session == nil { + return nil, ErrSessionNotConfigured + } + event, responses, err := l.Session.HandleFrame(frame) + if err != nil { + return nil, err + } + if l.OnEvent != nil && event.Type != SessionEventNone { + extra, err := l.OnEvent(event) + if err != nil { + return nil, err + } + responses = append(responses, extra...) + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + return responses, nil +} + var ErrSessionNotConfigured = errors.New("fabric transport loop session is not configured") diff --git a/agents/rap-node-agent/internal/fabricproto/websocket.go b/agents/rap-node-agent/internal/fabricproto/websocket.go new file mode 100644 index 0000000..23ffc4c --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/websocket.go @@ -0,0 +1,67 @@ +package fabricproto + +import ( + "context" + "time" + + "github.com/gorilla/websocket" +) + +type WebSocketTransportConfig struct { + MaxPayload int + WriteTimeout time.Duration +} + +func (l TransportLoop) RunWebSocket(ctx context.Context, conn *websocket.Conn, cfg WebSocketTransportConfig) error { + if l.Session == nil { + return ErrSessionNotConfigured + } + maxPayload := cfg.MaxPayload + if maxPayload <= 0 { + maxPayload = DefaultMaxPayload + } + writeTimeout := cfg.WriteTimeout + if writeTimeout <= 0 { + writeTimeout = 5 * time.Second + } + conn.SetReadLimit(int64(HeaderSize + maxPayload)) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + messageType, payload, err := conn.ReadMessage() + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return nil + } + if err != nil { + return err + } + if messageType != websocket.BinaryMessage { + continue + } + frame, err := UnmarshalFrame(payload, maxPayload) + if err != nil { + return err + } + responses, err := l.HandleFrame(ctx, frame) + if err != nil { + return err + } + for _, response := range responses { + encoded, err := MarshalFrame(response) + if err != nil { + return err + } + if err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil { + return err + } + if err := conn.WriteMessage(websocket.BinaryMessage, encoded); err != nil { + return err + } + } + } +} diff --git a/agents/rap-node-agent/internal/fabricproto/websocket_test.go b/agents/rap-node-agent/internal/fabricproto/websocket_test.go new file mode 100644 index 0000000..4a4457c --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/websocket_test.go @@ -0,0 +1,151 @@ +package fabricproto + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gorilla/websocket" +) + +func TestRunWebSocketHandlesBinaryFrames(t *testing.T) { + events := make(chan SessionEventType, 4) + server := newFabricWebSocketTestServer(t, TransportLoop{ + Session: NewSession(SessionConfig{}), + OnEvent: func(event SessionEvent) ([]Frame, error) { + events <- event.Type + return nil, nil + }, + }) + defer server.Close() + + conn := dialFabricWebSocket(t, server.URL) + defer conn.Close() + + writeWebSocketFrame(t, conn, Frame{Type: FrameOpenStream, TrafficClass: TrafficClassInteractive, StreamID: 7}) + writeWebSocketFrame(t, conn, Frame{Type: FrameData, TrafficClass: TrafficClassInteractive, StreamID: 7, Sequence: 9, Payload: []byte("hello")}) + + if got := <-events; got != SessionEventStreamOpened { + t.Fatalf("first event = %s, want stream_opened", got) + } + if got := <-events; got != SessionEventData { + t.Fatalf("second event = %s, want data", got) + } + ack := readWebSocketFrame(t, conn) + if ack.Type != FrameAck || ack.StreamID != 7 || ack.Sequence != 9 { + t.Fatalf("ack = %+v", ack) + } +} + +func TestRunWebSocketPongAndHandlerResponse(t *testing.T) { + server := newFabricWebSocketTestServer(t, TransportLoop{ + Session: NewSession(SessionConfig{}), + OnEvent: func(event SessionEvent) ([]Frame, error) { + if event.Type != SessionEventPing { + return nil, nil + } + return []Frame{{Type: FrameSessionReady, Payload: []byte("ready")}}, nil + }, + }) + defer server.Close() + + conn := dialFabricWebSocket(t, server.URL) + defer conn.Close() + + writeWebSocketFrame(t, conn, Frame{Type: FramePing, Sequence: 4, Payload: []byte("probe")}) + pong := readWebSocketFrame(t, conn) + if pong.Type != FramePong || pong.Sequence != 4 || string(pong.Payload) != "probe" { + t.Fatalf("pong = %+v", pong) + } + ready := readWebSocketFrame(t, conn) + if ready.Type != FrameSessionReady || string(ready.Payload) != "ready" { + t.Fatalf("ready = %+v", ready) + } +} + +func TestRunWebSocketIgnoresTextMessages(t *testing.T) { + server := newFabricWebSocketTestServer(t, TransportLoop{Session: NewSession(SessionConfig{})}) + defer server.Close() + + conn := dialFabricWebSocket(t, server.URL) + defer conn.Close() + + if err := conn.WriteMessage(websocket.TextMessage, []byte("ignore me")); err != nil { + t.Fatalf("write text: %v", err) + } + writeWebSocketFrame(t, conn, Frame{Type: FramePing, Sequence: 1}) + pong := readWebSocketFrame(t, conn) + if pong.Type != FramePong || pong.Sequence != 1 { + t.Fatalf("pong = %+v", pong) + } +} + +func newFabricWebSocketTestServer(t *testing.T, loop TransportLoop) *httptest.Server { + t.Helper() + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + _ = loop.RunWebSocket(r.Context(), conn, WebSocketTransportConfig{WriteTimeout: time.Second}) + })) + return server +} + +func dialFabricWebSocket(t *testing.T, httpURL string) *websocket.Conn { + t.Helper() + wsURL := "ws" + httpURL[len("http"):] + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial websocket: %v", err) + } + return conn +} + +func writeWebSocketFrame(t *testing.T, conn *websocket.Conn, frame Frame) { + t.Helper() + encoded, err := MarshalFrame(frame) + if err != nil { + t.Fatalf("marshal frame: %v", err) + } + if err := conn.WriteMessage(websocket.BinaryMessage, encoded); err != nil { + t.Fatalf("write websocket frame: %v", err) + } +} + +func readWebSocketFrame(t *testing.T, conn *websocket.Conn) Frame { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + type result struct { + messageType int + payload []byte + err error + } + ch := make(chan result, 1) + go func() { + messageType, payload, err := conn.ReadMessage() + ch <- result{messageType: messageType, payload: payload, err: err} + }() + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for websocket frame") + case got := <-ch: + if got.err != nil { + t.Fatalf("read websocket frame: %v", got.err) + } + if got.messageType != websocket.BinaryMessage { + t.Fatalf("message type = %d, want binary", got.messageType) + } + frame, err := UnmarshalFrame(got.payload, DefaultMaxPayload) + if err != nil { + t.Fatalf("unmarshal websocket frame: %v", err) + } + return frame + } + return Frame{} +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 4eb7bd2..b04d340 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -256,8 +256,8 @@ Deliverables: ### Stage FNP-3: WebSocket/TCP Compatibility Transport -Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop in -`agents/rap-node-agent/internal/fabricproto`. +Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop and +WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`. Deliverables: