From aa09c1025277e3cc0bb0f5023ec3651541438a20 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 00:40:59 +0300 Subject: [PATCH] Add async fabric session pump --- agents/rap-node-agent/internal/mesh/client.go | 135 ++++++++++++++++++ .../internal/mesh/client_test.go | 67 +++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 +- 3 files changed, 204 insertions(+), 1 deletion(-) diff --git a/agents/rap-node-agent/internal/mesh/client.go b/agents/rap-node-agent/internal/mesh/client.go index fadbcc9..0f17963 100644 --- a/agents/rap-node-agent/internal/mesh/client.go +++ b/agents/rap-node-agent/internal/mesh/client.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" @@ -31,6 +32,24 @@ type FabricSessionClient struct { conn *websocket.Conn timeout time.Duration maxPayload int + readMu sync.Mutex + writeMu sync.Mutex +} + +type FabricSessionPumpOptions struct { + OutboundBuffer int + InboundBuffer int + ErrorBuffer int +} + +type FabricSessionPump struct { + session *FabricSessionClient + outbound chan fabricproto.Frame + inbound chan fabricproto.Frame + errors chan error + done chan struct{} + cancel context.CancelFunc + closeMu sync.Once } func NewClient(baseURL string) Client { @@ -192,6 +211,8 @@ func (c *FabricSessionClient) WriteFrame(ctx context.Context, frame fabricproto. if err != nil { return err } + c.writeMu.Lock() + defer c.writeMu.Unlock() c.applyWriteDeadline(ctx) return c.conn.WriteMessage(websocket.BinaryMessage, payload) } @@ -200,6 +221,8 @@ func (c *FabricSessionClient) ReadFrame(ctx context.Context) (fabricproto.Frame, if c == nil || c.conn == nil { return fabricproto.Frame{}, fmt.Errorf("fabric session client is closed") } + c.readMu.Lock() + defer c.readMu.Unlock() c.applyReadDeadline(ctx) messageType, responsePayload, err := c.conn.ReadMessage() if err != nil { @@ -218,6 +241,118 @@ func (c *FabricSessionClient) RoundTrip(ctx context.Context, frame fabricproto.F return c.ReadFrame(ctx) } +func (c *FabricSessionClient) StartPump(ctx context.Context, opts FabricSessionPumpOptions) *FabricSessionPump { + if opts.OutboundBuffer <= 0 { + opts.OutboundBuffer = 64 + } + if opts.InboundBuffer <= 0 { + opts.InboundBuffer = 64 + } + if opts.ErrorBuffer <= 0 { + opts.ErrorBuffer = 8 + } + pumpCtx, cancel := context.WithCancel(ctx) + pump := &FabricSessionPump{ + session: c, + outbound: make(chan fabricproto.Frame, opts.OutboundBuffer), + inbound: make(chan fabricproto.Frame, opts.InboundBuffer), + errors: make(chan error, opts.ErrorBuffer), + done: make(chan struct{}), + cancel: cancel, + } + go pump.writeLoop(pumpCtx) + go pump.readLoop(pumpCtx) + return pump +} + +func (p *FabricSessionPump) Send(ctx context.Context, frame fabricproto.Frame) error { + if p == nil { + return fmt.Errorf("fabric session pump is nil") + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.done: + return fmt.Errorf("fabric session pump is closed") + case p.outbound <- frame: + return nil + } +} + +func (p *FabricSessionPump) Frames() <-chan fabricproto.Frame { + if p == nil { + return nil + } + return p.inbound +} + +func (p *FabricSessionPump) Errors() <-chan error { + if p == nil { + return nil + } + return p.errors +} + +func (p *FabricSessionPump) Close() error { + if p == nil { + return nil + } + var err error + p.closeMu.Do(func() { + close(p.done) + p.cancel() + err = p.session.Close() + }) + return err +} + +func (p *FabricSessionPump) writeLoop(ctx context.Context) { + defer p.Close() + for { + select { + case <-ctx.Done(): + p.reportError(ctx.Err()) + return + case <-p.done: + return + case frame := <-p.outbound: + if err := p.session.WriteFrame(ctx, frame); err != nil { + p.reportError(err) + return + } + } + } +} + +func (p *FabricSessionPump) readLoop(ctx context.Context) { + defer p.Close() + for { + frame, err := p.session.ReadFrame(ctx) + if err != nil { + p.reportError(err) + return + } + select { + case <-ctx.Done(): + p.reportError(ctx.Err()) + return + case <-p.done: + return + case p.inbound <- frame: + } + } +} + +func (p *FabricSessionPump) reportError(err error) { + if err == nil { + return + } + select { + case p.errors <- err: + default: + } +} + func (c *FabricSessionClient) applyReadDeadline(ctx context.Context) { if deadline, ok := ctx.Deadline(); ok { _ = c.conn.SetReadDeadline(deadline) diff --git a/agents/rap-node-agent/internal/mesh/client_test.go b/agents/rap-node-agent/internal/mesh/client_test.go index d7855d8..fdff9ba 100644 --- a/agents/rap-node-agent/internal/mesh/client_test.go +++ b/agents/rap-node-agent/internal/mesh/client_test.go @@ -133,6 +133,73 @@ func TestClientFabricSessionPersistentDataAcks(t *testing.T) { } } +func TestClientFabricSessionPumpMovesIndependentFrames(t *testing.T) { + server := httptest.NewServer(Server{ + Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, + FabricSessionEnabled: true, + }.Handler()) + defer server.Close() + + client := NewClient(server.URL) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{ + Token: "rap_fsn_pump", + Timeout: time.Second, + }) + if err != nil { + t.Fatalf("open fabric session: %v", err) + } + pump := session.StartPump(ctx, FabricSessionPumpOptions{ + OutboundBuffer: 4, + InboundBuffer: 4, + ErrorBuffer: 4, + }) + defer pump.Close() + + if err := pump.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: 900, + TrafficClass: fabricproto.TrafficClassBulk, + }); err != nil { + t.Fatalf("send open bulk stream: %v", err) + } + if err := pump.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameData, + StreamID: 900, + Sequence: 31, + TrafficClass: fabricproto.TrafficClassBulk, + Payload: []byte("bulk payload"), + }); err != nil { + t.Fatalf("send bulk data: %v", err) + } + if err := pump.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FramePing, + Sequence: 32, + Payload: []byte("control ping"), + }); err != nil { + t.Fatalf("send ping: %v", err) + } + + gotAck := false + gotPong := false + for !gotAck || !gotPong { + select { + case frame := <-pump.Frames(): + switch { + case frame.Type == fabricproto.FrameAck && frame.StreamID == 900 && frame.Sequence == 31: + gotAck = true + case frame.Type == fabricproto.FramePong && frame.Sequence == 32 && string(frame.Payload) == "control ping": + gotPong = true + } + case err := <-pump.Errors(): + t.Fatalf("pump error: %v", err) + case <-ctx.Done(): + t.Fatalf("timed out waiting for pump frames: ack=%v pong=%v", gotAck, gotPong) + } + } +} + func TestClientFabricSessionReportsRejectedStatus(t *testing.T) { server := httptest.NewServer(Server{ Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index aa32e0a..0f38c02 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -268,7 +268,8 @@ heartbeat metadata. `mesh-live-smoke` includes a fabric-session `PING`/`PONG` check alongside the existing route and test-service probes. Mesh client code now has a reusable `FabricSessionClient` for multiple frame exchanges over one WebSocket session, -and live smoke verifies two `PING`/`PONG` round trips on the same connection. +plus a pump mode with outbound/inbound queues for asynchronous stream traffic. +Live smoke verifies two `PING`/`PONG` round trips on the same connection. Deliverables: