Add fabric websocket transport adapter

This commit is contained in:
2026-05-16 00:12:42 +03:00
parent f82e6990f2
commit 01f28693f5
4 changed files with 244 additions and 13 deletions
@@ -43,20 +43,10 @@ func (l TransportLoop) Run(ctx context.Context, r io.Reader, w io.Writer) error
} }
func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) error { func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) error {
if l.Session == nil { responses, err := l.HandleFrame(ctx, frame)
return ErrSessionNotConfigured
}
event, responses, err := l.Session.HandleFrame(frame)
if err != nil { if err != nil {
return err return err
} }
if l.OnEvent != nil && event.Type != SessionEventNone {
extra, err := l.OnEvent(event)
if err != nil {
return err
}
responses = append(responses, extra...)
}
for _, response := range responses { for _, response := range responses {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -70,4 +60,27 @@ func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) err
return nil return nil
} }
func (l TransportLoop) HandleFrame(ctx context.Context, frame Frame) ([]Frame, error) {
if l.Session == nil {
return nil, ErrSessionNotConfigured
}
event, responses, err := l.Session.HandleFrame(frame)
if err != nil {
return nil, err
}
if l.OnEvent != nil && event.Type != SessionEventNone {
extra, err := l.OnEvent(event)
if err != nil {
return nil, err
}
responses = append(responses, extra...)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return responses, nil
}
var ErrSessionNotConfigured = errors.New("fabric transport loop session is not configured") var ErrSessionNotConfigured = errors.New("fabric transport loop session is not configured")
@@ -0,0 +1,67 @@
package fabricproto
import (
"context"
"time"
"github.com/gorilla/websocket"
)
type WebSocketTransportConfig struct {
MaxPayload int
WriteTimeout time.Duration
}
func (l TransportLoop) RunWebSocket(ctx context.Context, conn *websocket.Conn, cfg WebSocketTransportConfig) error {
if l.Session == nil {
return ErrSessionNotConfigured
}
maxPayload := cfg.MaxPayload
if maxPayload <= 0 {
maxPayload = DefaultMaxPayload
}
writeTimeout := cfg.WriteTimeout
if writeTimeout <= 0 {
writeTimeout = 5 * time.Second
}
conn.SetReadLimit(int64(HeaderSize + maxPayload))
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
messageType, payload, err := conn.ReadMessage()
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return nil
}
if err != nil {
return err
}
if messageType != websocket.BinaryMessage {
continue
}
frame, err := UnmarshalFrame(payload, maxPayload)
if err != nil {
return err
}
responses, err := l.HandleFrame(ctx, frame)
if err != nil {
return err
}
for _, response := range responses {
encoded, err := MarshalFrame(response)
if err != nil {
return err
}
if err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
return err
}
if err := conn.WriteMessage(websocket.BinaryMessage, encoded); err != nil {
return err
}
}
}
}
@@ -0,0 +1,151 @@
package fabricproto
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestRunWebSocketHandlesBinaryFrames(t *testing.T) {
events := make(chan SessionEventType, 4)
server := newFabricWebSocketTestServer(t, TransportLoop{
Session: NewSession(SessionConfig{}),
OnEvent: func(event SessionEvent) ([]Frame, error) {
events <- event.Type
return nil, nil
},
})
defer server.Close()
conn := dialFabricWebSocket(t, server.URL)
defer conn.Close()
writeWebSocketFrame(t, conn, Frame{Type: FrameOpenStream, TrafficClass: TrafficClassInteractive, StreamID: 7})
writeWebSocketFrame(t, conn, Frame{Type: FrameData, TrafficClass: TrafficClassInteractive, StreamID: 7, Sequence: 9, Payload: []byte("hello")})
if got := <-events; got != SessionEventStreamOpened {
t.Fatalf("first event = %s, want stream_opened", got)
}
if got := <-events; got != SessionEventData {
t.Fatalf("second event = %s, want data", got)
}
ack := readWebSocketFrame(t, conn)
if ack.Type != FrameAck || ack.StreamID != 7 || ack.Sequence != 9 {
t.Fatalf("ack = %+v", ack)
}
}
func TestRunWebSocketPongAndHandlerResponse(t *testing.T) {
server := newFabricWebSocketTestServer(t, TransportLoop{
Session: NewSession(SessionConfig{}),
OnEvent: func(event SessionEvent) ([]Frame, error) {
if event.Type != SessionEventPing {
return nil, nil
}
return []Frame{{Type: FrameSessionReady, Payload: []byte("ready")}}, nil
},
})
defer server.Close()
conn := dialFabricWebSocket(t, server.URL)
defer conn.Close()
writeWebSocketFrame(t, conn, Frame{Type: FramePing, Sequence: 4, Payload: []byte("probe")})
pong := readWebSocketFrame(t, conn)
if pong.Type != FramePong || pong.Sequence != 4 || string(pong.Payload) != "probe" {
t.Fatalf("pong = %+v", pong)
}
ready := readWebSocketFrame(t, conn)
if ready.Type != FrameSessionReady || string(ready.Payload) != "ready" {
t.Fatalf("ready = %+v", ready)
}
}
func TestRunWebSocketIgnoresTextMessages(t *testing.T) {
server := newFabricWebSocketTestServer(t, TransportLoop{Session: NewSession(SessionConfig{})})
defer server.Close()
conn := dialFabricWebSocket(t, server.URL)
defer conn.Close()
if err := conn.WriteMessage(websocket.TextMessage, []byte("ignore me")); err != nil {
t.Fatalf("write text: %v", err)
}
writeWebSocketFrame(t, conn, Frame{Type: FramePing, Sequence: 1})
pong := readWebSocketFrame(t, conn)
if pong.Type != FramePong || pong.Sequence != 1 {
t.Fatalf("pong = %+v", pong)
}
}
func newFabricWebSocketTestServer(t *testing.T, loop TransportLoop) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
_ = loop.RunWebSocket(r.Context(), conn, WebSocketTransportConfig{WriteTimeout: time.Second})
}))
return server
}
func dialFabricWebSocket(t *testing.T, httpURL string) *websocket.Conn {
t.Helper()
wsURL := "ws" + httpURL[len("http"):]
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial websocket: %v", err)
}
return conn
}
func writeWebSocketFrame(t *testing.T, conn *websocket.Conn, frame Frame) {
t.Helper()
encoded, err := MarshalFrame(frame)
if err != nil {
t.Fatalf("marshal frame: %v", err)
}
if err := conn.WriteMessage(websocket.BinaryMessage, encoded); err != nil {
t.Fatalf("write websocket frame: %v", err)
}
}
func readWebSocketFrame(t *testing.T, conn *websocket.Conn) Frame {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
type result struct {
messageType int
payload []byte
err error
}
ch := make(chan result, 1)
go func() {
messageType, payload, err := conn.ReadMessage()
ch <- result{messageType: messageType, payload: payload, err: err}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for websocket frame")
case got := <-ch:
if got.err != nil {
t.Fatalf("read websocket frame: %v", got.err)
}
if got.messageType != websocket.BinaryMessage {
t.Fatalf("message type = %d, want binary", got.messageType)
}
frame, err := UnmarshalFrame(got.payload, DefaultMaxPayload)
if err != nil {
t.Fatalf("unmarshal websocket frame: %v", err)
}
return frame
}
return Frame{}
}
@@ -256,8 +256,8 @@ Deliverables:
### Stage FNP-3: WebSocket/TCP Compatibility Transport ### Stage FNP-3: WebSocket/TCP Compatibility Transport
Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop in Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop and
`agents/rap-node-agent/internal/fabricproto`. WebSocket frame adapter in `agents/rap-node-agent/internal/fabricproto`.
Deliverables: Deliverables: