From 88db12617b484febe0331124cda71cbb2ea134f3 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 00:47:07 +0300 Subject: [PATCH] Add fabric session packet transport --- .../vpnruntime/fabric_session_transport.go | 109 +++++++++++++++ .../vpnruntime/fabric_transport_test.go | 130 ++++++++++++++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 3 files changed, 242 insertions(+) create mode 100644 agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go new file mode 100644 index 0000000..afaaad9 --- /dev/null +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -0,0 +1,109 @@ +package vpnruntime + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" +) + +type FabricSessionFrameSender interface { + Send(context.Context, fabricproto.Frame) error +} + +type FabricSessionFrameReceiver interface { + Frames() <-chan fabricproto.Frame + Errors() <-chan error +} + +type FabricSessionPacketTransport struct { + Sender FabricSessionFrameSender + Receiver FabricSessionFrameReceiver + Inbox *FabricPacketInbox + + StreamID uint64 + VPNConnectionID string + SendDirection string + ReceiveDirection string + TrafficClass string + + sequence uint64 +} + +func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { + packets = cleanPacketBatch(packets) + if len(packets) == 0 { + return nil + } + if t == nil || t.Sender == nil { + return mesh.ErrForwardRuntimeUnavailable + } + if t.StreamID == 0 || t.VPNConnectionID == "" { + return errors.New("fabric session packet transport identity is incomplete") + } + direction := t.SendDirection + if direction == "" { + direction = FabricDirectionGatewayToClient + } + frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ + StreamID: t.StreamID, + Sequence: atomic.AddUint64(&t.sequence, 1), + VPNConnectionID: t.VPNConnectionID, + Direction: direction, + TrafficClass: t.TrafficClass, + Packets: packets, + }) + if err != nil { + return err + } + return t.Sender.Send(ctx, frame) +} + +func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { + if t == nil || t.Inbox == nil { + return nil, mesh.ErrForwardRuntimeUnavailable + } + direction := t.ReceiveDirection + if direction == "" { + direction = FabricDirectionClientToGateway + } + return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout) +} + +func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error { + if t == nil || t.Receiver == nil || t.Inbox == nil { + return mesh.ErrForwardRuntimeUnavailable + } + frames := t.Receiver.Frames() + errorsCh := t.Receiver.Errors() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err, ok := <-errorsCh: + if !ok { + errorsCh = nil + continue + } + if err != nil { + return err + } + case frame, ok := <-frames: + if !ok { + return nil + } + if frame.Type != fabricproto.FrameData { + continue + } + if t.StreamID != 0 && frame.StreamID != t.StreamID { + continue + } + if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil { + return err + } + } + } +} diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index f0b5897..cc8a6bb 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -123,6 +123,32 @@ type memoryPacketTransport struct { recv [][]byte } +type captureFabricSessionSender struct { + err error + frames []fabricproto.Frame +} + +func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error { + if s.err != nil { + return s.err + } + s.frames = append(s.frames, frame) + return nil +} + +type memoryFabricSessionReceiver struct { + frames chan fabricproto.Frame + errors chan error +} + +func (r memoryFabricSessionReceiver) Frames() <-chan fabricproto.Frame { + return r.frames +} + +func (r memoryFabricSessionReceiver) Errors() <-chan error { + return r.errors +} + func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error { if t.sendErr != nil { return t.sendErr @@ -172,6 +198,110 @@ func TestFabricPacketTransportSendsVPNPacketBatchEnvelope(t *testing.T) { } } +func TestFabricSessionPacketTransportSendsDataFrame(t *testing.T) { + sender := &captureFabricSessionSender{} + transport := &FabricSessionPacketTransport{ + Sender: sender, + StreamID: 700, + VPNConnectionID: "vpn-1", + SendDirection: FabricDirectionClientToGateway, + TrafficClass: FabricTrafficClassInteractive, + } + + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send fabric session packet batch: %v", err) + } + if len(sender.frames) != 1 { + t.Fatalf("sent frames = %d, want 1", len(sender.frames)) + } + frame := sender.frames[0] + if frame.Type != fabricproto.FrameData || frame.StreamID != 700 || frame.Sequence != 1 || frame.TrafficClass != fabricproto.TrafficClassInteractive { + t.Fatalf("frame = %+v", frame) + } + payload, err := DecodeFabricVPNPacketDataFrame(frame) + if err != nil { + t.Fatalf("decode sent frame: %v", err) + } + if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionClientToGateway || len(payload.Packets) != 1 || string(payload.Packets[0]) != "packet" { + t.Fatalf("payload = %+v", payload) + } +} + +func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { + inbox := NewFabricPacketInbox(4) + receiver := memoryFabricSessionReceiver{ + frames: make(chan fabricproto.Frame, 2), + errors: make(chan error, 1), + } + transport := &FabricSessionPacketTransport{ + Receiver: receiver, + Inbox: inbox, + StreamID: 701, + VPNConnectionID: "vpn-1", + } + frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ + StreamID: 701, + Sequence: 1, + VPNConnectionID: "vpn-1", + Direction: FabricDirectionClientToGateway, + Packets: [][]byte{[]byte("packet")}, + }) + if err != nil { + t.Fatalf("new fabric vpn frame: %v", err) + } + receiver.frames <- fabricproto.Frame{Type: fabricproto.FrameAck, StreamID: 701, Sequence: 1} + receiver.frames <- frame + close(receiver.frames) + + if err := transport.RunFrameIngress(context.Background()); err != nil { + t.Fatalf("run frame ingress: %v", err) + } + packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "packet" { + t.Fatalf("packets = %#v", packets) + } +} + +func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) { + inbox := NewFabricPacketInbox(4) + receiver := memoryFabricSessionReceiver{ + frames: make(chan fabricproto.Frame, 1), + errors: make(chan error, 1), + } + transport := &FabricSessionPacketTransport{ + Receiver: receiver, + Inbox: inbox, + StreamID: 701, + VPNConnectionID: "vpn-1", + } + frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ + StreamID: 702, + Sequence: 1, + VPNConnectionID: "vpn-1", + Direction: FabricDirectionClientToGateway, + Packets: [][]byte{[]byte("packet")}, + }) + if err != nil { + t.Fatalf("new fabric vpn frame: %v", err) + } + receiver.frames <- frame + close(receiver.frames) + + if err := transport.RunFrameIngress(context.Background()); err != nil { + t.Fatalf("run frame ingress: %v", err) + } + packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), 10*time.Millisecond) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 0 { + t.Fatalf("packets = %#v, want none", packets) + } +} + func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) { inbox := NewFabricPacketInbox(4) envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 0976328..351922f 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -272,6 +272,9 @@ plus a pump mode with outbound/inbound queues for asynchronous stream traffic. Live smoke verifies two `PING`/`PONG` round trips on the same connection. `vpnruntime` has a binary VPN packet-batch mapper for `FrameData` payloads so packet delivery can move away from JSON production envelopes in a gated mode. +`FabricSessionPacketTransport` now adapts that mapper to the existing +`PacketTransport` interface and can demultiplex inbound DATA frames into the +VPN packet inbox by stream id. Deliverables: