package vpnruntime

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
)

// FabricSessionFrameSender sends one fabric frame.
type FabricSessionFrameSender interface {
	Send(context.Context, fabricproto.Frame) error
}

// FabricSessionFrameReceiver exposes inbound fabric frames and receive errors
// as channels.
type FabricSessionFrameReceiver interface {
	Frames() <-chan fabricproto.Frame
	Errors() <-chan error
}

// FabricSessionPacketTransport sends and receives batches of VPN packets as
// fabric data frames for a single VPN connection.
//
// The struct contains mutexes and must not be copied after first use.
type FabricSessionPacketTransport struct {
	// Sender transmits outbound frames. SendGatewayPacketBatch fails with
	// mesh.ErrForwardRuntimeUnavailable when it is nil.
	Sender FabricSessionFrameSender

	// Receiver supplies inbound frames. It is optional for
	// ReceiveGatewayPacketBatch (which then relies on Inbox alone) and
	// required by RunFrameIngress.
	Receiver FabricSessionFrameReceiver

	// Inbox is where received frames are delivered and batches are read
	// from. It is required by both receive paths.
	Inbox *FabricPacketInbox

	// StreamID is the stream used when neither StreamIDsByTrafficClass nor
	// StreamIDs yields a stream. Zero means "not set".
	StreamID uint64

	// VPNConnectionID identifies the connection. It must be non-empty to
	// send and is used to match inbound payloads.
	VPNConnectionID string

	// SendDirection defaults to FabricDirectionGatewayToClient when empty.
	SendDirection string

	// ReceiveDirection defaults to FabricDirectionClientToGateway when empty.
	ReceiveDirection string

	// TrafficClass is the configured class handed to
	// fabricSessionTrafficClassForPackets as its fallback.
	TrafficClass string

	// StreamIDsByTrafficClass lists candidate streams per traffic class and
	// is consulted first when selecting a send stream.
	StreamIDsByTrafficClass map[string][]uint64

	// StreamIDs lists candidate streams used when the traffic class has no
	// dedicated streams.
	StreamIDs []uint64

	// sequence is the counter used for stream ID 0; it is updated atomically.
	sequence uint64

	// sequenceMu guards sequenceByStream, the per-stream sequence counters.
	sequenceMu       sync.Mutex
	sequenceByStream map[uint64]uint64

	// statsMu guards the send counters below.
	statsMu            sync.Mutex
	sendFramesByClass  map[string]uint64
	sendPacketsByClass map[string]uint64
	sendFramesByStream map[uint64]uint64
}

// SendGatewayPacketBatch wraps packets in one VPN packet data frame and sends
// it through Sender.
//
// The batch is first passed through cleanPacketBatch; an empty result is a
// no-op that returns nil (even for a nil transport). Otherwise it returns
// mesh.ErrForwardRuntimeUnavailable when the transport or Sender is nil, an
// error when no usable stream or VPN connection ID is configured, and any
// error from building or sending the frame. Send statistics are recorded only
// after a successful send.
func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
	packets = cleanPacketBatch(packets)
	if len(packets) == 0 {
		return nil
	}
	if t == nil || t.Sender == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}
	if !t.hasSendStream() || t.VPNConnectionID == "" {
		return errors.New("fabric session packet transport identity is incomplete")
	}
	direction := t.SendDirection
	if direction == "" {
		direction = FabricDirectionGatewayToClient
	}
	streamID, trafficClass := t.selectStreamForPackets(packets)
	if streamID == 0 {
		// hasSendStream only proves that some stream exists. The class chosen
		// for this batch may still have none, in which case selection falls
		// through to an unset StreamID. acceptsStream never accepts stream 0,
		// so fail here instead of sending a frame that would be dropped.
		return fmt.Errorf("fabric session packet transport has no send stream for traffic class %q", trafficClass)
	}
	frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
		StreamID:        streamID,
		Sequence:        t.nextSequence(streamID),
		VPNConnectionID: t.VPNConnectionID,
		Direction:       direction,
		TrafficClass:    trafficClass,
		Packets:         packets,
	})
	if err != nil {
		return err
	}
	if err := t.Sender.Send(ctx, frame); err != nil {
		return err
	}
	t.recordSend(streamID, trafficClass, len(packets))
	return nil
}

// ReceiveGatewayPacketBatch returns the next packet batch for this connection
// and receive direction.
//
// It returns mesh.ErrForwardRuntimeUnavailable when the transport or Inbox is
// nil. Without a Receiver the call is delegated to Inbox.Receive with the
// caller's timeout. With a Receiver, a non-positive timeout is replaced by
// 25 seconds; when the timer fires the result is (nil, nil). Context
// cancellation returns ctx.Err().
func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
	if t == nil || t.Inbox == nil {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	direction := t.ReceiveDirection
	if direction == "" {
		direction = FabricDirectionClientToGateway
	}
	// Check the inbox first, passing 5ms in the timeout position, so batches
	// that were already delivered are returned before reading new frames.
	if packets, err := t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond); err != nil || len(packets) > 0 {
		return packets, err
	}
	if t.Receiver == nil {
		return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout)
	}
	if timeout <= 0 {
		timeout = 25 * time.Second
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	frames := t.Receiver.Frames()
	errorsCh := t.Receiver.Errors()
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-timer.C:
			return nil, nil
		case err, ok := <-errorsCh:
			if !ok {
				// A closed channel is always ready; nil it so this case
				// blocks forever instead of spinning.
				errorsCh = nil
				continue
			}
			if err != nil {
				return nil, err
			}
		case frame, ok := <-frames:
			if !ok {
				// No more frames will arrive; give the inbox one last check.
				return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond)
			}
			if frame.Type != fabricproto.FrameData || !t.acceptsStream(frame.StreamID) {
				continue
			}
			payload, err := DecodeFabricVPNPacketDataFrame(frame)
			if err != nil {
				return nil, err
			}
			if payload.VPNConnectionID == t.VPNConnectionID && payload.Direction == direction {
				return cleanPacketBatch(payload.Packets), nil
			}
			// Not for this connection/direction: hand the frame to the inbox.
			if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil {
				return nil, err
			}
		}
	}
}

// RunFrameIngress delivers every accepted data frame from Receiver to Inbox
// until the context is cancelled, an error is received or delivery fails, or
// the frame channel is closed.
//
// It returns mesh.ErrForwardRuntimeUnavailable when the transport, Receiver or
// Inbox is nil, ctx.Err() on cancellation, and nil when the frame channel is
// closed.
func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) error {
	if t == nil || t.Receiver == nil || t.Inbox == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}
	frames := t.Receiver.Frames()
	errorsCh := t.Receiver.Errors()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err, ok := <-errorsCh:
			if !ok {
				// Disable the closed channel so the select does not spin.
				errorsCh = nil
				continue
			}
			if err != nil {
				return err
			}
		case frame, ok := <-frames:
			if !ok {
				return nil
			}
			if frame.Type != fabricproto.FrameData || !t.acceptsStream(frame.StreamID) {
				continue
			}
			if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil {
				return err
			}
		}
	}
}

// selectStreamForPackets picks the stream ID and traffic class for a batch.
//
// Streams configured for the batch's traffic class are preferred, then
// StreamIDs, then StreamID. When several candidates exist the shard returned
// by classifyPacketFlow for the first packet chooses among them. The returned
// stream ID is 0 when nothing suitable is configured.
func (t *FabricSessionPacketTransport) selectStreamForPackets(packets [][]byte) (uint64, string) {
	trafficClass := fabricSessionTrafficClassForPackets(t.TrafficClass, packets)
	if ids := t.streamIDsForTrafficClass(trafficClass); len(ids) > 0 {
		if len(ids) == 1 || len(packets) == 0 {
			return ids[0], trafficClass
		}
		_, shard := classifyPacketFlow(packets[0], len(ids))
		return ids[shard], trafficClass
	}
	if len(t.StreamIDs) > 0 {
		if len(t.StreamIDs) == 1 || len(packets) == 0 {
			return t.StreamIDs[0], trafficClass
		}
		_, shard := classifyPacketFlow(packets[0], len(t.StreamIDs))
		return t.StreamIDs[shard], trafficClass
	}
	return t.StreamID, trafficClass
}

// hasSendStream reports whether any stream is configured: a non-zero
// StreamID, a non-empty StreamIDs, or a non-empty entry in
// StreamIDsByTrafficClass.
func (t *FabricSessionPacketTransport) hasSendStream() bool {
	if t == nil {
		return false
	}
	if t.StreamID != 0 || len(t.StreamIDs) > 0 {
		return true
	}
	for _, ids := range t.StreamIDsByTrafficClass {
		if len(ids) > 0 {
			return true
		}
	}
	return false
}

// streamIDsForTrafficClass returns the streams configured for the normalized
// traffic class. The reliable class falls back to the bulk class entry when it
// has no streams of its own; every other class yields nil.
func (t *FabricSessionPacketTransport) streamIDsForTrafficClass(trafficClass string) []uint64 {
	if t == nil || len(t.StreamIDsByTrafficClass) == 0 {
		return nil
	}
	if ids := t.StreamIDsByTrafficClass[normalizeFabricTrafficClass(trafficClass)]; len(ids) > 0 {
		return ids
	}
	if normalizeFabricTrafficClass(trafficClass) == FabricTrafficClassReliable {
		return t.StreamIDsByTrafficClass[FabricTrafficClassBulk]
	}
	return nil
}

// acceptsStream reports whether an inbound frame on streamID belongs to this
// transport. Stream 0 is never accepted. When no stream is configured at all,
// every non-zero stream is accepted.
func (t *FabricSessionPacketTransport) acceptsStream(streamID uint64) bool {
	if t == nil || streamID == 0 {
		return false
	}
	if t.StreamID != 0 && streamID == t.StreamID {
		return true
	}
	for _, id := range t.StreamIDs {
		if id == streamID {
			return true
		}
	}
	for _, ids := range t.StreamIDsByTrafficClass {
		for _, id := range ids {
			if id == streamID {
				return true
			}
		}
	}
	return t.StreamID == 0 && len(t.StreamIDs) == 0 && len(t.StreamIDsByTrafficClass) == 0
}

// nextSequence increments and returns the sequence counter for streamID.
// Stream 0 uses the atomic sequence field; every other stream has its own
// counter in sequenceByStream, guarded by sequenceMu. Counters start from the
// zero value, so the first number returned for a stream is 1.
func (t *FabricSessionPacketTransport) nextSequence(streamID uint64) uint64 {
	if streamID == 0 {
		return atomic.AddUint64(&t.sequence, 1)
	}
	t.sequenceMu.Lock()
	defer t.sequenceMu.Unlock()
	if t.sequenceByStream == nil {
		t.sequenceByStream = map[uint64]uint64{}
	}
	t.sequenceByStream[streamID]++
	return t.sequenceByStream[streamID]
}
func (t *FabricSessionPacketTransport) recordSend(streamID uint64, trafficClass string, packetCount int) { if t == nil { return } trafficClass = normalizeFabricTrafficClass(trafficClass) t.statsMu.Lock() defer t.statsMu.Unlock() if t.sendFramesByClass == nil { t.sendFramesByClass = map[string]uint64{} } if t.sendPacketsByClass == nil { t.sendPacketsByClass = map[string]uint64{} } if t.sendFramesByStream == nil { t.sendFramesByStream = map[uint64]uint64{} } t.sendFramesByClass[trafficClass]++ t.sendPacketsByClass[trafficClass] += uint64(packetCount) t.sendFramesByStream[streamID]++ } func (t *FabricSessionPacketTransport) Snapshot() map[string]any { if t == nil { return nil } t.statsMu.Lock() sendFramesByClass := copyStringUint64Map(t.sendFramesByClass) sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass) sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream)) for streamID, count := range t.sendFramesByStream { sendFramesByStream[fmt.Sprintf("%d", streamID)] = count } t.statsMu.Unlock() return map[string]any{ "schema_version": "rap.vpn_fabric_session_packet_transport.v1", "stream_id": t.StreamID, "stream_ids_by_class": copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass), "send_frames_by_class": sendFramesByClass, "send_packets_by_class": sendPacketsByClass, "send_frames_by_stream_id": sendFramesByStream, } } func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string { if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk { return fallback } if batchHasTCPControlPacket(packets) { return FabricTrafficClassInteractive } return FabricTrafficClassBulk } func copyStringUint64Map(values map[string]uint64) map[string]uint64 { if len(values) == 0 { return map[string]uint64{} } out := make(map[string]uint64, len(values)) for key, value := range values { out[key] = value } return out } func copyStreamIDsByTrafficClass(values map[string][]uint64) map[string][]uint64 { if 
len(values) == 0 { return map[string][]uint64{} } out := make(map[string][]uint64, len(values)) for key, ids := range values { out[key] = append([]uint64(nil), ids...) } return out }