From f82e6990f280b40ab34cdd470754afd2401c73cb Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 00:10:04 +0300 Subject: [PATCH] Add fabric binary transport loop --- .../internal/fabricproto/frame.go | 19 ++- .../internal/fabricproto/transport.go | 73 +++++++++++ .../internal/fabricproto/transport_test.go | 116 ++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 4 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 agents/rap-node-agent/internal/fabricproto/transport.go create mode 100644 agents/rap-node-agent/internal/fabricproto/transport_test.go diff --git a/agents/rap-node-agent/internal/fabricproto/frame.go b/agents/rap-node-agent/internal/fabricproto/frame.go index 76e234c..f00008e 100644 --- a/agents/rap-node-agent/internal/fabricproto/frame.go +++ b/agents/rap-node-agent/internal/fabricproto/frame.go @@ -79,14 +79,13 @@ func WriteFrame(w io.Writer, frame Frame) error { } header := make([]byte, HeaderSize) writeHeader(header, frame, uint32(len(frame.Payload))) - if _, err := w.Write(header); err != nil { + if err := writeFull(w, header); err != nil { return err } if len(frame.Payload) == 0 { return nil } - _, err := w.Write(frame.Payload) - return err + return writeFull(w, frame.Payload) } func ReadFrame(r io.Reader, maxPayload int) (Frame, error) { @@ -201,3 +200,17 @@ func parseHeader(header []byte, maxPayload int) (Frame, int, error) { } return frame, payloadLength, nil } + +func writeFull(w io.Writer, data []byte) error { + for len(data) > 0 { + n, err := w.Write(data) + if err != nil { + return err + } + if n <= 0 { + return io.ErrShortWrite + } + data = data[n:] + } + return nil +} diff --git a/agents/rap-node-agent/internal/fabricproto/transport.go b/agents/rap-node-agent/internal/fabricproto/transport.go new file mode 100644 index 0000000..e5544bb --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/transport.go @@ -0,0 +1,73 @@ +package fabricproto + +import ( + "context" + "errors" + "io" +) + +type EventHandler func(SessionEvent) ([]Frame, error) + +type TransportLoop struct { + Session *Session + MaxPayload int + OnEvent EventHandler +} + +func (l TransportLoop) Run(ctx context.Context, r io.Reader, w io.Writer) error { + if l.Session == nil { + return ErrSessionNotConfigured + } + maxPayload := l.MaxPayload + if maxPayload <= 0 { + maxPayload = DefaultMaxPayload + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + frame, err := ReadFrame(r, maxPayload) + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if err := l.Handle(ctx, frame, w); err != nil { + return err + } + } +} + +func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) error { + if l.Session == nil { + return ErrSessionNotConfigured + } + event, responses, err := l.Session.HandleFrame(frame) + if err != nil { + return err + } + if l.OnEvent != nil && event.Type != SessionEventNone { + extra, err := l.OnEvent(event) + if err != nil { + return err + } + responses = append(responses, extra...) + } + for _, response := range responses { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := WriteFrame(w, response); err != nil { + return err + } + } + return nil +} + +var ErrSessionNotConfigured = errors.New("fabric transport loop session is not configured") diff --git a/agents/rap-node-agent/internal/fabricproto/transport_test.go b/agents/rap-node-agent/internal/fabricproto/transport_test.go new file mode 100644 index 0000000..31091de --- /dev/null +++ b/agents/rap-node-agent/internal/fabricproto/transport_test.go @@ -0,0 +1,116 @@ +package fabricproto + +import ( + "bytes" + "context" + "errors" + "io" + "testing" +) + +func TestTransportLoopRunsFramesAndWritesResponses(t *testing.T) { + var input bytes.Buffer + writeTestFrame(t, &input, Frame{Type: FrameOpenStream, TrafficClass: TrafficClassInteractive, StreamID: 7}) + writeTestFrame(t, &input, Frame{Type: FrameData, TrafficClass: TrafficClassInteractive, StreamID: 7, Sequence: 3, Payload: []byte("input")}) + writeTestFrame(t, &input, Frame{Type: FramePing, Sequence: 4, Payload: []byte("probe")}) + + var output bytes.Buffer + var events []SessionEventType + loop := TransportLoop{ + Session: NewSession(SessionConfig{}), + OnEvent: func(event SessionEvent) ([]Frame, error) { + events = append(events, event.Type) + return nil, nil + }, + } + + if err := loop.Run(context.Background(), &input, &output); err != nil { + t.Fatalf("run loop: %v", err) + } + if len(events) != 3 || events[0] != SessionEventStreamOpened || events[1] != SessionEventData || events[2] != SessionEventPing { + t.Fatalf("events = %+v", events) + } + ack := readTestFrame(t, &output) + if ack.Type != FrameAck || ack.StreamID != 7 || ack.Sequence != 3 { + t.Fatalf("ack = %+v", ack) + } + pong := readTestFrame(t, &output) + if pong.Type != FramePong || pong.Sequence != 4 || string(pong.Payload) != "probe" { + t.Fatalf("pong = %+v", pong) + } + if _, err := ReadFrame(&output, DefaultMaxPayload); !errors.Is(err, io.EOF) { + t.Fatalf("extra response err = %v, want EOF", err) + } +} + +func TestTransportLoopEventHandlerCanEmitFrames(t *testing.T) { + var output bytes.Buffer + loop := TransportLoop{ + Session: NewSession(SessionConfig{}), + OnEvent: func(event SessionEvent) ([]Frame, error) { + if event.Type != SessionEventHello { + return nil, nil + } + return []Frame{{Type: FrameSessionReady, Payload: []byte("ready")}}, nil + }, + } + + if err := loop.Handle(context.Background(), Frame{Type: FrameHello, Payload: []byte("hello")}, &output); err != nil { + t.Fatalf("handle hello: %v", err) + } + ready := readTestFrame(t, &output) + if ready.Type != FrameSessionReady || string(ready.Payload) != "ready" { + t.Fatalf("ready = %+v", ready) + } +} + +func TestTransportLoopPropagatesHandlerError(t *testing.T) { + wantErr := errors.New("handler failed") + loop := TransportLoop{ + Session: NewSession(SessionConfig{}), + OnEvent: func(SessionEvent) ([]Frame, error) { + return nil, wantErr + }, + } + + err := loop.Handle(context.Background(), Frame{Type: FrameHello}, io.Discard) + if !errors.Is(err, wantErr) { + t.Fatalf("err = %v, want %v", err, wantErr) + } +} + +func TestTransportLoopRequiresSession(t *testing.T) { + err := (TransportLoop{}).Handle(context.Background(), Frame{Type: FrameHello}, io.Discard) + if !errors.Is(err, ErrSessionNotConfigured) { + t.Fatalf("err = %v, want %v", err, ErrSessionNotConfigured) + } +} + +func TestWriteFrameDetectsShortWrite(t *testing.T) { + err := WriteFrame(shortWriter{}, Frame{Type: FrameHello}) + if !errors.Is(err, io.ErrShortWrite) { + t.Fatalf("err = %v, want %v", err, io.ErrShortWrite) + } +} + +type shortWriter struct{} + +func (shortWriter) Write([]byte) (int, error) { + return 0, nil +} + +func writeTestFrame(t *testing.T, w io.Writer, frame Frame) { + t.Helper() + if err := WriteFrame(w, frame); err != nil { + t.Fatalf("write frame %+v: %v", frame, err) + } +} + +func readTestFrame(t *testing.T, r io.Reader) Frame { + t.Helper() + frame, err := ReadFrame(r, DefaultMaxPayload) + if err != nil { + t.Fatalf("read frame: %v", err) + } + return frame +} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 2308f14..4eb7bd2 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -256,6 +256,9 @@ Deliverables: ### Stage FNP-3: WebSocket/TCP Compatibility Transport +Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop in +`agents/rap-node-agent/internal/fabricproto`. + Deliverables: - carry binary frames over one persistent WebSocket/TCP connection;