Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

1179 lines
35 KiB
Go

package fabricvpn
import (
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/vpnruntime"
"github.com/quic-go/quic-go"
)
// Tuning limits for stream sharding and packet-batch send deadlines.
const (
// defaultRuntimeStreamShards is used when the config's stream_shards is unset
// or not positive.
defaultRuntimeStreamShards = 8
// maxRuntimeStreamShards caps the configured stream_shards value.
maxRuntimeStreamShards = 128
// minPacketBatchSendTimeout is the base deadline packetBatchSendTimeout starts from.
minPacketBatchSendTimeout = 5 * time.Second
// maxPacketBatchSendTimeout is the largest deadline packetBatchSendTimeout returns.
maxPacketBatchSendTimeout = 30 * time.Second
)
// endpointConfig is the JSON shape of one candidate endpoint, as it appears in
// the runtime config and in its route bundle / route lease paths.
type endpointConfig struct {
EndpointID string `json:"endpoint_id"`
NodeID string `json:"node_id"`
// Transport falls back to "direct_quic" when empty while building a
// transport target.
Transport string `json:"transport"`
Address string `json:"address"`
// PeerCertSHA256 takes precedence over TLSCertSHA256 when both are set.
PeerCertSHA256 string `json:"peer_cert_sha256"`
TLSCertSHA256 string `json:"tls_cert_sha256"`
Priority int `json:"priority"`
}
// runtimeConfig is the JSON document accepted by Manager.Start and
// Manager.UpdateRuntimeConfig.
type runtimeConfig struct {
ClusterID string `json:"cluster_id"`
LocalNodeID string `json:"local_node_id"`
TunnelID string `json:"tunnel_id"`
PoolID string `json:"pool_id"`
ServiceID string `json:"service_id"`
LocalServiceID string `json:"local_service_id"`
RemoteServiceID string `json:"remote_service_id"`
ServiceKind string `json:"service_kind"`
ServiceClass string `json:"service_class"`
// RouteLeaseID and RouteGeneration fall back to values from
// RouteBundle.RouteLease when left empty.
RouteLeaseID string `json:"route_lease_id"`
RouteGeneration string `json:"route_generation"`
// Endpoints is overwritten with the result of fabricRuntimeEndpoints, which
// prefers candidates found in RouteBundle over this list.
Endpoints []endpointConfig `json:"endpoints"`
RouteBundle routeBundleConfig `json:"route_bundle"`
ServiceChannelRequest serviceChannelRequest `json:"service_channel_request"`
// StreamShards is defaulted to defaultRuntimeStreamShards and clamped to
// maxRuntimeStreamShards.
StreamShards int `json:"stream_shards"`
}
// controlForwardResponse is the JSON reply decoded from a control-forward data
// frame. A non-empty Error is surfaced to the caller as an error; otherwise
// Payload is returned verbatim.
type controlForwardResponse struct {
Payload json.RawMessage `json:"payload,omitempty"`
Error string `json:"error,omitempty"`
}
// routeBundleConfig is the "route_bundle" section of the runtime config. Its
// candidate lists and route lease are consulted by fabricRuntimeEndpoints, and
// its target node fields by fabricRuntimeTargetNodeID.
type routeBundleConfig struct {
SchemaVersion string `json:"schema_version"`
RouteAuthority string `json:"route_authority"`
SelectedTargetNode string `json:"selected_target_node_id"`
EndpointCandidates []endpointConfig `json:"endpoint_candidates"`
TargetCandidates []endpointConfig `json:"target_candidates"`
RouteLease routeLeaseConfig `json:"route_lease"`
}
// routeLeaseConfig is the "route_lease" section of a route bundle. LeaseID and
// Generation serve as fallbacks for the top-level route lease id/generation.
type routeLeaseConfig struct {
SchemaVersion string `json:"schema_version"`
LeaseID string `json:"lease_id"`
Generation string `json:"generation"`
SelectedTargetNode string `json:"selected_target_node"`
// PrimaryPath candidates are preferred over WarmStandbyPaths candidates.
PrimaryPath routeLeasePath `json:"primary_path"`
WarmStandbyPaths []routeLeasePath `json:"warm_standby_paths"`
// Multipath and RebuildPolicy are decoded but not read anywhere in this file.
Multipath map[string]any `json:"multipath"`
RebuildPolicy map[string]any `json:"rebuild_policy"`
}
// routeLeasePath describes one path of a route lease together with the
// endpoint candidates that can be dialed for it.
type routeLeasePath struct {
PathID string `json:"path_id"`
TargetNodeID string `json:"target_node_id"`
Status string `json:"status"`
EndpointCandidates []endpointConfig `json:"endpoint_candidates"`
}
// serviceChannelRequest is the "service_channel_request" section of the
// runtime config. Start rejects a config whose SchemaVersion is blank.
type serviceChannelRequest struct {
SchemaVersion string `json:"schema_version"`
ChannelID string `json:"channel_id"`
ServiceClass string `json:"service_class"`
SourceRole string `json:"source_role"`
}
// SocketProtector is supplied by the host platform and is invoked with the
// file descriptor of each UDP socket opened for a QUIC dial.
type SocketProtector interface {
// Protect reports whether fd was protected; a false result fails the dial
// with "android vpn socket protect failed".
Protect(fd int64) bool
}
// Manager owns the lifecycle of one fabric VPN session: the QUIC transport,
// the active session, the packet transport built on top of it, and the
// traffic counters exposed through SnapshotJSON.
type Manager struct {
// opMu serializes lifecycle operations such as Stop, Reconnect,
// ControlRequest, UpdateRuntimeConfig and packet-path reconnects.
opMu sync.Mutex
// mu guards the non-atomic fields below.
mu sync.Mutex
// cancel is non-nil while the runtime is started; reconnects refuse to run
// when it is nil.
cancel context.CancelFunc
// heartbeatCancel stops the heartbeat goroutine of the current session.
heartbeatCancel context.CancelFunc
transport *mesh.QUICFabricTransport
session mesh.FabricTransportSession
// packet is non-nil exactly while the manager reports itself as connected.
packet *vpnruntime.FabricSessionPacketTransport
inbox *vpnruntime.FabricPacketInbox
serviceStreams *vpnruntime.FabricServiceStreamRegistry
cfg runtimeConfig
// lastErr holds the text of the most recent error passed to setErr.
lastErr string
// endpoint is the address of the endpoint currently in use.
endpoint string
protector SocketProtector
// Traffic counters, updated on successful send/receive.
uplinkPackets atomic.Uint64
uplinkBytes atomic.Uint64
downlinkPackets atomic.Uint64
downlinkBytes atomic.Uint64
}
// fabricEndpointConnectResult is the outcome of one connection attempt made by
// connectFastestEndpoint. On failure only endpoint and err are populated.
type fabricEndpointConnectResult struct {
endpoint endpointConfig
session mesh.FabricTransportSession
// streamIDs maps a traffic class name to the stream ids opened for it.
streamIDs map[string][]uint64
// streamID is the first stream id opened on the session.
streamID uint64
err error
}
// NewManager returns an idle Manager with no session; call Start to connect.
func NewManager() *Manager {
	return new(Manager)
}
// SetSocketProtector installs the protector consulted when the next QUIC
// dialer is built. Passing nil disables socket protection.
func (m *Manager) SetSocketProtector(protector SocketProtector) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.protector = protector
}
// Start parses and validates configJSON, tears down any running session and
// connects to the fastest reachable endpoint.
//
// Validation happens before the current session is touched, so an invalid
// config leaves a running session intact. Missing pool id, service class and
// service kind are defaulted; StreamShards is defaulted and clamped. On a
// connect failure the error is recorded via setErr and returned.
func (m *Manager) Start(configJSON string) error {
	var cfg runtimeConfig
	if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
		return err
	}
	cfg.ClusterID = strings.TrimSpace(cfg.ClusterID)
	cfg.LocalNodeID = strings.TrimSpace(cfg.LocalNodeID)
	cfg.TunnelID = strings.TrimSpace(cfg.TunnelID)
	cfg.PoolID = strings.TrimSpace(cfg.PoolID)
	cfg.ServiceID = strings.TrimSpace(cfg.ServiceID)
	cfg.LocalServiceID = strings.TrimSpace(cfg.LocalServiceID)
	cfg.RemoteServiceID = strings.TrimSpace(cfg.RemoteServiceID)
	cfg.ServiceKind = strings.TrimSpace(cfg.ServiceKind)
	cfg.ServiceClass = strings.TrimSpace(cfg.ServiceClass)
	cfg.RouteLeaseID = strings.TrimSpace(firstNonEmpty(cfg.RouteLeaseID, cfg.RouteBundle.RouteLease.LeaseID))
	cfg.RouteGeneration = strings.TrimSpace(firstNonEmpty(cfg.RouteGeneration, cfg.RouteBundle.RouteLease.Generation, cfg.RouteBundle.RouteLease.LeaseID))
	if cfg.PoolID == "" {
		cfg.PoolID = vpnruntime.DefaultFabricTunnelPoolID
	}
	if cfg.ServiceClass == "" {
		cfg.ServiceClass = vpnruntime.DefaultFabricTunnelClass
	}
	if cfg.ServiceKind == "" {
		cfg.ServiceKind = vpnruntime.DefaultFabricTunnelServiceKind
	}
	cfg.Endpoints = fabricRuntimeEndpoints(cfg)
	if cfg.ClusterID == "" || cfg.LocalNodeID == "" || cfg.TunnelID == "" {
		return fmt.Errorf("cluster, local node and fabric tunnel id are required")
	}
	if strings.TrimSpace(cfg.ServiceChannelRequest.SchemaVersion) == "" {
		return fmt.Errorf("fabric service channel request is required")
	}
	if len(cfg.Endpoints) == 0 {
		return fmt.Errorf("fabric route lease has no QUIC candidates")
	}
	if cfg.StreamShards <= 0 {
		cfg.StreamShards = defaultRuntimeStreamShards
	}
	if cfg.StreamShards > maxRuntimeStreamShards {
		cfg.StreamShards = maxRuntimeStreamShards
	}

	// Hold opMu across teardown AND connect. Releasing it in between would let
	// a second Start (or a reconnect) interleave, and whichever connect
	// finished last would overwrite the other's session and heartbeat without
	// closing them.
	m.opMu.Lock()
	defer m.opMu.Unlock()
	m.stopLocked()

	ctx, cancel := context.WithCancel(context.Background())
	if err := m.connect(ctx, cfg, cancel); err != nil {
		cancel()
		m.setErr(err)
		return err
	}
	return nil
}
// fabricRuntimeEndpoints picks the endpoint list to dial. The first non-empty
// source wins, in this order: the lease's primary path, the first warm-standby
// path with candidates, the bundle's endpoint candidates, the bundle's target
// candidates, and finally the config's own Endpoints.
func fabricRuntimeEndpoints(cfg runtimeConfig) []endpointConfig {
	lease := cfg.RouteBundle.RouteLease
	if primary := lease.PrimaryPath.EndpointCandidates; len(primary) > 0 {
		return primary
	}
	for i := range lease.WarmStandbyPaths {
		if standby := lease.WarmStandbyPaths[i].EndpointCandidates; len(standby) > 0 {
			return standby
		}
	}
	switch {
	case len(cfg.RouteBundle.EndpointCandidates) > 0:
		return cfg.RouteBundle.EndpointCandidates
	case len(cfg.RouteBundle.TargetCandidates) > 0:
		return cfg.RouteBundle.TargetCandidates
	default:
		return cfg.Endpoints
	}
}
// fabricRuntimeTargetNodeID returns the target node id for cfg: the primary
// path's target, else the lease's selected target, else the bundle's selected
// target (which may be empty). Values are returned untrimmed.
func fabricRuntimeTargetNodeID(cfg runtimeConfig) string {
	lease := cfg.RouteBundle.RouteLease
	for _, candidate := range [...]string{lease.PrimaryPath.TargetNodeID, lease.SelectedTargetNode} {
		if candidate != "" {
			return candidate
		}
	}
	return cfg.RouteBundle.SelectedTargetNode
}
// connect builds a fresh QUIC transport for cfg, races the configured
// endpoints and installs the winning session as the manager's active state.
//
// ctx bounds only the endpoint race. cancel is stored as m.cancel on success
// and is never invoked here. After the state is installed the streams are
// announced once; an announce failure is recorded via setErr but does not fail
// the call. A heartbeat goroutine is started before returning nil.
func (m *Manager) connect(ctx context.Context, cfg runtimeConfig, cancel context.CancelFunc) error {
quicTransport := mesh.NewQUICFabricTransport(nil)
quicTransport.SetLocalPeerID(cfg.LocalNodeID)
// A nil dialer is assigned when no socket protector is configured.
quicTransport.DialAddr = m.protectedQUICDialer()
inbox := vpnruntime.NewFabricPacketInbox(4096)
// Inbound production envelopes are handed to the inbox and acknowledged as
// delivered once the inbox accepts them.
quicTransport.SetInboundHandlers(func(ctx context.Context, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
if err := inbox.DeliverProductionEnvelope(ctx, envelope); err != nil {
return mesh.ProductionForwardResult{}, err
}
return mesh.ProductionForwardResult{Delivered: true, MessageID: envelope.MessageID}, nil
}, nil, nil)
result, err := m.connectFastestEndpoint(ctx, cfg, quicTransport)
if err != nil {
return err
}
endpoint := result.endpoint
session := result.session
streamIDs := result.streamIDs
streamID := result.streamID
// The heartbeat context is detached from ctx, so it is ended only through
// heartbeatCancel and not by the connect context expiring.
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
m.mu.Lock()
m.cancel = cancel
m.heartbeatCancel = heartbeatCancel
m.transport = quicTransport
m.session = session
m.inbox = inbox
m.serviceStreams = vpnruntime.NewFabricServiceStreamRegistry()
m.cfg = cfg
m.endpoint = endpoint.Address
m.lastErr = ""
m.packet = &vpnruntime.FabricSessionPacketTransport{
Sender: session,
Receiver: session,
Inbox: inbox,
StreamID: streamID,
ServiceStreams: m.serviceStreams,
ServiceTunnel: serviceTunnelFromRuntimeConfig(cfg),
StreamIDsByTrafficClass: streamIDs,
TunnelID: cfg.TunnelID,
PoolID: cfg.PoolID,
ServiceID: cfg.ServiceID,
VPNConnectionID: cfg.TunnelID,
SendDirection: vpnruntime.FabricDirectionClientToGateway,
ReceiveDirection: vpnruntime.FabricDirectionGatewayToClient,
}
m.mu.Unlock()
// Initial announce is best-effort with its own 2s deadline.
announceCtx, announceCancel := context.WithTimeout(context.Background(), 2*time.Second)
announceErr := announceVPNSessionStreams(announceCtx, session, serviceTunnelFromRuntimeConfig(cfg), streamIDs, streamID)
announceCancel()
if announceErr != nil {
m.setErr(announceErr)
}
go m.runVPNSessionHeartbeat(heartbeatCtx, session, streamIDs, streamID)
return nil
}
// connectFastestEndpoint dials every peer group in cfg.Endpoints concurrently
// and returns the first attempt that both connects and opens its streams.
//
// Endpoints that resolve to the same peer key are tried sequentially inside
// one goroutine; different peers race each other. Each endpoint attempt has a
// 5s budget covering connect and stream opening. Sessions established by
// attempts that lose the race, or that complete after ctx is cancelled, are
// closed in the background. When every attempt fails the last error is
// returned wrapped; when ctx ends first, the last attempt error seen so far
// (or ctx.Err()) is returned.
func (m *Manager) connectFastestEndpoint(ctx context.Context, cfg runtimeConfig, quicTransport *mesh.QUICFabricTransport) (fabricEndpointConnectResult, error) {
	if len(cfg.Endpoints) == 0 {
		return fabricEndpointConnectResult{}, fmt.Errorf("no QUIC exit endpoints available")
	}
	connectCtx, connectCancel := context.WithCancel(ctx)
	defer connectCancel()
	endpointGroups := groupEndpointsByPeer(cfg)
	// One buffered slot per group: every goroutine sends exactly one result and
	// therefore never blocks, even if nobody is receiving any more.
	results := make(chan fabricEndpointConnectResult, len(endpointGroups))
	attempts := 0
	for _, group := range endpointGroups {
		attempts++
		go func(group []endpointConfig) {
			var last fabricEndpointConnectResult
			for _, endpoint := range group {
				target := fabricRuntimePacketTarget(cfg, endpoint)
				carrier, selected, err := mesh.FabricTransportForTarget(target, quicTransport)
				if err != nil {
					last = fabricEndpointConnectResult{endpoint: endpoint, err: err}
					continue
				}
				dialCtx, dialCancel := context.WithTimeout(connectCtx, 5*time.Second)
				session, err := carrier.Connect(dialCtx, selected)
				if err != nil {
					dialCancel()
					last = fabricEndpointConnectResult{endpoint: endpoint, err: err}
					continue
				}
				streamIDs, streamID, err := openStreams(dialCtx, session, cfg.StreamShards)
				dialCancel()
				if err != nil {
					_ = session.Close()
					last = fabricEndpointConnectResult{endpoint: endpoint, err: err}
					continue
				}
				results <- fabricEndpointConnectResult{
					endpoint:  endpoint,
					session:   session,
					streamIDs: streamIDs,
					streamID:  streamID,
				}
				return
			}
			if last.err == nil {
				last.err = fmt.Errorf("no endpoint attempt completed for peer")
			}
			results <- last
		}(group)
	}
	var lastErr error
	for index := 0; index < attempts; index++ {
		select {
		case <-ctx.Done():
			// `index` results have been consumed so far. An attempt that is
			// still in flight (or already queued in the channel) may carry an
			// open session; drain the remainder so it gets closed instead of
			// leaking.
			connectCancel()
			go closeLateFabricSessions(results, attempts-index)
			if lastErr != nil {
				return fabricEndpointConnectResult{}, lastErr
			}
			return fabricEndpointConnectResult{}, ctx.Err()
		case result := <-results:
			if result.err != nil {
				lastErr = result.err
				continue
			}
			// Winner found: abort the other attempts and close any that still
			// manage to connect.
			connectCancel()
			go closeLateFabricSessions(results, attempts-index-1)
			return result, nil
		}
	}
	if lastErr == nil {
		lastErr = fmt.Errorf("no endpoint attempt completed")
	}
	return fabricEndpointConnectResult{}, fmt.Errorf("fabric bootstrap failed after %d endpoint candidates: %w", len(cfg.Endpoints), lastErr)
}
// groupEndpointsByPeer buckets cfg.Endpoints by endpointPeerKey. Groups appear
// in order of each peer's first endpoint, and endpoints keep their relative
// order inside a group.
func groupEndpointsByPeer(cfg runtimeConfig) [][]endpointConfig {
	grouped := make([][]endpointConfig, 0, len(cfg.Endpoints))
	slotByPeer := make(map[string]int)
	for _, candidate := range cfg.Endpoints {
		key := endpointPeerKey(cfg, candidate)
		slot, known := slotByPeer[key]
		if !known {
			slot = len(grouped)
			slotByPeer[key] = slot
			grouped = append(grouped, nil)
		}
		grouped[slot] = append(grouped[slot], candidate)
	}
	return grouped
}
// endpointPeerKey returns the grouping key for endpoint: the first non-blank,
// whitespace-trimmed value among the endpoint's node id, the config's target
// node id, the endpoint id and the endpoint address.
func endpointPeerKey(cfg runtimeConfig, endpoint endpointConfig) string {
	return firstNonEmpty(
		endpoint.NodeID,
		fabricRuntimeTargetNodeID(cfg),
		endpoint.EndpointID,
		endpoint.Address,
	)
}
// closeLateFabricSessions receives `remaining` further results and closes the
// session of each one that carries a session. It blocks until that many
// results have arrived.
func closeLateFabricSessions(results <-chan fabricEndpointConnectResult, remaining int) {
	for ; remaining > 0; remaining-- {
		if late := <-results; late.session != nil {
			_ = late.session.Close()
		}
	}
}
// fabricRuntimePacketTarget converts endpoint into the transport target used
// for the packet session. Blank identifiers fall back to the address / the
// config's target node, the transport defaults to "direct_quic", and the
// packet path uses large buffers (4096 frames in and out, 128 errors).
func fabricRuntimePacketTarget(cfg runtimeConfig, endpoint endpointConfig) mesh.FabricTransportTarget {
	target := mesh.FabricTransportTarget{
		OutboundBuffer: 4096,
		InboundBuffer:  4096,
		ErrorBuffer:    128,
	}
	target.EndpointID = firstNonEmpty(endpoint.EndpointID, endpoint.Address)
	target.PeerID = firstNonEmpty(endpoint.NodeID, fabricRuntimeTargetNodeID(cfg))
	target.Endpoint = endpoint.Address
	target.Transport = firstNonEmpty(endpoint.Transport, "direct_quic")
	target.PeerCertSHA256 = firstNonEmpty(endpoint.PeerCertSHA256, endpoint.TLSCertSHA256)
	return target
}
// announceVPNSessionStreams sends one session-hello frame per distinct
// non-zero stream id in streamIDsByClass, tagged with that stream's traffic
// class. If nothing was announced and fallbackStreamID is non-zero, a single
// hello is sent on the fallback stream with the bulk traffic class.
//
// Sequence numbers start from the current Unix time in nanoseconds and grow by
// one per frame. The first build or send error aborts the announce.
func announceVPNSessionStreams(ctx context.Context, session mesh.FabricTransportSession, serviceTunnel vpnruntime.FabricServiceTunnel, streamIDsByClass map[string][]uint64, fallbackStreamID uint64) error {
	serviceTunnel = vpnruntime.NormalizeServiceTunnel(serviceTunnel, serviceTunnel.TunnelID)
	if session == nil || strings.TrimSpace(serviceTunnel.TunnelID) == "" {
		return fmt.Errorf("fabric vpn session announce requires an active session")
	}

	sent := map[uint64]bool{}
	sequence := uint64(time.Now().UnixNano())
	// hello builds and sends one hello frame, consuming the next sequence number.
	hello := func(streamID uint64, trafficClass string) error {
		sequence++
		frame, err := vpnruntime.NewFabricVPNSessionHelloFrame(vpnruntime.FabricVPNPacketFrameInput{
			StreamID:        streamID,
			Sequence:        sequence,
			VPNConnectionID: serviceTunnel.TunnelID,
			Direction:       vpnruntime.FabricDirectionClientToGateway,
			TrafficClass:    trafficClass,
			ServiceTunnel:   serviceTunnel,
		})
		if err != nil {
			return err
		}
		return session.Send(ctx, frame)
	}

	for trafficClass, ids := range streamIDsByClass {
		for _, id := range ids {
			if id == 0 || sent[id] {
				continue
			}
			if err := hello(id, trafficClass); err != nil {
				return err
			}
			sent[id] = true
		}
	}
	if len(sent) == 0 && fallbackStreamID != 0 {
		return hello(fallbackStreamID, vpnruntime.FabricTrafficClassBulk)
	}
	return nil
}
// runVPNSessionHeartbeat re-announces the session's streams every 10 seconds
// until ctx is cancelled. Each announce gets a 2s deadline. The first failed
// announce is recorded via setErr and ends the loop; heartbeats for this
// session are not resumed afterwards.
func (m *Manager) runVPNSessionHeartbeat(ctx context.Context, session mesh.FabricTransportSession, streamIDsByClass map[string][]uint64, fallbackStreamID uint64) {
	const (
		interval    = 10 * time.Second
		sendTimeout = 2 * time.Second
	)
	tick := time.NewTicker(interval)
	defer tick.Stop()

	beat := func() error {
		beatCtx, done := context.WithTimeout(ctx, sendTimeout)
		defer done()
		return announceVPNSessionStreams(beatCtx, session, m.currentServiceTunnel(), streamIDsByClass, fallbackStreamID)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}
		if err := beat(); err != nil {
			m.setErr(err)
			return
		}
	}
}
// currentServiceTunnel derives the service tunnel description from a snapshot
// of the manager's current config, taken under mu.
func (m *Manager) currentServiceTunnel() vpnruntime.FabricServiceTunnel {
	snapshot := func() runtimeConfig {
		m.mu.Lock()
		defer m.mu.Unlock()
		return m.cfg
	}()
	return serviceTunnelFromRuntimeConfig(snapshot)
}
// protectedQUICDialer returns a QUIC dial function that opens its own UDP
// socket and passes the socket's file descriptor to the configured
// SocketProtector before dialing. It returns nil when no protector is set.
//
// The protector is captured when this method is called; later calls to
// SetSocketProtector do not affect a dialer that was already returned. The
// socket is IPv6 ("udp6") when the endpoint string contains '[', otherwise
// IPv4 ("udp4"). The socket is closed on every failure path, including a
// failed QUIC dial.
func (m *Manager) protectedQUICDialer() func(context.Context, string, *tls.Config, *quic.Config) (*quic.Conn, error) {
	m.mu.Lock()
	protector := m.protector
	m.mu.Unlock()
	if protector == nil {
		return nil
	}
	return func(ctx context.Context, endpoint string, tlsConfig *tls.Config, config *quic.Config) (*quic.Conn, error) {
		network := "udp4"
		if strings.Contains(endpoint, "[") {
			network = "udp6"
		}
		conn, err := net.ListenPacket(network, ":0")
		if err != nil {
			return nil, err
		}
		raw, ok := conn.(interface {
			SyscallConn() (syscall.RawConn, error)
		})
		if !ok {
			_ = conn.Close()
			return nil, fmt.Errorf("udp socket does not expose raw connection for vpn protection")
		}
		rawConn, err := raw.SyscallConn()
		if err != nil {
			_ = conn.Close()
			return nil, err
		}
		// Control runs the callback synchronously, so protectErr is safe to read
		// once Control has returned.
		var protectErr error
		if err := rawConn.Control(func(fd uintptr) {
			if !protector.Protect(int64(fd)) {
				protectErr = fmt.Errorf("android vpn socket protect failed")
			}
		}); err != nil {
			_ = conn.Close()
			return nil, err
		}
		if protectErr != nil {
			_ = conn.Close()
			return nil, protectErr
		}
		quicConn, err := mesh.DialQUICAddrWithPacketConn(ctx, endpoint, conn, tlsConfig, config)
		if err != nil {
			// A QUIC dial over a caller-supplied PacketConn leaves the socket
			// open on failure, so release it here. If the helper already closed
			// it, the second Close merely returns an error, which is ignored.
			_ = conn.Close()
			return nil, err
		}
		return quicConn, nil
	}
}
// Stop tears down the active session, transport and heartbeat, if any. It
// takes opMu, so it waits for an in-flight reconnect or control request.
func (m *Manager) Stop() {
m.opMu.Lock()
defer m.opMu.Unlock()
m.stopLocked()
}
// stopLocked detaches the running state from the manager under mu and then
// releases it outside the lock: run cancel, heartbeat cancel, session close,
// transport close, in that order. Close errors are ignored. cfg, endpoint,
// inbox and lastErr are left untouched.
func (m *Manager) stopLocked() {
	m.mu.Lock()
	stopRun, stopHeartbeat := m.cancel, m.heartbeatCancel
	activeSession, activeTransport := m.session, m.transport
	m.cancel, m.heartbeatCancel = nil, nil
	m.session, m.transport = nil, nil
	m.packet, m.serviceStreams = nil, nil
	m.mu.Unlock()

	for _, stop := range []context.CancelFunc{stopRun, stopHeartbeat} {
		if stop != nil {
			stop()
		}
	}
	if activeSession != nil {
		_ = activeSession.Close()
	}
	if activeTransport != nil {
		_ = activeTransport.Close()
	}
}
// SendPacket sends one packet towards the gateway as a single-element batch.
// An empty packet is a no-op.
//
// If no packet transport is active a reconnect is attempted first. If the send
// fails, the error is recorded, one reconnect is attempted and the packet is
// sent once more; when the reconnect itself fails the original send error is
// returned. The packet is copied before sending, and the uplink counters are
// bumped only after a successful send.
func (m *Manager) SendPacket(packet []byte) error {
	if len(packet) == 0 {
		return nil
	}
	transport := m.packetTransport()
	if transport == nil {
		reconnected, err := m.reconnectPacketTransport()
		if err != nil || reconnected == nil {
			return fmt.Errorf("fabric vpn runtime is not connected")
		}
		transport = reconnected
	}

	batch := [][]byte{append([]byte(nil), packet...)}
	ctx, cancel := context.WithTimeout(context.Background(), packetBatchSendTimeout(batch))
	defer cancel()
	sendErr := transport.SendGatewayPacketBatch(ctx, batch)
	if sendErr != nil {
		m.setErr(sendErr)
		fresh, reconnectErr := m.reconnectPacketTransport()
		if reconnectErr != nil || fresh == nil {
			return sendErr
		}
		retryBatch := [][]byte{append([]byte(nil), packet...)}
		retryCtx, retryCancel := context.WithTimeout(context.Background(), packetBatchSendTimeout(retryBatch))
		defer retryCancel()
		if retryErr := fresh.SendGatewayPacketBatch(retryCtx, retryBatch); retryErr != nil {
			m.setErr(retryErr)
			return retryErr
		}
	}

	m.uplinkPackets.Add(1)
	m.uplinkBytes.Add(uint64(len(packet)))
	return nil
}
// SendPacketBatchPayload decodes a length-prefixed batch payload and sends all
// of its packets towards the gateway in one call. A payload that decodes to no
// packets is a no-op.
//
// Reconnect and retry behave as in SendPacket: reconnect first when no
// transport is active, and after a failed send reconnect once and resend the
// whole batch. Each send receives its own clone of the packets. Uplink
// counters are updated only after a successful send.
func (m *Manager) SendPacketBatchPayload(payload []byte) error {
	packets, err := decodePacketBatchPayload(payload)
	if err != nil {
		return err
	}
	if len(packets) == 0 {
		return nil
	}
	transport := m.packetTransport()
	if transport == nil {
		reconnected, reconnectErr := m.reconnectPacketTransport()
		if reconnectErr != nil || reconnected == nil {
			return fmt.Errorf("fabric vpn runtime is not connected")
		}
		transport = reconnected
	}

	deadline := packetBatchSendTimeout(packets)
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()
	sendErr := transport.SendGatewayPacketBatch(ctx, clonePacketBatch(packets))
	if sendErr != nil {
		m.setErr(sendErr)
		fresh, reconnectErr := m.reconnectPacketTransport()
		if reconnectErr != nil || fresh == nil {
			return sendErr
		}
		retryCtx, retryCancel := context.WithTimeout(context.Background(), deadline)
		defer retryCancel()
		if retryErr := fresh.SendGatewayPacketBatch(retryCtx, clonePacketBatch(packets)); retryErr != nil {
			m.setErr(retryErr)
			return retryErr
		}
	}

	var sentBytes uint64
	for i := range packets {
		sentBytes += uint64(len(packets[i]))
	}
	m.uplinkPackets.Add(uint64(len(packets)))
	m.uplinkBytes.Add(sentBytes)
	return nil
}
// packetBatchSendTimeout scales the send deadline with the batch size. It
// starts at minPacketBatchSendTimeout, adds one second per full 512 KiB of
// payload and, for batches of more than 512 packets, one second per full 512
// packets. The result is capped at maxPacketBatchSendTimeout.
func packetBatchSendTimeout(packets [][]byte) time.Duration {
	totalBytes := 0
	for i := range packets {
		totalBytes += len(packets[i])
	}
	extraSeconds := totalBytes / (512 * 1024)
	// Exactly 512 packets adds nothing: the bonus applies only above 512.
	if count := len(packets); count > 512 {
		extraSeconds += count / 512
	}
	timeout := minPacketBatchSendTimeout + time.Duration(extraSeconds)*time.Second
	if timeout > maxPacketBatchSendTimeout {
		timeout = maxPacketBatchSendTimeout
	}
	return timeout
}
// ReceivePacket waits up to timeoutMillis for inbound traffic and returns a
// copy of one packet, or (nil, nil) when nothing arrived.
//
// NOTE(review): this receives a whole batch through ReceivePacketBatchPayload
// and returns only its first packet; any further packets in the same batch are
// not handed to the caller. Callers that must not lose packets should use
// ReceivePacketBatchPayload directly.
func (m *Manager) ReceivePacket(timeoutMillis int) ([]byte, error) {
	payload, err := m.ReceivePacketBatchPayload(timeoutMillis)
	if err != nil {
		return nil, err
	}
	packets, err := decodePacketBatchPayload(payload)
	switch {
	case err != nil:
		return nil, err
	case len(packets) == 0:
		return nil, nil
	}
	first := make([]byte, len(packets[0]))
	copy(first, packets[0])
	return first, nil
}
// ReceivePacketBatchPayload waits for a batch of gateway packets and returns
// them in the length-prefixed batch encoding, or (nil, nil) when the batch is
// empty.
//
// A non-positive timeoutMillis is treated as 100ms. The receive call's context
// is given one extra second beyond that timeout. On a receive error the error
// is recorded, a reconnect is attempted (its outcome is ignored) and the
// receive error is returned. Downlink counters are updated for every non-empty
// batch.
func (m *Manager) ReceivePacketBatchPayload(timeoutMillis int) ([]byte, error) {
	transport := m.packetTransport()
	if transport == nil {
		reconnected, err := m.reconnectPacketTransport()
		if err != nil || reconnected == nil {
			return nil, fmt.Errorf("fabric vpn runtime is not connected")
		}
		transport = reconnected
	}

	wait := time.Duration(timeoutMillis) * time.Millisecond
	if wait <= 0 {
		wait = 100 * time.Millisecond
	}
	ctx, cancel := context.WithTimeout(context.Background(), wait+time.Second)
	defer cancel()

	packets, err := transport.ReceiveGatewayPacketBatch(ctx, wait)
	if err != nil {
		m.setErr(err)
		_, _ = m.reconnectPacketTransport()
		return nil, err
	}
	if len(packets) == 0 {
		return nil, nil
	}

	var receivedBytes uint64
	for i := range packets {
		receivedBytes += uint64(len(packets[i]))
	}
	m.downlinkPackets.Add(uint64(len(packets)))
	m.downlinkBytes.Add(receivedBytes)
	return encodePacketBatchPayload(packets), nil
}
// ControlRequest sends payloadJSON as a control request and returns the
// response payload.
//
// It holds opMu for the whole call and reconnects first if the runtime is
// started but not connected. Endpoints are tried in the order produced by
// prioritizeControlEndpoints (active address first); the first success wins
// and, if it came from a different address, that address becomes the
// manager's active endpoint. If every endpoint fails, the last error is
// returned.
func (m *Manager) ControlRequest(payloadJSON string) (string, error) {
	m.opMu.Lock()
	defer m.opMu.Unlock()
	if err := m.ensureConnectedLocked(); err != nil {
		return "", err
	}

	m.mu.Lock()
	transport, cfg, activeAddress := m.transport, m.cfg, m.endpoint
	m.mu.Unlock()
	if transport == nil {
		return "", fmt.Errorf("fabric control runtime is not connected")
	}

	candidates := prioritizeControlEndpoints(cfg.Endpoints, activeAddress)
	if len(candidates) == 0 {
		return "", fmt.Errorf("fabric control runtime has no bootstrap endpoints")
	}

	var lastErr error
	for _, candidate := range candidates {
		response, err := m.controlRequestToEndpoint(transport, cfg, candidate, payloadJSON)
		if err != nil {
			lastErr = err
			continue
		}
		if used := strings.TrimSpace(candidate.Address); used != "" && used != activeAddress {
			m.mu.Lock()
			m.endpoint = used
			m.mu.Unlock()
		}
		return response, nil
	}
	if lastErr == nil {
		lastErr = fmt.Errorf("fabric control route unavailable")
	}
	return "", lastErr
}
// prioritizeControlEndpoints orders endpoints for a control request: every
// endpoint whose trimmed address equals activeAddress comes first, followed by
// the remaining endpoints de-duplicated by trimmed address. Endpoints with a
// blank address are dropped. If nothing remains and activeAddress is
// non-blank, a bare endpoint holding just that address is returned.
func prioritizeControlEndpoints(endpoints []endpointConfig, activeAddress string) []endpointConfig {
	activeAddress = strings.TrimSpace(activeAddress)
	ordered := make([]endpointConfig, 0, len(endpoints)+1)
	visited := map[string]bool{}

	// Pass 1: endpoints on the active address.
	for _, candidate := range endpoints {
		if addr := strings.TrimSpace(candidate.Address); addr != "" && addr == activeAddress {
			ordered = append(ordered, candidate)
			visited[addr] = true
		}
	}
	// Pass 2: everything else, first occurrence per address.
	for _, candidate := range endpoints {
		addr := strings.TrimSpace(candidate.Address)
		if addr != "" && !visited[addr] {
			visited[addr] = true
			ordered = append(ordered, candidate)
		}
	}

	if len(ordered) == 0 && activeAddress != "" {
		ordered = append(ordered, endpointConfig{Address: activeAddress})
	}
	return ordered
}
// controlRequestToEndpoint opens a short-lived session to endpoint, sends
// payloadJSON as one data frame on the control-forward stream and waits for
// the matching reply frame.
//
// The whole exchange (connect, send, wait) shares one 10s deadline. The reply
// payload is decoded as controlForwardResponse: a non-empty Error becomes the
// returned error, otherwise the raw Payload is returned as a string. The
// session is closed before returning.
func (m *Manager) controlRequestToEndpoint(transport *mesh.QUICFabricTransport, cfg runtimeConfig, endpoint endpointConfig, payloadJSON string) (string, error) {
// Same fallbacks as the packet target, but with small buffers and an
// explicit target timeout.
target := mesh.FabricTransportTarget{
EndpointID: firstNonEmpty(endpoint.EndpointID, endpoint.Address),
PeerID: firstNonEmpty(endpoint.NodeID, fabricRuntimeTargetNodeID(cfg)),
Endpoint: endpoint.Address,
Transport: firstNonEmpty(endpoint.Transport, "direct_quic"),
PeerCertSHA256: firstNonEmpty(endpoint.PeerCertSHA256, endpoint.TLSCertSHA256),
Timeout: 8 * time.Second,
OutboundBuffer: 16,
InboundBuffer: 16,
ErrorBuffer: 8,
}
carrier, selected, err := mesh.FabricTransportForTarget(target, transport)
if err != nil {
return "", err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := carrier.Connect(ctx, selected)
if err != nil {
return "", err
}
defer session.Close()
if err := session.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameData,
TrafficClass: fabricproto.TrafficClassReliable,
StreamID: mesh.FabricControlForwardQUICStreamID,
Sequence: uint64(time.Now().UnixNano()),
Payload: []byte(payloadJSON),
}); err != nil {
return "", err
}
// Wait for the first data frame on the control-forward stream; a session
// error, a closed channel or the deadline ends the wait.
for {
select {
case <-ctx.Done():
return "", ctx.Err()
case err, ok := <-session.Errors():
if !ok {
return "", fmt.Errorf("fabric control error stream closed")
}
// A nil value received from the error channel is ignored.
if err != nil {
return "", err
}
case frame, ok := <-session.Frames():
if !ok {
return "", fmt.Errorf("fabric control stream closed")
}
// Skip frames that are not the control-forward reply.
if frame.Type != fabricproto.FrameData || frame.StreamID != mesh.FabricControlForwardQUICStreamID {
continue
}
var response controlForwardResponse
if err := json.Unmarshal(frame.Payload, &response); err != nil {
return "", err
}
if response.Error != "" {
return "", fmt.Errorf("%s", response.Error)
}
return string(response.Payload), nil
}
}
}
// Reconnect drops the current session and connects again with the stored
// config. It returns an error if the runtime has been stopped.
func (m *Manager) Reconnect() error {
m.opMu.Lock()
defer m.opMu.Unlock()
return m.reconnectLocked()
}
// UpdateRuntimeConfig applies a new runtime config to a running manager.
//
// The JSON is normalized like in Start (trimmed ids, lease fallbacks, endpoint
// selection, shard clamping). A non-empty tunnel id that differs from the
// current one is rejected. ClusterID, LocalNodeID, TunnelID, PoolID,
// ServiceID, ServiceKind, ServiceClass and Endpoints inherit the current value
// when the new config leaves them empty. The active packet transport's service
// tunnel is updated, the config is stored and lastErr is cleared. If the
// target node or the endpoint list changed, the session is reconnected.
//
// NOTE(review): LocalServiceID, RemoteServiceID, RouteLeaseID, RouteGeneration
// and ServiceChannelRequest are not inherited from the current config when
// empty — confirm this is intended.
func (m *Manager) UpdateRuntimeConfig(configJSON string) error {
var next runtimeConfig
if err := json.Unmarshal([]byte(configJSON), &next); err != nil {
return err
}
next.ClusterID = strings.TrimSpace(next.ClusterID)
next.LocalNodeID = strings.TrimSpace(next.LocalNodeID)
next.TunnelID = strings.TrimSpace(next.TunnelID)
next.PoolID = strings.TrimSpace(next.PoolID)
next.ServiceID = strings.TrimSpace(next.ServiceID)
next.LocalServiceID = strings.TrimSpace(next.LocalServiceID)
next.RemoteServiceID = strings.TrimSpace(next.RemoteServiceID)
next.ServiceKind = strings.TrimSpace(next.ServiceKind)
next.ServiceClass = strings.TrimSpace(next.ServiceClass)
next.RouteLeaseID = strings.TrimSpace(firstNonEmpty(next.RouteLeaseID, next.RouteBundle.RouteLease.LeaseID))
next.RouteGeneration = strings.TrimSpace(firstNonEmpty(next.RouteGeneration, next.RouteBundle.RouteLease.Generation, next.RouteBundle.RouteLease.LeaseID))
next.Endpoints = fabricRuntimeEndpoints(next)
if next.StreamShards <= 0 {
next.StreamShards = defaultRuntimeStreamShards
}
if next.StreamShards > maxRuntimeStreamShards {
next.StreamShards = maxRuntimeStreamShards
}
m.opMu.Lock()
defer m.opMu.Unlock()
m.mu.Lock()
current := m.cfg
packet := m.packet
m.mu.Unlock()
if current.TunnelID != "" && next.TunnelID != "" && current.TunnelID != next.TunnelID {
return fmt.Errorf("fabric runtime config tunnel id changed from %q to %q", current.TunnelID, next.TunnelID)
}
// Fields left empty in the update keep their current value.
if next.ClusterID == "" {
next.ClusterID = current.ClusterID
}
if next.LocalNodeID == "" {
next.LocalNodeID = current.LocalNodeID
}
if next.TunnelID == "" {
next.TunnelID = current.TunnelID
}
if next.PoolID == "" {
next.PoolID = current.PoolID
}
if next.ServiceID == "" {
next.ServiceID = current.ServiceID
}
if next.ServiceKind == "" {
next.ServiceKind = current.ServiceKind
}
if next.ServiceClass == "" {
next.ServiceClass = current.ServiceClass
}
if len(next.Endpoints) == 0 {
next.Endpoints = current.Endpoints
}
// Decide on the reconnect before m.cfg is overwritten with next.
reconnectForRoute := shouldReconnectForRuntimeRoute(current, next)
if packet != nil {
if _, err := packet.UpdateServiceTunnel(serviceTunnelFromRuntimeConfig(next)); err != nil {
return err
}
}
m.mu.Lock()
m.cfg = next
m.lastErr = ""
m.mu.Unlock()
if reconnectForRoute {
if err := m.reconnectLocked(); err != nil {
return err
}
}
return nil
}
// shouldReconnectForRuntimeRoute reports whether moving from current to next
// requires a new session. That is the case only when both configs name the
// same non-empty tunnel and either the target node id or the endpoint list
// signature differs.
func shouldReconnectForRuntimeRoute(current runtimeConfig, next runtimeConfig) bool {
	sameKnownTunnel := current.TunnelID != "" && next.TunnelID != "" && current.TunnelID == next.TunnelID
	if !sameKnownTunnel {
		return false
	}
	targetChanged := fabricRuntimeTargetNodeID(current) != fabricRuntimeTargetNodeID(next)
	return targetChanged || endpointListSignature(current.Endpoints) != endpointListSignature(next.Endpoints)
}
// endpointListSignature renders endpoints as an order-sensitive string for
// equality comparison: one line per endpoint, its fields joined by '|' in the
// order endpoint id, node id, transport, address, peer cert hash, TLS cert
// hash, priority. An empty list yields "".
func endpointListSignature(endpoints []endpointConfig) string {
	var sig strings.Builder
	for _, ep := range endpoints {
		textFields := [...]string{
			ep.EndpointID,
			ep.NodeID,
			ep.Transport,
			ep.Address,
			ep.PeerCertSHA256,
			ep.TLSCertSHA256,
		}
		for _, field := range textFields {
			sig.WriteString(field)
			sig.WriteByte('|')
		}
		fmt.Fprintf(&sig, "%d\n", ep.Priority)
	}
	return sig.String()
}
// packetTransport returns the active packet transport, or nil when the manager
// is not connected.
func (m *Manager) packetTransport() *vpnruntime.FabricSessionPacketTransport {
	m.mu.Lock()
	active := m.packet
	m.mu.Unlock()
	return active
}
// reconnectPacketTransport reconnects under opMu and returns the packet
// transport that is active afterwards. Every call tears down the current
// session, including one that a concurrent caller has only just established.
func (m *Manager) reconnectPacketTransport() (*vpnruntime.FabricSessionPacketTransport, error) {
	m.opMu.Lock()
	defer m.opMu.Unlock()

	err := m.reconnectLocked()
	if err != nil {
		return nil, err
	}
	return m.packetTransport(), nil
}
// ensureConnectedLocked returns nil when a packet transport is active, fails
// when the runtime has been stopped (no run cancel stored), and otherwise
// reconnects. The caller must hold opMu.
func (m *Manager) ensureConnectedLocked() error {
	m.mu.Lock()
	hasPacket, started := m.packet != nil, m.cancel != nil
	m.mu.Unlock()

	switch {
	case hasPacket:
		return nil
	case !started:
		return fmt.Errorf("fabric vpn runtime is stopped")
	default:
		return m.reconnectLocked()
	}
}
// reconnectLocked discards the current session, transport and heartbeat and
// connects again using the stored config, with an 8s budget for the endpoint
// race. The stored run cancel func is preserved and handed back to connect; if
// none is stored the runtime counts as stopped and an error is returned after
// the old resources have been released. The caller must hold opMu.
func (m *Manager) reconnectLocked() error {
	m.mu.Lock()
	cfg, runCancel := m.cfg, m.cancel
	staleSession, staleTransport, staleHeartbeat := m.session, m.transport, m.heartbeatCancel
	m.session, m.transport = nil, nil
	m.packet, m.heartbeatCancel = nil, nil
	m.mu.Unlock()

	// Release the previous connection outside the lock.
	if staleHeartbeat != nil {
		staleHeartbeat()
	}
	if staleSession != nil {
		_ = staleSession.Close()
	}
	if staleTransport != nil {
		_ = staleTransport.Close()
	}

	if runCancel == nil {
		return fmt.Errorf("fabric vpn runtime is stopped")
	}

	ctx, release := context.WithTimeout(context.Background(), 8*time.Second)
	defer release()
	err := m.connect(ctx, cfg, runCancel)
	if err != nil {
		m.setErr(err)
	}
	return err
}
// decodePacketBatchPayload splits payload into packets. The payload is a
// sequence of records, each a 4-byte big-endian length followed by that many
// bytes. Lengths must be in 1..65535. Every packet is copied out of payload.
// An empty payload yields (nil, nil); any malformed record fails the whole
// decode.
func decodePacketBatchPayload(payload []byte) ([][]byte, error) {
	if len(payload) == 0 {
		return nil, nil
	}
	const (
		headerLen    = 4
		maxPacketLen = 65535
	)
	decoded := make([][]byte, 0, 16)
	remaining := payload
	for len(remaining) > 0 {
		if len(remaining) < headerLen {
			return nil, fmt.Errorf("invalid packet batch payload: truncated length")
		}
		size := int(binary.BigEndian.Uint32(remaining[:headerLen]))
		remaining = remaining[headerLen:]
		if size <= 0 || size > maxPacketLen {
			return nil, fmt.Errorf("invalid packet batch payload: packet size %d", size)
		}
		if size > len(remaining) {
			return nil, fmt.Errorf("invalid packet batch payload: truncated packet")
		}
		body := make([]byte, size)
		copy(body, remaining[:size])
		decoded = append(decoded, body)
		remaining = remaining[size:]
	}
	return decoded, nil
}
// encodePacketBatchPayload serializes packets as consecutive records, each a
// 4-byte big-endian length followed by the packet bytes. Empty packets are
// skipped; if nothing remains the result is nil.
func encodePacketBatchPayload(packets [][]byte) []byte {
	size := 0
	for i := range packets {
		if n := len(packets[i]); n > 0 {
			size += 4 + n
		}
	}
	if size == 0 {
		return nil
	}
	out := make([]byte, 0, size)
	for _, body := range packets {
		if len(body) == 0 {
			continue
		}
		out = binary.BigEndian.AppendUint32(out, uint32(len(body)))
		out = append(out, body...)
	}
	return out
}
// clonePacketBatch returns a deep copy of packets with empty packets left out.
// The result is always non-nil.
func clonePacketBatch(packets [][]byte) [][]byte {
	copies := make([][]byte, 0, len(packets))
	for i := range packets {
		src := packets[i]
		if len(src) == 0 {
			continue
		}
		dup := make([]byte, len(src))
		copy(dup, src)
		copies = append(copies, dup)
	}
	return copies
}
// SnapshotJSON returns the manager's current status as a JSON object:
// connection flag, active endpoint, last error, identifying config fields,
// traffic counters and the service stream registry snapshot. State guarded by
// mu is read under the lock; the counters are read atomically afterwards. A
// marshalling error is ignored, in which case the result is an empty string.
func (m *Manager) SnapshotJSON() string {
	m.mu.Lock()
	cfg := m.cfg
	isConnected := m.packet != nil
	activeEndpoint, lastError := m.endpoint, m.lastErr
	var streams map[string]any
	if registry := m.serviceStreams; registry != nil {
		streams = registry.Snapshot()
	}
	m.mu.Unlock()

	fields := map[string]any{
		"schema_version":   "rap.ipv4_tunnel_fabric_runtime.v1",
		"platform_adapter": "android_vpnservice_tun",
		"connected":        isConnected,
		"endpoint":         activeEndpoint,
		"last_error":       lastError,
		"tunnel_id":        cfg.TunnelID,
		"pool_id":          cfg.PoolID,
		"service_id":       cfg.ServiceID,
		"service_kind":     cfg.ServiceKind,
		"service_class":    cfg.ServiceClass,
		"route_lease_id":   cfg.RouteLeaseID,
		"route_generation": cfg.RouteGeneration,
		"local_node_id":    cfg.LocalNodeID,
		"uplink_packets":   m.uplinkPackets.Load(),
		"uplink_bytes":     m.uplinkBytes.Load(),
		"downlink_packets": m.downlinkPackets.Load(),
		"downlink_bytes":   m.downlinkBytes.Load(),
		"service_streams":  streams,
	}
	encoded, _ := json.Marshal(fields)
	return string(encoded)
}
// setErr stores err's message as the manager's last error. A nil err leaves
// the previous value untouched.
func (m *Manager) setErr(err error) {
	if err != nil {
		message := err.Error()
		m.mu.Lock()
		m.lastErr = message
		m.mu.Unlock()
	}
}
// openStreams opens the session's streams by sending one open-stream frame per
// stream, and returns the stream ids grouped by traffic class name together
// with the first id opened.
//
// Control and DNS get one stream each (DNS is sent with the reliable transport
// class); interactive and bulk get `shards` streams; reliable and droppable
// get max(1, shards/2). Stream ids are consecutive, starting just above the
// current Unix time in nanoseconds. The first send error aborts the call.
func openStreams(ctx context.Context, session mesh.FabricTransportSession, shards int) (map[string][]uint64, uint64, error) {
	halfShards := maxInt(1, shards/2)
	plan := []struct {
		name         string
		trafficClass fabricproto.TrafficClass
		shards       int
	}{
		{vpnruntime.FabricTrafficClassControl, fabricproto.TrafficClassControl, 1},
		{vpnruntime.FabricTrafficClassDNS, fabricproto.TrafficClassReliable, 1},
		{vpnruntime.FabricTrafficClassInteractive, fabricproto.TrafficClassInteractive, shards},
		{vpnruntime.FabricTrafficClassReliable, fabricproto.TrafficClassReliable, halfShards},
		{vpnruntime.FabricTrafficClassBulk, fabricproto.TrafficClassBulk, shards},
		{vpnruntime.FabricTrafficClassDroppable, fabricproto.TrafficClassDroppable, halfShards},
	}

	nextID := uint64(time.Now().UnixNano())
	opened := make(map[string][]uint64, len(plan))
	var primary uint64
	for _, entry := range plan {
		for remaining := entry.shards; remaining > 0; remaining-- {
			nextID++
			open := fabricproto.Frame{
				Type:         fabricproto.FrameOpenStream,
				StreamID:     nextID,
				TrafficClass: entry.trafficClass,
			}
			if err := session.Send(ctx, open); err != nil {
				return nil, 0, err
			}
			if primary == 0 {
				primary = nextID
			}
			opened[entry.name] = append(opened[entry.name], nextID)
		}
	}
	return opened, primary, nil
}
// serviceTunnelFromRuntimeConfig maps cfg onto a FabricServiceTunnel with the
// default tunnel role and returns it after normalization against cfg.TunnelID.
func serviceTunnelFromRuntimeConfig(cfg runtimeConfig) vpnruntime.FabricServiceTunnel {
	tunnel := vpnruntime.FabricServiceTunnel{
		TunnelID:        cfg.TunnelID,
		PoolID:          cfg.PoolID,
		ServiceID:       cfg.ServiceID,
		LocalServiceID:  cfg.LocalServiceID,
		RemoteServiceID: cfg.RemoteServiceID,
		ServiceKind:     cfg.ServiceKind,
		ServiceClass:    cfg.ServiceClass,
		ServiceRole:     vpnruntime.DefaultFabricTunnelRole,
		RouteLeaseID:    cfg.RouteLeaseID,
		RouteGeneration: cfg.RouteGeneration,
		StreamShards:    cfg.StreamShards,
	}
	return vpnruntime.NormalizeServiceTunnel(tunnel, cfg.TunnelID)
}
// maxInt returns the larger of left and right.
func maxInt(left, right int) int {
	if right >= left {
		return right
	}
	return left
}
// firstNonEmpty returns the first value that is not blank after trimming
// surrounding whitespace, in its trimmed form, or "" if there is none.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if trimmed := strings.TrimSpace(values[i]); trimmed != "" {
			return trimmed
		}
	}
	return ""
}