Add async fabric session pump

This commit is contained in:
2026-05-16 00:40:59 +03:00
parent ce6b9beb6b
commit aa09c10252
3 changed files with 204 additions and 1 deletions
@@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
@@ -31,6 +32,24 @@ type FabricSessionClient struct {
conn *websocket.Conn
timeout time.Duration
maxPayload int
readMu sync.Mutex
writeMu sync.Mutex
}
type FabricSessionPumpOptions struct {
OutboundBuffer int
InboundBuffer int
ErrorBuffer int
}
type FabricSessionPump struct {
session *FabricSessionClient
outbound chan fabricproto.Frame
inbound chan fabricproto.Frame
errors chan error
done chan struct{}
cancel context.CancelFunc
closeMu sync.Once
}
func NewClient(baseURL string) Client {
@@ -192,6 +211,8 @@ func (c *FabricSessionClient) WriteFrame(ctx context.Context, frame fabricproto.
if err != nil {
return err
}
c.writeMu.Lock()
defer c.writeMu.Unlock()
c.applyWriteDeadline(ctx)
return c.conn.WriteMessage(websocket.BinaryMessage, payload)
}
@@ -200,6 +221,8 @@ func (c *FabricSessionClient) ReadFrame(ctx context.Context) (fabricproto.Frame,
if c == nil || c.conn == nil {
return fabricproto.Frame{}, fmt.Errorf("fabric session client is closed")
}
c.readMu.Lock()
defer c.readMu.Unlock()
c.applyReadDeadline(ctx)
messageType, responsePayload, err := c.conn.ReadMessage()
if err != nil {
@@ -218,6 +241,118 @@ func (c *FabricSessionClient) RoundTrip(ctx context.Context, frame fabricproto.F
return c.ReadFrame(ctx)
}
func (c *FabricSessionClient) StartPump(ctx context.Context, opts FabricSessionPumpOptions) *FabricSessionPump {
if opts.OutboundBuffer <= 0 {
opts.OutboundBuffer = 64
}
if opts.InboundBuffer <= 0 {
opts.InboundBuffer = 64
}
if opts.ErrorBuffer <= 0 {
opts.ErrorBuffer = 8
}
pumpCtx, cancel := context.WithCancel(ctx)
pump := &FabricSessionPump{
session: c,
outbound: make(chan fabricproto.Frame, opts.OutboundBuffer),
inbound: make(chan fabricproto.Frame, opts.InboundBuffer),
errors: make(chan error, opts.ErrorBuffer),
done: make(chan struct{}),
cancel: cancel,
}
go pump.writeLoop(pumpCtx)
go pump.readLoop(pumpCtx)
return pump
}
func (p *FabricSessionPump) Send(ctx context.Context, frame fabricproto.Frame) error {
if p == nil {
return fmt.Errorf("fabric session pump is nil")
}
select {
case <-ctx.Done():
return ctx.Err()
case <-p.done:
return fmt.Errorf("fabric session pump is closed")
case p.outbound <- frame:
return nil
}
}
func (p *FabricSessionPump) Frames() <-chan fabricproto.Frame {
if p == nil {
return nil
}
return p.inbound
}
func (p *FabricSessionPump) Errors() <-chan error {
if p == nil {
return nil
}
return p.errors
}
func (p *FabricSessionPump) Close() error {
if p == nil {
return nil
}
var err error
p.closeMu.Do(func() {
close(p.done)
p.cancel()
err = p.session.Close()
})
return err
}
func (p *FabricSessionPump) writeLoop(ctx context.Context) {
defer p.Close()
for {
select {
case <-ctx.Done():
p.reportError(ctx.Err())
return
case <-p.done:
return
case frame := <-p.outbound:
if err := p.session.WriteFrame(ctx, frame); err != nil {
p.reportError(err)
return
}
}
}
}
func (p *FabricSessionPump) readLoop(ctx context.Context) {
defer p.Close()
for {
frame, err := p.session.ReadFrame(ctx)
if err != nil {
p.reportError(err)
return
}
select {
case <-ctx.Done():
p.reportError(ctx.Err())
return
case <-p.done:
return
case p.inbound <- frame:
}
}
}
func (p *FabricSessionPump) reportError(err error) {
if err == nil {
return
}
select {
case p.errors <- err:
default:
}
}
func (c *FabricSessionClient) applyReadDeadline(ctx context.Context) {
if deadline, ok := ctx.Deadline(); ok {
_ = c.conn.SetReadDeadline(deadline)
@@ -133,6 +133,73 @@ func TestClientFabricSessionPersistentDataAcks(t *testing.T) {
}
}
func TestClientFabricSessionPumpMovesIndependentFrames(t *testing.T) {
server := httptest.NewServer(Server{
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
FabricSessionEnabled: true,
}.Handler())
defer server.Close()
client := NewClient(server.URL)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
session, _, err := client.OpenFabricSession(ctx, FabricSessionDialOptions{
Token: "rap_fsn_pump",
Timeout: time.Second,
})
if err != nil {
t.Fatalf("open fabric session: %v", err)
}
pump := session.StartPump(ctx, FabricSessionPumpOptions{
OutboundBuffer: 4,
InboundBuffer: 4,
ErrorBuffer: 4,
})
defer pump.Close()
if err := pump.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameOpenStream,
StreamID: 900,
TrafficClass: fabricproto.TrafficClassBulk,
}); err != nil {
t.Fatalf("send open bulk stream: %v", err)
}
if err := pump.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameData,
StreamID: 900,
Sequence: 31,
TrafficClass: fabricproto.TrafficClassBulk,
Payload: []byte("bulk payload"),
}); err != nil {
t.Fatalf("send bulk data: %v", err)
}
if err := pump.Send(ctx, fabricproto.Frame{
Type: fabricproto.FramePing,
Sequence: 32,
Payload: []byte("control ping"),
}); err != nil {
t.Fatalf("send ping: %v", err)
}
gotAck := false
gotPong := false
for !gotAck || !gotPong {
select {
case frame := <-pump.Frames():
switch {
case frame.Type == fabricproto.FrameAck && frame.StreamID == 900 && frame.Sequence == 31:
gotAck = true
case frame.Type == fabricproto.FramePong && frame.Sequence == 32 && string(frame.Payload) == "control ping":
gotPong = true
}
case err := <-pump.Errors():
t.Fatalf("pump error: %v", err)
case <-ctx.Done():
t.Fatalf("timed out waiting for pump frames: ack=%v pong=%v", gotAck, gotPong)
}
}
}
func TestClientFabricSessionReportsRejectedStatus(t *testing.T) {
server := httptest.NewServer(Server{
Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},