package vpnruntime import ( "context" "sync" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" ) type FabricSessionFrameWriter interface { SendFrame(context.Context, fabricproto.Frame) error } type FabricSessionPacketPeerRegistry struct { mu sync.RWMutex peers map[string]FabricSessionPacketPeer changed chan struct{} } type FabricSessionPacketPeer struct { TunnelID string PoolID string ServiceID string ServiceTunnel FabricServiceTunnel VPNConnectionID string Sender FabricSessionFrameWriter StreamID uint64 StreamIDsByTrafficClass map[string][]uint64 RegisteredAt time.Time LastPacketAt time.Time } type FabricSessionPacketPeerTransport struct { Registry *FabricSessionPacketPeerRegistry Inbox *FabricPacketInbox TunnelID string PoolID string ServiceID string VPNConnectionID string PeerWaitTimeout time.Duration } const defaultFabricSessionPeerWaitTimeout = 500 * time.Millisecond func NewFabricSessionPacketPeerRegistry() *FabricSessionPacketPeerRegistry { return &FabricSessionPacketPeerRegistry{peers: map[string]FabricSessionPacketPeer{}, changed: make(chan struct{})} } func (r *FabricSessionPacketPeerRegistry) RegisterFrame(ctx context.Context, sender FabricSessionFrameWriter, frame fabricproto.Frame) (bool, error) { if r == nil || sender == nil || frame.Type != fabricproto.FrameData || frame.StreamID == 0 { return false, nil } payload, err := DecodeFabricVPNPacketDataFrame(frame) if err != nil { return false, nil } if payload.VPNConnectionID == "" { return false, nil } now := time.Now().UTC() r.mu.Lock() if r.peers == nil { r.peers = map[string]FabricSessionPacketPeer{} } if r.changed == nil { r.changed = make(chan struct{}) } peer := r.peers[payload.VPNConnectionID] if peer.RegisteredAt.IsZero() { peer.RegisteredAt = now } peer.ServiceTunnel = NormalizeServiceTunnel(FabricServiceTunnel{ TunnelID: firstNonEmptyTunnelString(payload.TunnelID, payload.VPNConnectionID), PoolID: payload.PoolID, ServiceID: payload.ServiceID, LocalServiceID: payload.LocalServiceID, RemoteServiceID: payload.RemoteServiceID, ServiceKind: payload.ServiceKind, ServiceClass: payload.ServiceClass, ServiceRole: payload.ServiceRole, RouteLeaseID: payload.RouteLeaseID, RouteGeneration: payload.RouteGeneration, DataPlane: payload.DataPlane, TransportOwner: payload.TransportOwner, RouteVisibility: payload.RouteVisibility, TrafficClasses: payload.TrafficClasses, StreamShards: payload.StreamShards, }, payload.VPNConnectionID) peer.TunnelID = peer.ServiceTunnel.TunnelID peer.PoolID = peer.ServiceTunnel.PoolID peer.ServiceID = peer.ServiceTunnel.ServiceID peer.VPNConnectionID = payload.VPNConnectionID peer.Sender = sender peer.StreamID = frame.StreamID peer.LastPacketAt = now if peer.StreamIDsByTrafficClass == nil { peer.StreamIDsByTrafficClass = map[string][]uint64{} } trafficClass := fabricSessionTrafficClassName(frame.TrafficClass) if trafficClass != "" && !containsUint64(peer.StreamIDsByTrafficClass[trafficClass], frame.StreamID) { peer.StreamIDsByTrafficClass[trafficClass] = append(peer.StreamIDsByTrafficClass[trafficClass], frame.StreamID) } r.peers[payload.VPNConnectionID] = peer r.signalLocked() r.mu.Unlock() return true, nil } func (r *FabricSessionPacketPeerRegistry) TransportFor(vpnConnectionID string, inbox *FabricPacketInbox) PacketTransport { if r == nil || inbox == nil || vpnConnectionID == "" { return nil } r.mu.RLock() peer, ok := r.peers[vpnConnectionID] r.mu.RUnlock() if !ok || peer.Sender == nil || peer.StreamID == 0 { return nil } return &FabricSessionPacketTransport{ Sender: fabricSessionFrameWriterAdapter{writer: peer.Sender}, Inbox: inbox, StreamID: peer.StreamID, ServiceTunnel: peer.ServiceTunnel, TunnelID: vpnConnectionID, PoolID: peer.PoolID, ServiceID: peer.ServiceID, VPNConnectionID: vpnConnectionID, SendDirection: FabricDirectionGatewayToClient, ReceiveDirection: FabricDirectionClientToGateway, } } func (r *FabricSessionPacketPeerRegistry) WaitTransportFor(ctx context.Context, vpnConnectionID string, inbox *FabricPacketInbox, timeout time.Duration) PacketTransport { if timeout <= 0 { return r.TransportFor(vpnConnectionID, inbox) } timer := time.NewTimer(timeout) defer timer.Stop() for { if transport := r.TransportFor(vpnConnectionID, inbox); transport != nil { return transport } changed := r.changedChannel() select { case <-ctx.Done(): return nil case <-timer.C: return nil case <-changed: } } } func (r *FabricSessionPacketPeerRegistry) Forget(vpnConnectionID string) { if r == nil || vpnConnectionID == "" { return } r.mu.Lock() if r.changed == nil { r.changed = make(chan struct{}) } delete(r.peers, vpnConnectionID) r.signalLocked() r.mu.Unlock() } func (r *FabricSessionPacketPeerRegistry) changedChannel() <-chan struct{} { if r == nil { return nil } r.mu.Lock() defer r.mu.Unlock() if r.changed == nil { r.changed = make(chan struct{}) } return r.changed } func (r *FabricSessionPacketPeerRegistry) signalLocked() { if r == nil { return } if r.changed == nil { r.changed = make(chan struct{}) } close(r.changed) r.changed = make(chan struct{}) } func (t *FabricSessionPacketPeerTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { if t == nil || t.Registry == nil || t.Inbox == nil || t.VPNConnectionID == "" { return mesh.ErrForwardRuntimeUnavailable } waitTimeout := t.PeerWaitTimeout if waitTimeout <= 0 { waitTimeout = defaultFabricSessionPeerWaitTimeout } transport := t.Registry.WaitTransportFor(ctx, t.VPNConnectionID, t.Inbox, waitTimeout) if transport == nil { return mesh.ErrForwardRuntimeUnavailable } if err := transport.SendGatewayPacketBatch(ctx, packets); err != nil { t.Registry.Forget(t.VPNConnectionID) return err } return nil } func (t *FabricSessionPacketPeerTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { if t == nil || t.Inbox == nil || t.VPNConnectionID == "" { return nil, mesh.ErrForwardRuntimeUnavailable } return t.Inbox.Receive(ctx, t.VPNConnectionID, FabricDirectionClientToGateway, timeout) } func (t *FabricSessionPacketPeerTransport) Snapshot() map[string]any { if t == nil { return map[string]any{ "transport": "fabric_session_peer_dynamic", "peer_ready": false, } } ready := 0 if t.Registry != nil { if transport := t.Registry.TransportFor(t.VPNConnectionID, t.Inbox); transport != nil { ready = 1 } } return map[string]any{ "transport": "fabric_session_peer_dynamic", "tunnel_id": firstNonEmptyTunnelString(t.TunnelID, t.VPNConnectionID), "pool_id": t.PoolID, "service_id": t.ServiceID, "vpn_connection_id_alias": t.VPNConnectionID, "peer_ready": ready == 1, } } func (r *FabricSessionPacketPeerRegistry) Snapshot() map[string]any { if r == nil { return map[string]any{"ready": 0} } r.mu.RLock() defer r.mu.RUnlock() out := map[string]any{"ready": len(r.peers)} items := make([]map[string]any, 0, len(r.peers)) for _, peer := range r.peers { item := map[string]any{ "tunnel_id": firstNonEmptyTunnelString(peer.TunnelID, peer.VPNConnectionID), "pool_id": peer.PoolID, "service_id": peer.ServiceID, "vpn_connection_id_alias": peer.VPNConnectionID, "service_tunnel": peer.ServiceTunnel.Snapshot(), "stream_id": peer.StreamID, } if !peer.RegisteredAt.IsZero() { item["registered_at"] = peer.RegisteredAt.Format(time.RFC3339Nano) } if !peer.LastPacketAt.IsZero() { item["last_packet_at"] = peer.LastPacketAt.Format(time.RFC3339Nano) } items = append(items, item) } out["peers"] = items return out } type fabricSessionFrameWriterAdapter struct { writer FabricSessionFrameWriter } func (a fabricSessionFrameWriterAdapter) Send(ctx context.Context, frame fabricproto.Frame) error { if a.writer == nil { return mesh.ErrForwardRuntimeUnavailable } return a.writer.SendFrame(ctx, frame) } func containsUint64(values []uint64, value uint64) bool { for _, item := range values { if item == value { return true } } return false } func copyStreamIDsByClass(values map[string][]uint64) map[string][]uint64 { if len(values) == 0 { return nil } out := make(map[string][]uint64, len(values)) for key, ids := range values { out[key] = append([]uint64(nil), ids...) } return out }