Add fabric session packet transport
This commit is contained in:
@@ -0,0 +1,109 @@
|
|||||||
|
package vpnruntime
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
||||||
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FabricSessionFrameSender interface {
|
||||||
|
Send(context.Context, fabricproto.Frame) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type FabricSessionFrameReceiver interface {
|
||||||
|
Frames() <-chan fabricproto.Frame
|
||||||
|
Errors() <-chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type FabricSessionPacketTransport struct {
|
||||||
|
Sender FabricSessionFrameSender
|
||||||
|
Receiver FabricSessionFrameReceiver
|
||||||
|
Inbox *FabricPacketInbox
|
||||||
|
|
||||||
|
StreamID uint64
|
||||||
|
VPNConnectionID string
|
||||||
|
SendDirection string
|
||||||
|
ReceiveDirection string
|
||||||
|
TrafficClass string
|
||||||
|
|
||||||
|
sequence uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
|
||||||
|
packets = cleanPacketBatch(packets)
|
||||||
|
if len(packets) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if t == nil || t.Sender == nil {
|
||||||
|
return mesh.ErrForwardRuntimeUnavailable
|
||||||
|
}
|
||||||
|
if t.StreamID == 0 || t.VPNConnectionID == "" {
|
||||||
|
return errors.New("fabric session packet transport identity is incomplete")
|
||||||
|
}
|
||||||
|
direction := t.SendDirection
|
||||||
|
if direction == "" {
|
||||||
|
direction = FabricDirectionGatewayToClient
|
||||||
|
}
|
||||||
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
||||||
|
StreamID: t.StreamID,
|
||||||
|
Sequence: atomic.AddUint64(&t.sequence, 1),
|
||||||
|
VPNConnectionID: t.VPNConnectionID,
|
||||||
|
Direction: direction,
|
||||||
|
TrafficClass: t.TrafficClass,
|
||||||
|
Packets: packets,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return t.Sender.Send(ctx, frame)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
|
||||||
|
if t == nil || t.Inbox == nil {
|
||||||
|
return nil, mesh.ErrForwardRuntimeUnavailable
|
||||||
|
}
|
||||||
|
direction := t.ReceiveDirection
|
||||||
|
if direction == "" {
|
||||||
|
direction = FabricDirectionClientToGateway
|
||||||
|
}
|
||||||
|
return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error {
|
||||||
|
if t == nil || t.Receiver == nil || t.Inbox == nil {
|
||||||
|
return mesh.ErrForwardRuntimeUnavailable
|
||||||
|
}
|
||||||
|
frames := t.Receiver.Frames()
|
||||||
|
errorsCh := t.Receiver.Errors()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case err, ok := <-errorsCh:
|
||||||
|
if !ok {
|
||||||
|
errorsCh = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case frame, ok := <-frames:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if frame.Type != fabricproto.FrameData {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if t.StreamID != 0 && frame.StreamID != t.StreamID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -123,6 +123,32 @@ type memoryPacketTransport struct {
|
|||||||
recv [][]byte
|
recv [][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type captureFabricSessionSender struct {
|
||||||
|
err error
|
||||||
|
frames []fabricproto.Frame
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error {
|
||||||
|
if s.err != nil {
|
||||||
|
return s.err
|
||||||
|
}
|
||||||
|
s.frames = append(s.frames, frame)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type memoryFabricSessionReceiver struct {
|
||||||
|
frames chan fabricproto.Frame
|
||||||
|
errors chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r memoryFabricSessionReceiver) Frames() <-chan fabricproto.Frame {
|
||||||
|
return r.frames
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r memoryFabricSessionReceiver) Errors() <-chan error {
|
||||||
|
return r.errors
|
||||||
|
}
|
||||||
|
|
||||||
func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error {
|
func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error {
|
||||||
if t.sendErr != nil {
|
if t.sendErr != nil {
|
||||||
return t.sendErr
|
return t.sendErr
|
||||||
@@ -172,6 +198,110 @@ func TestFabricPacketTransportSendsVPNPacketBatchEnvelope(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFabricSessionPacketTransportSendsDataFrame(t *testing.T) {
|
||||||
|
sender := &captureFabricSessionSender{}
|
||||||
|
transport := &FabricSessionPacketTransport{
|
||||||
|
Sender: sender,
|
||||||
|
StreamID: 700,
|
||||||
|
VPNConnectionID: "vpn-1",
|
||||||
|
SendDirection: FabricDirectionClientToGateway,
|
||||||
|
TrafficClass: FabricTrafficClassInteractive,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil {
|
||||||
|
t.Fatalf("send fabric session packet batch: %v", err)
|
||||||
|
}
|
||||||
|
if len(sender.frames) != 1 {
|
||||||
|
t.Fatalf("sent frames = %d, want 1", len(sender.frames))
|
||||||
|
}
|
||||||
|
frame := sender.frames[0]
|
||||||
|
if frame.Type != fabricproto.FrameData || frame.StreamID != 700 || frame.Sequence != 1 || frame.TrafficClass != fabricproto.TrafficClassInteractive {
|
||||||
|
t.Fatalf("frame = %+v", frame)
|
||||||
|
}
|
||||||
|
payload, err := DecodeFabricVPNPacketDataFrame(frame)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("decode sent frame: %v", err)
|
||||||
|
}
|
||||||
|
if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionClientToGateway || len(payload.Packets) != 1 || string(payload.Packets[0]) != "packet" {
|
||||||
|
t.Fatalf("payload = %+v", payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) {
|
||||||
|
inbox := NewFabricPacketInbox(4)
|
||||||
|
receiver := memoryFabricSessionReceiver{
|
||||||
|
frames: make(chan fabricproto.Frame, 2),
|
||||||
|
errors: make(chan error, 1),
|
||||||
|
}
|
||||||
|
transport := &FabricSessionPacketTransport{
|
||||||
|
Receiver: receiver,
|
||||||
|
Inbox: inbox,
|
||||||
|
StreamID: 701,
|
||||||
|
VPNConnectionID: "vpn-1",
|
||||||
|
}
|
||||||
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
||||||
|
StreamID: 701,
|
||||||
|
Sequence: 1,
|
||||||
|
VPNConnectionID: "vpn-1",
|
||||||
|
Direction: FabricDirectionClientToGateway,
|
||||||
|
Packets: [][]byte{[]byte("packet")},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new fabric vpn frame: %v", err)
|
||||||
|
}
|
||||||
|
receiver.frames <- fabricproto.Frame{Type: fabricproto.FrameAck, StreamID: 701, Sequence: 1}
|
||||||
|
receiver.frames <- frame
|
||||||
|
close(receiver.frames)
|
||||||
|
|
||||||
|
if err := transport.RunFrameIngress(context.Background()); err != nil {
|
||||||
|
t.Fatalf("run frame ingress: %v", err)
|
||||||
|
}
|
||||||
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("receive gateway packet: %v", err)
|
||||||
|
}
|
||||||
|
if len(packets) != 1 || string(packets[0]) != "packet" {
|
||||||
|
t.Fatalf("packets = %#v", packets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) {
|
||||||
|
inbox := NewFabricPacketInbox(4)
|
||||||
|
receiver := memoryFabricSessionReceiver{
|
||||||
|
frames: make(chan fabricproto.Frame, 1),
|
||||||
|
errors: make(chan error, 1),
|
||||||
|
}
|
||||||
|
transport := &FabricSessionPacketTransport{
|
||||||
|
Receiver: receiver,
|
||||||
|
Inbox: inbox,
|
||||||
|
StreamID: 701,
|
||||||
|
VPNConnectionID: "vpn-1",
|
||||||
|
}
|
||||||
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
||||||
|
StreamID: 702,
|
||||||
|
Sequence: 1,
|
||||||
|
VPNConnectionID: "vpn-1",
|
||||||
|
Direction: FabricDirectionClientToGateway,
|
||||||
|
Packets: [][]byte{[]byte("packet")},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("new fabric vpn frame: %v", err)
|
||||||
|
}
|
||||||
|
receiver.frames <- frame
|
||||||
|
close(receiver.frames)
|
||||||
|
|
||||||
|
if err := transport.RunFrameIngress(context.Background()); err != nil {
|
||||||
|
t.Fatalf("run frame ingress: %v", err)
|
||||||
|
}
|
||||||
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), 10*time.Millisecond)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("receive gateway packet: %v", err)
|
||||||
|
}
|
||||||
|
if len(packets) != 0 {
|
||||||
|
t.Fatalf("packets = %#v, want none", packets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) {
|
func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) {
|
||||||
inbox := NewFabricPacketInbox(4)
|
inbox := NewFabricPacketInbox(4)
|
||||||
envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{
|
envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{
|
||||||
|
|||||||
@@ -272,6 +272,9 @@ plus a pump mode with outbound/inbound queues for asynchronous stream traffic.
|
|||||||
Live smoke verifies two `PING`/`PONG` round trips on the same connection.
|
Live smoke verifies two `PING`/`PONG` round trips on the same connection.
|
||||||
`vpnruntime` has a binary VPN packet-batch mapper for `FrameData` payloads so
|
`vpnruntime` has a binary VPN packet-batch mapper for `FrameData` payloads so
|
||||||
packet delivery can move away from JSON production envelopes in a gated mode.
|
packet delivery can move away from JSON production envelopes in a gated mode.
|
||||||
|
`FabricSessionPacketTransport` now adapts that mapper to the existing
|
||||||
|
`PacketTransport` interface and can demultiplex inbound DATA frames into the
|
||||||
|
VPN packet inbox by stream id.
|
||||||
|
|
||||||
Deliverables:
|
Deliverables:
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user