From 5096155d83ecb2fe14dbde945b52386be7487eba Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sun, 17 May 2026 20:39:30 +0300 Subject: [PATCH] 2 --- .../rap-node-agent/cmd/rap-node-agent/main.go | 47 ++++++++++- .../mobile/fabricvpn/fabricvpn.go | 81 +++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index b7446dd..c97940a 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -2756,6 +2756,7 @@ func applyQUICFabricConfigIfChanged(ctx context.Context, cfg config.Config, iden log.Printf("fabric_quic_reverse_event=%s", string(payload)) }, ) + meshState.VPNFabricQUICTransport.SetInboundFabricControlHandler(fabricControlForwardHandler(client.New(cfg.BackendURL))) } desiredAddr := strings.TrimSpace(cfg.MeshQUICFabricListenAddr) desiredKey := quicFabricConfigKey(cfg) @@ -2775,7 +2776,7 @@ func applyQUICFabricConfigIfChanged(ctx context.Context, cfg config.Config, iden if meshState.QUICFabricServer != nil { return } - server, addr, certSHA256, err := startQUICFabricEndpoint(ctx, cfg, identity, meshState.VPNFabricQUICTransport, vpnFabricFrameHandlerFromMeshState(meshState), productionForwardHandlerFromMeshState(identity, meshState), webIngressForwardHandlerFromConfig(cfg, identity, client.New(cfg.BackendURL)), syntheticForwardHandlerFromMeshState(meshState)) + server, addr, certSHA256, err := startQUICFabricEndpoint(ctx, cfg, identity, meshState.VPNFabricQUICTransport, vpnFabricFrameHandlerFromMeshState(meshState), productionForwardHandlerFromMeshState(identity, meshState), webIngressForwardHandlerFromConfig(cfg, identity, client.New(cfg.BackendURL)), fabricControlForwardHandler(client.New(cfg.BackendURL)), syntheticForwardHandlerFromMeshState(meshState)) meshState.QUICFabricServer = server meshState.QUICFabricConfiguredKey = desiredKey meshState.QUICFabricConfiguredListenAddr = desiredAddr @@ -2823,6 +2824,50 @@ func webIngressForwardHandlerFromConfig(cfg config.Config, identity state.Identi return receiver.Receive } +func fabricControlForwardHandler(api *client.Client) func(context.Context, []byte) ([]byte, error) { + return func(ctx context.Context, payload []byte) ([]byte, error) { + if api == nil { + return nil, fmt.Errorf("fabric control api is not configured") + } + var req client.RawControlRequest + if err := json.Unmarshal(payload, &req); err != nil { + return nil, fmt.Errorf("invalid fabric control request") + } + if !fabricControlPathAllowed(req.Method, req.Path) { + return nil, fmt.Errorf("fabric control path is not allowed") + } + resp, err := api.RawControl(ctx, req) + if err != nil { + return nil, err + } + return json.Marshal(resp) + } +} + +func fabricControlPathAllowed(method, path string) bool { + method = strings.ToUpper(strings.TrimSpace(method)) + path = strings.TrimSpace(path) + if !strings.HasPrefix(path, "/") || strings.Contains(path, "://") || strings.Contains(path, "..") { + return false + } + if method == "" { + method = http.MethodGet + } + if method == http.MethodPost && (path == "/auth/login" || path == "/auth/refresh") { + return true + } + if method == http.MethodGet && strings.HasPrefix(path, "/organizations/") { + return true + } + if method == http.MethodGet && strings.Contains(path, "/vpn/client-profile") && strings.HasPrefix(path, "/clusters/") { + return true + } + if strings.Contains(path, "/vpn/client-diagnostics/") && strings.HasPrefix(path, "/clusters/") { + return method == http.MethodGet || method == http.MethodPost + } + return false +} + func webIngressRuntimeServiceClassesFromConfig(cfg config.Config) []string { serviceClasses := strings.Split(strings.TrimSpace(cfg.WebIngressRuntimeServiceClasses), ",") out := make([]string, 0, len(serviceClasses)) diff --git a/agents/rap-node-agent/mobile/fabricvpn/fabricvpn.go b/agents/rap-node-agent/mobile/fabricvpn/fabricvpn.go index d4c4b0e..3f94ba1 100644 --- a/agents/rap-node-agent/mobile/fabricvpn/fabricvpn.go +++ b/agents/rap-node-agent/mobile/fabricvpn/fabricvpn.go @@ -39,6 +39,11 @@ type runtimeConfig struct { StreamShards int `json:"stream_shards"` } +type controlForwardResponse struct { + Payload json.RawMessage `json:"payload,omitempty"` + Error string `json:"error,omitempty"` +} + type routeBundleConfig struct { SchemaVersion string `json:"schema_version"` RouteAuthority string `json:"route_authority"` @@ -386,6 +391,82 @@ func (m *Manager) ReceivePacket(timeoutMillis int) ([]byte, error) { return packet, nil } +func (m *Manager) ControlRequest(payloadJSON string) (string, error) { + m.opMu.Lock() + defer m.opMu.Unlock() + if err := m.ensureConnectedLocked(); err != nil { + return "", err + } + m.mu.Lock() + transport := m.transport + cfg := m.cfg + endpointAddress := m.endpoint + m.mu.Unlock() + if transport == nil || endpointAddress == "" { + return "", fmt.Errorf("fabric control runtime is not connected") + } + endpoint := endpointConfig{Address: endpointAddress} + for _, candidate := range cfg.Endpoints { + if strings.TrimSpace(candidate.Address) == endpointAddress { + endpoint = candidate + break + } + } + target := mesh.FabricTransportTarget{ + EndpointID: firstNonEmpty(endpoint.EndpointID, endpoint.Address), + PeerID: firstNonEmpty(endpoint.NodeID, cfg.ExitNodeID), + Endpoint: endpoint.Address, + Transport: firstNonEmpty(endpoint.Transport, "direct_quic"), + PeerCertSHA256: firstNonEmpty(endpoint.PeerCertSHA256, endpoint.TLSCertSHA256), + Timeout: 8 * time.Second, + OutboundBuffer: 16, + InboundBuffer: 16, + ErrorBuffer: 8, + } + carrier, selected, err := mesh.FabricTransportForTarget(target, transport) + if err != nil { + return "", err + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + session, err := carrier.Connect(ctx, selected) + if err != nil { + return "", err + } + defer session.Close() + if err := session.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameData, + TrafficClass: fabricproto.TrafficClassReliable, + StreamID: mesh.FabricControlForwardQUICStreamID, + Sequence: uint64(time.Now().UnixNano()), + Payload: []byte(payloadJSON), + }); err != nil { + return "", err + } + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case err := <-session.Errors(): + if err != nil { + return "", err + } + case frame := <-session.Frames(): + if frame.Type != fabricproto.FrameData || frame.StreamID != mesh.FabricControlForwardQUICStreamID { + continue + } + var response controlForwardResponse + if err := json.Unmarshal(frame.Payload, &response); err != nil { + return "", err + } + if response.Error != "" { + return "", fmt.Errorf(response.Error) + } + return string(response.Payload), nil + } + } +} + func (m *Manager) Reconnect() error { m.opMu.Lock() defer m.opMu.Unlock()