Add fabric binary transport loop
This commit is contained in:
@@ -79,14 +79,13 @@ func WriteFrame(w io.Writer, frame Frame) error {
|
||||
}
|
||||
header := make([]byte, HeaderSize)
|
||||
writeHeader(header, frame, uint32(len(frame.Payload)))
|
||||
if _, err := w.Write(header); err != nil {
|
||||
if err := writeFull(w, header); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(frame.Payload) == 0 {
|
||||
return nil
|
||||
}
|
||||
_, err := w.Write(frame.Payload)
|
||||
return err
|
||||
return writeFull(w, frame.Payload)
|
||||
}
|
||||
|
||||
func ReadFrame(r io.Reader, maxPayload int) (Frame, error) {
|
||||
@@ -201,3 +200,17 @@ func parseHeader(header []byte, maxPayload int) (Frame, int, error) {
|
||||
}
|
||||
return frame, payloadLength, nil
|
||||
}
|
||||
|
||||
func writeFull(w io.Writer, data []byte) error {
|
||||
for len(data) > 0 {
|
||||
n, err := w.Write(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n <= 0 {
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
data = data[n:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
package fabricproto
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type EventHandler func(SessionEvent) ([]Frame, error)
|
||||
|
||||
type TransportLoop struct {
|
||||
Session *Session
|
||||
MaxPayload int
|
||||
OnEvent EventHandler
|
||||
}
|
||||
|
||||
func (l TransportLoop) Run(ctx context.Context, r io.Reader, w io.Writer) error {
|
||||
if l.Session == nil {
|
||||
return ErrSessionNotConfigured
|
||||
}
|
||||
maxPayload := l.MaxPayload
|
||||
if maxPayload <= 0 {
|
||||
maxPayload = DefaultMaxPayload
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
frame, err := ReadFrame(r, maxPayload)
|
||||
if errors.Is(err, io.EOF) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := l.Handle(ctx, frame, w); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l TransportLoop) Handle(ctx context.Context, frame Frame, w io.Writer) error {
|
||||
if l.Session == nil {
|
||||
return ErrSessionNotConfigured
|
||||
}
|
||||
event, responses, err := l.Session.HandleFrame(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if l.OnEvent != nil && event.Type != SessionEventNone {
|
||||
extra, err := l.OnEvent(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
responses = append(responses, extra...)
|
||||
}
|
||||
for _, response := range responses {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
if err := WriteFrame(w, response); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var ErrSessionNotConfigured = errors.New("fabric transport loop session is not configured")
|
||||
@@ -0,0 +1,116 @@
|
||||
package fabricproto
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTransportLoopRunsFramesAndWritesResponses(t *testing.T) {
|
||||
var input bytes.Buffer
|
||||
writeTestFrame(t, &input, Frame{Type: FrameOpenStream, TrafficClass: TrafficClassInteractive, StreamID: 7})
|
||||
writeTestFrame(t, &input, Frame{Type: FrameData, TrafficClass: TrafficClassInteractive, StreamID: 7, Sequence: 3, Payload: []byte("input")})
|
||||
writeTestFrame(t, &input, Frame{Type: FramePing, Sequence: 4, Payload: []byte("probe")})
|
||||
|
||||
var output bytes.Buffer
|
||||
var events []SessionEventType
|
||||
loop := TransportLoop{
|
||||
Session: NewSession(SessionConfig{}),
|
||||
OnEvent: func(event SessionEvent) ([]Frame, error) {
|
||||
events = append(events, event.Type)
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := loop.Run(context.Background(), &input, &output); err != nil {
|
||||
t.Fatalf("run loop: %v", err)
|
||||
}
|
||||
if len(events) != 3 || events[0] != SessionEventStreamOpened || events[1] != SessionEventData || events[2] != SessionEventPing {
|
||||
t.Fatalf("events = %+v", events)
|
||||
}
|
||||
ack := readTestFrame(t, &output)
|
||||
if ack.Type != FrameAck || ack.StreamID != 7 || ack.Sequence != 3 {
|
||||
t.Fatalf("ack = %+v", ack)
|
||||
}
|
||||
pong := readTestFrame(t, &output)
|
||||
if pong.Type != FramePong || pong.Sequence != 4 || string(pong.Payload) != "probe" {
|
||||
t.Fatalf("pong = %+v", pong)
|
||||
}
|
||||
if _, err := ReadFrame(&output, DefaultMaxPayload); !errors.Is(err, io.EOF) {
|
||||
t.Fatalf("extra response err = %v, want EOF", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportLoopEventHandlerCanEmitFrames(t *testing.T) {
|
||||
var output bytes.Buffer
|
||||
loop := TransportLoop{
|
||||
Session: NewSession(SessionConfig{}),
|
||||
OnEvent: func(event SessionEvent) ([]Frame, error) {
|
||||
if event.Type != SessionEventHello {
|
||||
return nil, nil
|
||||
}
|
||||
return []Frame{{Type: FrameSessionReady, Payload: []byte("ready")}}, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := loop.Handle(context.Background(), Frame{Type: FrameHello, Payload: []byte("hello")}, &output); err != nil {
|
||||
t.Fatalf("handle hello: %v", err)
|
||||
}
|
||||
ready := readTestFrame(t, &output)
|
||||
if ready.Type != FrameSessionReady || string(ready.Payload) != "ready" {
|
||||
t.Fatalf("ready = %+v", ready)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportLoopPropagatesHandlerError(t *testing.T) {
|
||||
wantErr := errors.New("handler failed")
|
||||
loop := TransportLoop{
|
||||
Session: NewSession(SessionConfig{}),
|
||||
OnEvent: func(SessionEvent) ([]Frame, error) {
|
||||
return nil, wantErr
|
||||
},
|
||||
}
|
||||
|
||||
err := loop.Handle(context.Background(), Frame{Type: FrameHello}, io.Discard)
|
||||
if !errors.Is(err, wantErr) {
|
||||
t.Fatalf("err = %v, want %v", err, wantErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportLoopRequiresSession(t *testing.T) {
|
||||
err := (TransportLoop{}).Handle(context.Background(), Frame{Type: FrameHello}, io.Discard)
|
||||
if !errors.Is(err, ErrSessionNotConfigured) {
|
||||
t.Fatalf("err = %v, want %v", err, ErrSessionNotConfigured)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteFrameDetectsShortWrite(t *testing.T) {
|
||||
err := WriteFrame(shortWriter{}, Frame{Type: FrameHello})
|
||||
if !errors.Is(err, io.ErrShortWrite) {
|
||||
t.Fatalf("err = %v, want %v", err, io.ErrShortWrite)
|
||||
}
|
||||
}
|
||||
|
||||
type shortWriter struct{}
|
||||
|
||||
func (shortWriter) Write([]byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func writeTestFrame(t *testing.T, w io.Writer, frame Frame) {
|
||||
t.Helper()
|
||||
if err := WriteFrame(w, frame); err != nil {
|
||||
t.Fatalf("write frame %+v: %v", frame, err)
|
||||
}
|
||||
}
|
||||
|
||||
func readTestFrame(t *testing.T, r io.Reader) Frame {
|
||||
t.Helper()
|
||||
frame, err := ReadFrame(r, DefaultMaxPayload)
|
||||
if err != nil {
|
||||
t.Fatalf("read frame: %v", err)
|
||||
}
|
||||
return frame
|
||||
}
|
||||
@@ -256,6 +256,9 @@ Deliverables:
|
||||
|
||||
### Stage FNP-3: WebSocket/TCP Compatibility Transport
|
||||
|
||||
Status: started with a transport-neutral `io.Reader`/`io.Writer` frame loop in
|
||||
`agents/rap-node-agent/internal/fabricproto`.
|
||||
|
||||
Deliverables:
|
||||
|
||||
- carry binary frames over one persistent WebSocket/TCP connection;
|
||||
|
||||
Reference in New Issue
Block a user