Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

300 lines
8.7 KiB
Go

package vpnruntime
import (
"context"
"sync"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
)
// FabricSessionFrameWriter is the outbound side of a fabric session: anything
// that can send a single fabricproto.Frame under a context.
type FabricSessionFrameWriter interface {
// SendFrame writes one frame and reports any failure to the caller.
SendFrame(context.Context, fabricproto.Frame) error
}
// FabricSessionPacketPeerRegistry tracks the most recently seen fabric session
// peer for each VPN connection and lets callers wait for one to appear.
type FabricSessionPacketPeerRegistry struct {
// mu guards peers and changed.
mu sync.RWMutex
// peers is keyed by VPN connection ID (see RegisterFrame).
peers map[string]FabricSessionPacketPeer
// changed is closed and replaced by signalLocked on every RegisterFrame and
// Forget, so waiters can block on it until the peer set is touched.
changed chan struct{}
}
// FabricSessionPacketPeer is the registry entry for one VPN connection, as
// filled in by RegisterFrame from the latest accepted data frame.
type FabricSessionPacketPeer struct {
// TunnelID, PoolID and ServiceID mirror the same fields of ServiceTunnel.
TunnelID string
PoolID string
ServiceID string
// ServiceTunnel is the normalized tunnel description decoded from the frame payload.
ServiceTunnel FabricServiceTunnel
// VPNConnectionID is the registry key this entry is stored under.
VPNConnectionID string
// Sender is the writer passed to the most recent RegisterFrame call for this connection.
Sender FabricSessionFrameWriter
// StreamID is the stream ID of the most recently registered frame.
StreamID uint64
// StreamIDsByTrafficClass collects the distinct stream IDs seen per traffic class name.
StreamIDsByTrafficClass map[string][]uint64
// RegisteredAt is the UTC time of the first registration of this entry.
RegisteredAt time.Time
// LastPacketAt is the UTC time of the most recent registered frame.
LastPacketAt time.Time
}
// FabricSessionPacketPeerTransport is a gateway packet transport bound to one
// VPN connection. It looks the peer up in Registry on every send and reads
// inbound packets from Inbox.
type FabricSessionPacketPeerTransport struct {
// Registry supplies the current peer for VPNConnectionID.
Registry *FabricSessionPacketPeerRegistry
// Inbox is the source of client-to-gateway packets.
Inbox *FabricPacketInbox
// TunnelID, PoolID and ServiceID are reported by Snapshot only.
TunnelID string
PoolID string
ServiceID string
// VPNConnectionID selects the peer in Registry and the queue in Inbox.
VPNConnectionID string
// PeerWaitTimeout bounds how long a send waits for a peer to register;
// non-positive values fall back to defaultFabricSessionPeerWaitTimeout.
PeerWaitTimeout time.Duration
}
// defaultFabricSessionPeerWaitTimeout is how long SendGatewayPacketBatch waits
// for a peer to register when PeerWaitTimeout is not set to a positive value.
const defaultFabricSessionPeerWaitTimeout = 500 * time.Millisecond
// NewFabricSessionPacketPeerRegistry returns an empty registry with its peer
// map and change-notification channel already allocated.
func NewFabricSessionPacketPeerRegistry() *FabricSessionPacketPeerRegistry {
	registry := new(FabricSessionPacketPeerRegistry)
	registry.peers = make(map[string]FabricSessionPacketPeer)
	registry.changed = make(chan struct{})
	return registry
}
// RegisterFrame records sender as the current peer for the VPN connection
// named in a fabric VPN packet data frame.
//
// The frame is ignored, returning (false, nil), when the registry or sender is
// nil, the frame is not a data frame, its stream ID is zero, it cannot be
// decoded as a VPN packet data frame, or it carries no VPN connection ID.
// Otherwise the peer entry is created or refreshed, waiters are signalled and
// (true, nil) is returned. The error result is always nil and ctx is unused.
func (r *FabricSessionPacketPeerRegistry) RegisterFrame(ctx context.Context, sender FabricSessionFrameWriter, frame fabricproto.Frame) (bool, error) {
	if r == nil || sender == nil {
		return false, nil
	}
	if frame.Type != fabricproto.FrameData || frame.StreamID == 0 {
		return false, nil
	}
	payload, err := DecodeFabricVPNPacketDataFrame(frame)
	if err != nil || payload.VPNConnectionID == "" {
		// A frame that does not decode is reported as "not handled", not as an error.
		return false, nil
	}
	connectionID := payload.VPNConnectionID
	seenAt := time.Now().UTC()

	r.mu.Lock()
	defer r.mu.Unlock()
	if r.peers == nil {
		r.peers = map[string]FabricSessionPacketPeer{}
	}

	// Start from the existing entry so RegisteredAt and the per-class stream
	// IDs survive across frames.
	entry := r.peers[connectionID]
	if entry.RegisteredAt.IsZero() {
		entry.RegisteredAt = seenAt
	}
	tunnel := NormalizeServiceTunnel(FabricServiceTunnel{
		TunnelID:        firstNonEmptyTunnelString(payload.TunnelID, payload.VPNConnectionID),
		PoolID:          payload.PoolID,
		ServiceID:       payload.ServiceID,
		LocalServiceID:  payload.LocalServiceID,
		RemoteServiceID: payload.RemoteServiceID,
		ServiceKind:     payload.ServiceKind,
		ServiceClass:    payload.ServiceClass,
		ServiceRole:     payload.ServiceRole,
		RouteLeaseID:    payload.RouteLeaseID,
		RouteGeneration: payload.RouteGeneration,
		DataPlane:       payload.DataPlane,
		TransportOwner:  payload.TransportOwner,
		RouteVisibility: payload.RouteVisibility,
		TrafficClasses:  payload.TrafficClasses,
		StreamShards:    payload.StreamShards,
	}, connectionID)
	entry.ServiceTunnel = tunnel
	entry.TunnelID = tunnel.TunnelID
	entry.PoolID = tunnel.PoolID
	entry.ServiceID = tunnel.ServiceID
	entry.VPNConnectionID = connectionID
	entry.Sender = sender
	entry.StreamID = frame.StreamID
	entry.LastPacketAt = seenAt

	if entry.StreamIDsByTrafficClass == nil {
		entry.StreamIDsByTrafficClass = map[string][]uint64{}
	}
	if class := fabricSessionTrafficClassName(frame.TrafficClass); class != "" {
		// Remember each stream ID once per traffic class.
		if known := entry.StreamIDsByTrafficClass[class]; !containsUint64(known, frame.StreamID) {
			entry.StreamIDsByTrafficClass[class] = append(known, frame.StreamID)
		}
	}

	r.peers[connectionID] = entry
	// signalLocked creates the notification channel if it does not exist yet.
	r.signalLocked()
	return true, nil
}
// TransportFor builds a gateway-side packet transport for vpnConnectionID from
// the currently registered peer. It returns nil when the registry or inbox is
// nil, the ID is empty, or no peer with a sender and a non-zero stream ID is
// registered for that connection.
func (r *FabricSessionPacketPeerRegistry) TransportFor(vpnConnectionID string, inbox *FabricPacketInbox) PacketTransport {
	switch {
	case r == nil, inbox == nil, vpnConnectionID == "":
		return nil
	}

	// Copy the entry out so the lock is not held while building the transport.
	r.mu.RLock()
	entry, found := r.peers[vpnConnectionID]
	r.mu.RUnlock()

	usable := found && entry.Sender != nil && entry.StreamID != 0
	if !usable {
		return nil
	}
	return &FabricSessionPacketTransport{
		Sender:           fabricSessionFrameWriterAdapter{writer: entry.Sender},
		Inbox:            inbox,
		StreamID:         entry.StreamID,
		ServiceTunnel:    entry.ServiceTunnel,
		TunnelID:         vpnConnectionID,
		PoolID:           entry.PoolID,
		ServiceID:        entry.ServiceID,
		VPNConnectionID:  vpnConnectionID,
		SendDirection:    FabricDirectionGatewayToClient,
		ReceiveDirection: FabricDirectionClientToGateway,
	}
}
// WaitTransportFor returns a transport for vpnConnectionID, waiting up to
// timeout for a peer to register if none is available yet.
//
// It returns nil when no usable peer appears before the timeout elapses or ctx
// is done. A non-positive timeout performs a single non-blocking lookup. A nil
// registry, nil inbox or empty ID can never yield a transport, so those cases
// return nil immediately instead of sleeping for the whole timeout.
func (r *FabricSessionPacketPeerRegistry) WaitTransportFor(ctx context.Context, vpnConnectionID string, inbox *FabricPacketInbox, timeout time.Duration) PacketTransport {
	// Fast path: when the peer is already registered (the common case on the
	// send path) return without allocating a timer or touching the write lock.
	if transport := r.TransportFor(vpnConnectionID, inbox); transport != nil {
		return transport
	}
	if timeout <= 0 || r == nil || inbox == nil || vpnConnectionID == "" {
		return nil
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		// Capture the notification channel BEFORE re-checking the registry.
		// If a registration lands between the check and the select, this
		// channel is the one that gets closed, so the wakeup is not lost.
		changed := r.changedChannel()
		if transport := r.TransportFor(vpnConnectionID, inbox); transport != nil {
			return transport
		}
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
			return nil
		case <-changed:
			// The registry was touched; loop and look again.
		}
	}
}
// Forget removes the peer registered for vpnConnectionID, if any, and signals
// waiters. It is a no-op on a nil registry or an empty ID.
func (r *FabricSessionPacketPeerRegistry) Forget(vpnConnectionID string) {
	if r == nil {
		return
	}
	if vpnConnectionID == "" {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.peers, vpnConnectionID)
	// signalLocked creates the notification channel if it does not exist yet.
	r.signalLocked()
}
// changedChannel returns the current change-notification channel, creating it
// on first use. A nil registry yields a nil channel, which never becomes ready.
func (r *FabricSessionPacketPeerRegistry) changedChannel() <-chan struct{} {
	if r == nil {
		return nil
	}
	r.mu.Lock()
	if r.changed == nil {
		r.changed = make(chan struct{})
	}
	current := r.changed
	r.mu.Unlock()
	return current
}
// signalLocked wakes every waiter by closing the current notification channel
// and installs a fresh one for the next round. The caller must hold r.mu for
// writing.
func (r *FabricSessionPacketPeerRegistry) signalLocked() {
	if r == nil {
		return
	}
	if previous := r.changed; previous != nil {
		close(previous)
	}
	r.changed = make(chan struct{})
}
// SendGatewayPacketBatch sends packets to the peer currently registered for
// the transport's VPN connection, waiting up to PeerWaitTimeout (or the
// package default) for one to appear.
//
// It returns mesh.ErrForwardRuntimeUnavailable when the transport is not fully
// configured or no peer becomes available. If the send itself fails, the peer
// is dropped from the registry and the send error is returned.
func (t *FabricSessionPacketPeerTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
	switch {
	case t == nil:
		return mesh.ErrForwardRuntimeUnavailable
	case t.Registry == nil, t.Inbox == nil, t.VPNConnectionID == "":
		return mesh.ErrForwardRuntimeUnavailable
	}

	wait := defaultFabricSessionPeerWaitTimeout
	if t.PeerWaitTimeout > 0 {
		wait = t.PeerWaitTimeout
	}

	peerTransport := t.Registry.WaitTransportFor(ctx, t.VPNConnectionID, t.Inbox, wait)
	if peerTransport == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}

	err := peerTransport.SendGatewayPacketBatch(ctx, packets)
	if err == nil {
		return nil
	}
	// Drop the peer after a failed send; it comes back on its next registered frame.
	t.Registry.Forget(t.VPNConnectionID)
	return err
}
// ReceiveGatewayPacketBatch reads client-to-gateway packets for the
// transport's VPN connection from the inbox, passing ctx and timeout through.
// It returns mesh.ErrForwardRuntimeUnavailable when the transport is nil, has
// no inbox, or has no VPN connection ID.
func (t *FabricSessionPacketPeerTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
	if t == nil {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	if t.Inbox == nil || t.VPNConnectionID == "" {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	connectionID := t.VPNConnectionID
	return t.Inbox.Receive(ctx, connectionID, FabricDirectionClientToGateway, timeout)
}
// Snapshot describes the transport for diagnostics. "peer_ready" is true only
// when the registry can currently build a transport for the VPN connection. A
// nil transport reports just its kind and peer_ready=false.
func (t *FabricSessionPacketPeerTransport) Snapshot() map[string]any {
	const transportKind = "fabric_session_peer_dynamic"
	if t == nil {
		return map[string]any{
			"transport":  transportKind,
			"peer_ready": false,
		}
	}

	peerReady := t.Registry != nil && t.Registry.TransportFor(t.VPNConnectionID, t.Inbox) != nil

	snapshot := make(map[string]any, 6)
	snapshot["transport"] = transportKind
	snapshot["tunnel_id"] = firstNonEmptyTunnelString(t.TunnelID, t.VPNConnectionID)
	snapshot["pool_id"] = t.PoolID
	snapshot["service_id"] = t.ServiceID
	snapshot["vpn_connection_id_alias"] = t.VPNConnectionID
	snapshot["peer_ready"] = peerReady
	return snapshot
}
// Snapshot describes the registry for diagnostics: "ready" is the number of
// registered peers and "peers" lists one entry per peer. Timestamps are
// included only when set, formatted as RFC 3339 with nanoseconds. The order of
// "peers" follows map iteration and is therefore not stable.
func (r *FabricSessionPacketPeerRegistry) Snapshot() map[string]any {
	if r == nil {
		return map[string]any{"ready": 0}
	}
	r.mu.RLock()
	defer r.mu.RUnlock()

	count := len(r.peers)
	peers := make([]map[string]any, 0, count)
	for connectionID := range r.peers {
		entry := r.peers[connectionID]
		described := map[string]any{
			"tunnel_id":               firstNonEmptyTunnelString(entry.TunnelID, entry.VPNConnectionID),
			"pool_id":                 entry.PoolID,
			"service_id":              entry.ServiceID,
			"vpn_connection_id_alias": entry.VPNConnectionID,
			"service_tunnel":          entry.ServiceTunnel.Snapshot(),
			"stream_id":               entry.StreamID,
		}
		if registered := entry.RegisteredAt; !registered.IsZero() {
			described["registered_at"] = registered.Format(time.RFC3339Nano)
		}
		if last := entry.LastPacketAt; !last.IsZero() {
			described["last_packet_at"] = last.Format(time.RFC3339Nano)
		}
		peers = append(peers, described)
	}
	return map[string]any{
		"ready": count,
		"peers": peers,
	}
}
// fabricSessionFrameWriterAdapter exposes a FabricSessionFrameWriter through a
// Send method, forwarding each frame to the writer's SendFrame.
type fabricSessionFrameWriterAdapter struct {
// writer is the wrapped frame writer; Send reports the runtime as unavailable when it is nil.
writer FabricSessionFrameWriter
}
// Send forwards frame to the wrapped writer's SendFrame. It returns
// mesh.ErrForwardRuntimeUnavailable when no writer is set.
func (a fabricSessionFrameWriterAdapter) Send(ctx context.Context, frame fabricproto.Frame) error {
	if target := a.writer; target != nil {
		return target.SendFrame(ctx, frame)
	}
	return mesh.ErrForwardRuntimeUnavailable
}
// containsUint64 reports whether value occurs anywhere in values.
func containsUint64(values []uint64, value uint64) bool {
	for i := 0; i < len(values); i++ {
		if values[i] == value {
			return true
		}
	}
	return false
}
// copyStreamIDsByClass returns a deep copy of values: a new map whose slices
// do not share storage with the input. A nil or empty input yields nil.
func copyStreamIDsByClass(values map[string][]uint64) map[string][]uint64 {
	if len(values) == 0 {
		return nil
	}
	cloned := make(map[string][]uint64, len(values))
	for class := range values {
		// Appending to a nil slice forces a fresh backing array.
		var ids []uint64
		cloned[class] = append(ids, values[class]...)
	}
	return cloned
}