package mesh import ( "bytes" "context" "crypto/sha256" "encoding/base64" "encoding/binary" "encoding/hex" "encoding/json" "fmt" "io" "net/http" "net/http/httputil" "net/url" "strconv" "strings" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority" "github.com/gorilla/websocket" ) type ProductionEnvelopeObserver func(context.Context, ProductionEnvelopeObservation) error type ProductionEnvelopeDelivery func(context.Context, ProductionEnvelope) error type ProductionForwardLogger func(ProductionForwardLogEntry) type FabricServiceChannelAccessLogger func(FabricServiceChannelAccessLogEntry) type RemoteWorkspaceFrameSink interface { AcceptRemoteWorkspaceFrameBatchProbe(context.Context, RemoteWorkspaceFrameBatchDelivery) (RemoteWorkspaceFrameBatchDeliveryReceipt, error) } type RemoteWorkspaceFrameSinkSessionControl interface { ControlAdapterSession(action string, adapterSessionID string, reason string, now time.Time) (RemoteWorkspaceAdapterSessionControlResult, error) } type RemoteWorkspaceFrameSinkSessionSnapshot interface { SnapshotAdapterSessions(includeTerminal bool, limit int, now time.Time) RemoteWorkspaceAdapterSessionSnapshot } type RemoteWorkspaceFrameSinkSessionMailbox interface { ReadAdapterSessionMailbox(adapterSessionID string, drain bool, limit int, afterSequence int64, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error) } type RemoteWorkspaceFrameSinkSessionMailboxTelemetry interface { RecordAdapterSessionMailboxRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, now time.Time) } type RemoteWorkspaceFrameSinkSessionMailboxConsumer interface { RecordAdapterSessionMailboxConsumerRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, consumerID string, ackSequence int64, reset bool, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error) } type RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot interface { SnapshotAdapterSessionMailboxConsumers(adapterSessionID string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxConsumerSnapshot, error) } type RemoteWorkspaceFrameSinkSessionMailboxConsumerResume interface { ResolveAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, now time.Time) (int64, error) } type RemoteWorkspaceFrameSinkSessionMailboxPreflight interface { PreflightAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxPreflightSnapshot, error) } type VPNPacketIngress interface { SendClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, packets [][]byte) error ReceiveClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error) } type VPNPacketIngressTrafficClass interface { SendClientPacketBatchWithTrafficClass(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte) error } type VPNPacketIngressRoutePreference interface { PreferClientRoute(routeID string) } type Server struct { Local PeerIdentity SyntheticRuntime *SyntheticRuntime ProductionForwardingEnabled bool ProductionEnvelopeObserver ProductionEnvelopeObserver ProductionEnvelopeDelivery ProductionEnvelopeDelivery ProductionForwardTransport ProductionForwardTransport ProductionForwardLogger ProductionForwardLogger FabricServiceChannelLogger FabricServiceChannelAccessLogger RemoteWorkspaceFrameSink RemoteWorkspaceFrameSink ProductionRoutes []SyntheticRoute VPNPacketIngress VPNPacketIngress BackendProxyBaseURL string ClusterAuthorityPublicKey string ServiceChannelIntrospection bool } func (s Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/mesh/v1/health", s.handleHealth) mux.HandleFunc("/mesh/v1/forward", s.handleForward) mux.HandleFunc("/mesh/v1/synthetic/probe", s.handleSyntheticProbe) if s.RemoteWorkspaceFrameSink != nil { mux.HandleFunc("/mesh/v1/remote-workspace/adapter-sessions/", s.handleRemoteWorkspaceAdapterSessionControl) } if s.VPNPacketIngress != nil || s.BackendProxyBaseURL != "" { mux.HandleFunc("/api/v1/clusters/", func(w http.ResponseWriter, r *http.Request) { if s.handleFabricServiceChannelRemoteWorkspaceIngress(w, r) { return } if s.VPNPacketIngress != nil && s.handleFabricServiceChannelVPNPacketIngress(w, r) { return } if s.VPNPacketIngress != nil && s.handleVPNPacketIngress(w, r) { return } if s.BackendProxyBaseURL != "" { s.backendProxy().ServeHTTP(w, r) return } http.NotFound(w, r) }) } if s.BackendProxyBaseURL != "" { proxy := s.backendProxy() mux.Handle("/api/v1/", proxy) mux.Handle("/api/v1", proxy) mux.Handle("/downloads/", proxy) mux.Handle("/downloads", proxy) } return mux } func (s Server) handleRemoteWorkspaceAdapterSessionControl(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodGet && isRemoteWorkspaceAdapterSessionListPath(r.URL.Path) { s.handleRemoteWorkspaceAdapterSessionSnapshot(w, r) return } if r.Method == http.MethodGet && strings.HasSuffix(r.URL.Path, "/mailbox/consumers") { s.handleRemoteWorkspaceAdapterSessionMailboxConsumers(w, r) return } if r.Method == http.MethodGet && strings.HasSuffix(r.URL.Path, "/mailbox/preflight") { s.handleRemoteWorkspaceAdapterSessionMailboxPreflight(w, r) return } if r.Method == http.MethodGet && strings.HasSuffix(r.URL.Path, "/mailbox") { s.handleRemoteWorkspaceAdapterSessionMailbox(w, r) return } controller, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionControl) if !ok { http.Error(w, "remote workspace adapter session control unavailable", http.StatusServiceUnavailable) return } sessionID, ok := parseRemoteWorkspaceAdapterSessionControlPath(r.URL.Path) if !ok { http.NotFound(w, r) return } if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } var request struct { Action string `json:"action"` Reason string `json:"reason,omitempty"` } if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 16*1024)).Decode(&request); err != nil { http.Error(w, "invalid remote workspace adapter session control payload", http.StatusBadRequest) return } result, err := controller.ControlAdapterSession(request.Action, sessionID, request.Reason, time.Now().UTC()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) } func (s Server) handleRemoteWorkspaceAdapterSessionSnapshot(w http.ResponseWriter, r *http.Request) { snapshotter, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionSnapshot) if !ok { http.Error(w, "remote workspace adapter session snapshot unavailable", http.StatusServiceUnavailable) return } includeTerminal := strings.EqualFold(r.URL.Query().Get("include_terminal"), "true") limit := 50 if rawLimit := strings.TrimSpace(r.URL.Query().Get("limit")); rawLimit != "" { parsed, err := strconv.Atoi(rawLimit) if err != nil || parsed <= 0 { http.Error(w, "invalid remote workspace adapter session snapshot limit", http.StatusBadRequest) return } limit = parsed } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(snapshotter.SnapshotAdapterSessions(includeTerminal, limit, time.Now().UTC())) } func (s Server) handleRemoteWorkspaceAdapterSessionMailbox(w http.ResponseWriter, r *http.Request) { reader, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailbox) if !ok { http.Error(w, "remote workspace adapter session mailbox unavailable", http.StatusServiceUnavailable) return } sessionID, ok := parseRemoteWorkspaceAdapterSessionMailboxPath(r.URL.Path) if !ok { http.NotFound(w, r) return } drain := strings.EqualFold(r.URL.Query().Get("drain"), "true") limit := 50 if rawLimit := strings.TrimSpace(r.URL.Query().Get("limit")); rawLimit != "" { parsed, err := strconv.Atoi(rawLimit) if err != nil || parsed <= 0 { http.Error(w, "invalid remote workspace adapter session mailbox limit", http.StatusBadRequest) return } limit = parsed } waitMs := 0 if rawWait := strings.TrimSpace(r.URL.Query().Get("wait_ms")); rawWait != "" { parsed, err := strconv.Atoi(rawWait) if err != nil || parsed < 0 { http.Error(w, "invalid remote workspace adapter session mailbox wait", http.StatusBadRequest) return } if parsed > 30000 { parsed = 30000 } waitMs = parsed } afterSequence := int64(0) if rawAfter := strings.TrimSpace(r.URL.Query().Get("after_sequence")); rawAfter != "" { parsed, err := strconv.ParseInt(rawAfter, 10, 64) if err != nil || parsed < 0 { http.Error(w, "invalid remote workspace adapter session mailbox after sequence", http.StatusBadRequest) return } afterSequence = parsed } if drain && afterSequence > 0 { http.Error(w, "remote workspace adapter session mailbox after sequence cannot drain", http.StatusBadRequest) return } consumerID := strings.TrimSpace(r.URL.Query().Get("consumer_id")) if consumerID != "" && !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) { http.Error(w, "invalid remote workspace adapter mailbox consumer", http.StatusBadRequest) return } resumeFrom := strings.TrimSpace(strings.ToLower(r.URL.Query().Get("resume_from"))) if resumeFrom != "" { switch resumeFrom { case "ack", "checkpoint": default: http.Error(w, "invalid remote workspace adapter mailbox resume cursor", http.StatusBadRequest) return } if consumerID == "" { http.Error(w, "remote workspace adapter mailbox consumer required for resume", http.StatusBadRequest) return } if afterSequence > 0 { http.Error(w, "remote workspace adapter mailbox resume cannot combine with after sequence", http.StatusBadRequest) return } if drain { http.Error(w, "remote workspace adapter mailbox resume cannot drain", http.StatusBadRequest) return } } resetConsumer := false if rawReset := strings.TrimSpace(r.URL.Query().Get("reset_consumer")); rawReset != "" { switch strings.ToLower(rawReset) { case "true": resetConsumer = true case "false": default: http.Error(w, "invalid remote workspace adapter mailbox consumer reset", http.StatusBadRequest) return } } if resetConsumer && consumerID == "" { http.Error(w, "remote workspace adapter mailbox consumer required for reset", http.StatusBadRequest) return } if resetConsumer && resumeFrom != "" { http.Error(w, "remote workspace adapter mailbox resume cannot reset consumer", http.StatusBadRequest) return } ackSequence := int64(0) if rawAck := strings.TrimSpace(r.URL.Query().Get("ack_sequence")); rawAck != "" { parsed, err := strconv.ParseInt(rawAck, 10, 64) if err != nil || parsed < 0 { http.Error(w, "invalid remote workspace adapter mailbox ack sequence", http.StatusBadRequest) return } ackSequence = parsed } resumeSequence := int64(0) if resumeFrom != "" { resolver, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumerResume) if !ok { http.Error(w, "remote workspace adapter mailbox resume unavailable", http.StatusServiceUnavailable) return } resolved, err := resolver.ResolveAdapterSessionMailboxConsumerResume(sessionID, consumerID, resumeFrom, time.Now().UTC()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } afterSequence = resolved resumeSequence = resolved } mailbox, err := s.readRemoteWorkspaceAdapterSessionMailbox(r.Context(), reader, sessionID, drain, limit, afterSequence, waitMs) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if resumeFrom != "" { mailbox.ResumeFrom = resumeFrom mailbox.ResumeSequence = resumeSequence } if consumerID != "" { mailbox.ConsumerID = consumerID } if recorder, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxTelemetry); ok { recorder.RecordAdapterSessionMailboxRead(mailbox, time.Now().UTC()) } if consumerID != "" { recorder, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumer) if !ok { http.Error(w, "remote workspace adapter mailbox consumer unavailable", http.StatusServiceUnavailable) return } mailbox, err = recorder.RecordAdapterSessionMailboxConsumerRead(mailbox, consumerID, ackSequence, resetConsumer, time.Now().UTC()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(mailbox) } func (s Server) handleRemoteWorkspaceAdapterSessionMailboxConsumers(w http.ResponseWriter, r *http.Request) { snapshotter, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot) if !ok { http.Error(w, "remote workspace adapter mailbox consumer snapshot unavailable", http.StatusServiceUnavailable) return } sessionID, ok := parseRemoteWorkspaceAdapterSessionMailboxConsumersPath(r.URL.Path) if !ok { http.NotFound(w, r) return } limit := 50 if rawLimit := strings.TrimSpace(r.URL.Query().Get("limit")); rawLimit != "" { parsed, err := strconv.Atoi(rawLimit) if err != nil || parsed <= 0 { http.Error(w, "invalid remote workspace adapter mailbox consumer snapshot limit", http.StatusBadRequest) return } limit = parsed } snapshot, err := snapshotter.SnapshotAdapterSessionMailboxConsumers(sessionID, limit, time.Now().UTC()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(snapshot) } func (s Server) handleRemoteWorkspaceAdapterSessionMailboxPreflight(w http.ResponseWriter, r *http.Request) { preflighter, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxPreflight) if !ok { http.Error(w, "remote workspace adapter mailbox preflight unavailable", http.StatusServiceUnavailable) return } sessionID, ok := parseRemoteWorkspaceAdapterSessionMailboxPreflightPath(r.URL.Path) if !ok { http.NotFound(w, r) return } consumerID := strings.TrimSpace(r.URL.Query().Get("consumer_id")) if consumerID == "" { http.Error(w, "remote workspace adapter mailbox consumer required for preflight", http.StatusBadRequest) return } if !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) { http.Error(w, "invalid remote workspace adapter mailbox consumer", http.StatusBadRequest) return } resumeFrom := strings.TrimSpace(strings.ToLower(r.URL.Query().Get("resume_from"))) if resumeFrom == "" { resumeFrom = "checkpoint" } if resumeFrom != "ack" && resumeFrom != "checkpoint" { http.Error(w, "invalid remote workspace adapter mailbox resume cursor", http.StatusBadRequest) return } limit := 50 if rawLimit := strings.TrimSpace(r.URL.Query().Get("limit")); rawLimit != "" { parsed, err := strconv.Atoi(rawLimit) if err != nil || parsed <= 0 { http.Error(w, "invalid remote workspace adapter mailbox preflight limit", http.StatusBadRequest) return } limit = parsed } snapshot, err := preflighter.PreflightAdapterSessionMailboxConsumerResume(sessionID, consumerID, resumeFrom, limit, time.Now().UTC()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(snapshot) } func (s Server) readRemoteWorkspaceAdapterSessionMailbox(ctx context.Context, reader RemoteWorkspaceFrameSinkSessionMailbox, sessionID string, drain bool, limit int, afterSequence int64, waitMs int) (RemoteWorkspaceAdapterMailboxSnapshot, error) { if waitMs <= 0 { return reader.ReadAdapterSessionMailbox(sessionID, drain, limit, afterSequence, time.Now().UTC()) } deadline := time.Now().UTC().Add(time.Duration(waitMs) * time.Millisecond) waited := false for { mailbox, err := reader.ReadAdapterSessionMailbox(sessionID, drain, limit, afterSequence, time.Now().UTC()) if err != nil { return mailbox, err } if mailbox.ReturnedCount > 0 { mailbox.Waited = waited mailbox.WaitMs = waitMs return mailbox, nil } now := time.Now().UTC() if !now.Before(deadline) { mailbox.Waited = true mailbox.WaitTimeout = true mailbox.WaitMs = waitMs return mailbox, nil } waited = true sleepFor := 25 * time.Millisecond if remaining := time.Until(deadline); remaining < sleepFor { sleepFor = remaining } timer := time.NewTimer(sleepFor) select { case <-ctx.Done(): timer.Stop() return RemoteWorkspaceAdapterMailboxSnapshot{}, ctx.Err() case <-timer.C: } } } func isRemoteWorkspaceAdapterSessionListPath(path string) bool { return path == "/mesh/v1/remote-workspace/adapter-sessions" || path == "/mesh/v1/remote-workspace/adapter-sessions/" } func parseRemoteWorkspaceAdapterSessionMailboxPath(path string) (string, bool) { const prefix = "/mesh/v1/remote-workspace/adapter-sessions/" if !strings.HasPrefix(path, prefix) || !strings.HasSuffix(path, "/mailbox") { return "", false } sessionID := strings.TrimSuffix(strings.TrimPrefix(path, prefix), "/mailbox") sessionID = strings.Trim(sessionID, "/") if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") { return "", false } return sessionID, true } func parseRemoteWorkspaceAdapterSessionMailboxConsumersPath(path string) (string, bool) { const prefix = "/mesh/v1/remote-workspace/adapter-sessions/" const suffix = "/mailbox/consumers" if !strings.HasPrefix(path, prefix) || !strings.HasSuffix(path, suffix) { return "", false } sessionID := strings.TrimSuffix(strings.TrimPrefix(path, prefix), suffix) sessionID = strings.Trim(sessionID, "/") if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") { return "", false } return sessionID, true } func parseRemoteWorkspaceAdapterSessionMailboxPreflightPath(path string) (string, bool) { const prefix = "/mesh/v1/remote-workspace/adapter-sessions/" const suffix = "/mailbox/preflight" if !strings.HasPrefix(path, prefix) || !strings.HasSuffix(path, suffix) { return "", false } sessionID := strings.TrimSuffix(strings.TrimPrefix(path, prefix), suffix) sessionID = strings.Trim(sessionID, "/") if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") { return "", false } return sessionID, true } func isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID string) bool { if consumerID == "" || len(consumerID) > 64 { return false } for _, ch := range consumerID { if (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch >= '0' && ch <= '9') { continue } switch ch { case '.', '_', ':', '-': continue default: return false } } return true } func parseRemoteWorkspaceAdapterSessionControlPath(path string) (string, bool) { const prefix = "/mesh/v1/remote-workspace/adapter-sessions/" if !strings.HasPrefix(path, prefix) || !strings.HasSuffix(path, "/control") { return "", false } sessionID := strings.TrimSuffix(strings.TrimPrefix(path, prefix), "/control") sessionID = strings.Trim(sessionID, "/") if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") { return "", false } return sessionID, true } func (s Server) handleVPNPacketIngress(w http.ResponseWriter, r *http.Request) bool { if clusterID, vpnConnectionID, ok := parseVPNClientPacketWebSocketPath(r.URL.Path); ok { s.handleVPNPacketWebSocket(w, r, clusterID, "", vpnConnectionID, false, true, "") return true } clusterID, vpnConnectionID, ok := parseVPNClientPacketPath(r.URL.Path) if !ok { return false } return s.handleVPNPacketHTTP(w, r, clusterID, "", vpnConnectionID, "", false, true, "") } func (s Server) handleFabricServiceChannelRemoteWorkspaceIngress(w http.ResponseWriter, r *http.Request) bool { clusterID, channelID, resourceID, channelClass, webSocket, ok := parseFabricServiceChannelRemoteWorkspacePath(r.URL.Path) if !ok { return false } if webSocket { http.Error(w, "remote workspace service-channel websocket forwarding is not implemented", http.StatusNotImplemented) return true } decision, valid := s.validateFabricServiceChannelRequest(w, r, clusterID, channelID, resourceID, FabricServiceClassRemoteWorkspace, channelClass) if !valid { return true } w.Header().Set("X-RAP-Service-Channel-Accepted-By", decision.AcceptedBy) s.logFabricServiceChannelAccess(r, clusterID, channelID, resourceID, decision) if r.Method != http.MethodPost && r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return true } if r.Method == http.MethodPost { body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, MaxProductionEnvelopePayloadBytes)) if err != nil { http.Error(w, "invalid remote workspace probe payload", http.StatusBadRequest) return true } if len(strings.TrimSpace(string(body))) > 0 { frameProbe, err := validateRemoteWorkspaceFrameBatchProbe(body, decision.ChannelClass) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return true } payloadFlow := "validated_probe_only" adapterSessionID := remoteWorkspaceAdapterSessionID(clusterID, channelID, resourceID, decision.PreferredRouteID) var deliveryReceipt *RemoteWorkspaceFrameBatchDeliveryReceipt if s.RemoteWorkspaceFrameSink != nil { receipt, err := s.RemoteWorkspaceFrameSink.AcceptRemoteWorkspaceFrameBatchProbe(r.Context(), RemoteWorkspaceFrameBatchDelivery{ ClusterID: clusterID, ChannelID: channelID, ResourceID: resourceID, AdapterSessionID: adapterSessionID, ServiceClass: FabricServiceClassRemoteWorkspace, ChannelClass: decision.ChannelClass, AdapterContractID: frameProbe.AdapterContractID, Frames: frameProbe.Frames, AcceptedBy: decision.AcceptedBy, PreferredRouteID: decision.PreferredRouteID, }) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) return true } deliveryReceipt = &receipt payloadFlow = "delivered_probe_only" } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) response := map[string]any{ "schema_version": "rap.remote_workspace_service_channel_ingress_probe.v1", "accepted": true, "service_class": FabricServiceClassRemoteWorkspace, "channel_class": decision.ChannelClass, "channel_id": channelID, "resource_id": resourceID, "adapter_session_id": adapterSessionID, "data_plane": "validated", "payload_flow": payloadFlow, "frame_batch_schema": frameProbe.SchemaVersion, "frame_count": len(frameProbe.Frames), "adapter_contract_id": frameProbe.AdapterContractID, } if deliveryReceipt != nil { response["adapter_delivery"] = deliveryReceipt } _ = json.NewEncoder(w).Encode(response) return true } } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) _ = json.NewEncoder(w).Encode(map[string]any{ "schema_version": "rap.remote_workspace_service_channel_ingress_probe.v1", "accepted": true, "service_class": FabricServiceClassRemoteWorkspace, "channel_class": decision.ChannelClass, "channel_id": channelID, "resource_id": resourceID, "data_plane": "validated", "payload_flow": "not_implemented", }) return true } type remoteWorkspaceFrameBatchProbe struct { SchemaVersion string `json:"schema_version"` ProbeOnly bool `json:"probe_only"` ServiceClass string `json:"service_class"` ChannelClass string `json:"channel_class"` AdapterContractID string `json:"adapter_contract_id,omitempty"` Frames []RemoteWorkspaceFrameProbeRecord `json:"frames"` } type RemoteWorkspaceFrameProbeRecord struct { Channel string `json:"channel"` Direction string `json:"direction"` PayloadEncoding string `json:"payload_encoding,omitempty"` PayloadLength int `json:"payload_length,omitempty"` Droppable bool `json:"droppable,omitempty"` } type RemoteWorkspaceFrameBatchDelivery struct { ClusterID string `json:"cluster_id"` ChannelID string `json:"channel_id"` ResourceID string `json:"resource_id"` AdapterSessionID string `json:"adapter_session_id,omitempty"` ServiceClass string `json:"service_class"` ChannelClass string `json:"channel_class"` AdapterContractID string `json:"adapter_contract_id,omitempty"` Frames []RemoteWorkspaceFrameProbeRecord `json:"frames"` AcceptedBy string `json:"accepted_by,omitempty"` PreferredRouteID string `json:"preferred_route_id,omitempty"` } type RemoteWorkspaceFrameBatchDeliveryReceipt struct { SchemaVersion string `json:"schema_version"` Sink string `json:"sink"` Accepted bool `json:"accepted"` ProbeOnly bool `json:"probe_only"` ClusterID string `json:"cluster_id,omitempty"` ChannelID string `json:"channel_id,omitempty"` ResourceID string `json:"resource_id,omitempty"` ServiceClass string `json:"service_class"` ChannelClass string `json:"channel_class"` AdapterContractID string `json:"adapter_contract_id,omitempty"` AdapterSessionID string `json:"adapter_session_id,omitempty"` AdapterRuntimeID string `json:"adapter_runtime_id,omitempty"` SessionState string `json:"session_state,omitempty"` SessionCreatedAt string `json:"session_created_at,omitempty"` SessionBoundAt string `json:"session_bound_at,omitempty"` SessionLastActive string `json:"session_last_activity_at,omitempty"` SessionLifecycle string `json:"session_lifecycle_state,omitempty"` SessionDeliveries int64 `json:"session_delivery_count,omitempty"` SessionPressure int64 `json:"session_backpressure_count,omitempty"` MailboxDepth int `json:"mailbox_depth,omitempty"` MailboxEnqueued int64 `json:"mailbox_enqueued_total,omitempty"` FrameCount int `json:"frame_count"` QueueCapacity int `json:"queue_capacity"` QueueDepth int `json:"queue_depth"` AcceptedFrames int `json:"accepted_frames"` DroppedFrames int `json:"dropped_frames"` AckedFrames int `json:"acked_frames"` Backpressure bool `json:"backpressure"` DropPolicy string `json:"drop_policy,omitempty"` DeliverySequence int64 `json:"delivery_sequence"` DeliveredAt string `json:"delivered_at"` } func remoteWorkspaceAdapterSessionID(clusterID string, channelID string, resourceID string, preferredRouteID string) string { seed := strings.Join([]string{ strings.TrimSpace(clusterID), strings.TrimSpace(channelID), strings.TrimSpace(resourceID), strings.TrimSpace(preferredRouteID), }, "\x00") sum := sha256.Sum256([]byte(seed)) return "rap-rw-adapter-session-" + hex.EncodeToString(sum[:])[:24] } func validateRemoteWorkspaceFrameBatchProbe(payload []byte, requiredChannelClass string) (remoteWorkspaceFrameBatchProbe, error) { var decoded remoteWorkspaceFrameBatchProbe if err := json.Unmarshal(payload, &decoded); err != nil { return decoded, fmt.Errorf("invalid remote workspace frame batch probe") } if decoded.SchemaVersion != "rap.remote_workspace_frame_batch.v1" { return decoded, fmt.Errorf("unsupported remote workspace frame batch schema") } if !decoded.ProbeOnly { return decoded, fmt.Errorf("remote workspace payload forwarding is not implemented") } if strings.TrimSpace(strings.ToLower(decoded.ServiceClass)) != FabricServiceClassRemoteWorkspace { return decoded, fmt.Errorf("remote workspace frame batch service class mismatch") } requiredChannelClass = strings.TrimSpace(strings.ToLower(requiredChannelClass)) if strings.TrimSpace(strings.ToLower(decoded.ChannelClass)) != requiredChannelClass { return decoded, fmt.Errorf("remote workspace frame batch channel class mismatch") } if len(decoded.Frames) == 0 || len(decoded.Frames) > 32 { return decoded, fmt.Errorf("remote workspace frame batch probe must contain 1..32 frames") } for _, frame := range decoded.Frames { channel := strings.TrimSpace(strings.ToLower(frame.Channel)) direction := strings.TrimSpace(strings.ToLower(frame.Direction)) if !isAllowedRemoteWorkspaceAdapterFrameChannel(channel) { return decoded, fmt.Errorf("unsupported remote workspace adapter frame channel") } if !isAllowedRemoteWorkspaceAdapterFrameDirection(channel, direction) { return decoded, fmt.Errorf("unsupported remote workspace adapter frame direction") } encoding := strings.TrimSpace(strings.ToLower(frame.PayloadEncoding)) if encoding != "" && encoding != "none" && encoding != "base64" { return decoded, fmt.Errorf("unsupported remote workspace frame payload encoding") } if frame.PayloadLength < 0 || frame.PayloadLength > MaxProductionEnvelopePayloadBytes { return decoded, fmt.Errorf("remote workspace frame payload length out of bounds") } } return decoded, nil } func isAllowedRemoteWorkspaceAdapterFrameChannel(channel string) bool { switch channel { case "input", "control", "display", "cursor", "clipboard", "file_transfer", "audio", "device", "telemetry": return true default: return false } } func isAllowedRemoteWorkspaceAdapterFrameDirection(channel string, direction string) bool { switch channel { case "input": return direction == "client_to_adapter" case "display", "cursor", "audio", "telemetry": return direction == "adapter_to_client" case "control", "clipboard", "file_transfer", "device": return direction == "client_to_adapter" || direction == "adapter_to_client" || direction == "bidirectional" default: return false } } func (s Server) handleFabricServiceChannelVPNPacketIngress(w http.ResponseWriter, r *http.Request) bool { if clusterID, channelID, vpnConnectionID, ok := parseFabricServiceChannelVPNPacketWebSocketPath(r.URL.Path); ok { decision, valid := s.validateFabricServiceChannelVPNRequest(w, r, clusterID, channelID, vpnConnectionID) if !valid { return true } s.logFabricServiceChannelAccess(r, clusterID, channelID, vpnConnectionID, decision) s.preferVPNPacketIngressRoute(decision.PreferredRouteID) s.handleVPNPacketWebSocket(w, r, clusterID, channelID, vpnConnectionID, decision.ForceBackendFallback, decision.BackendFallbackAllowed(), decision.BackendRelayPolicy) return true } clusterID, channelID, vpnConnectionID, ok := parseFabricServiceChannelVPNPacketPath(r.URL.Path) if !ok { return false } decision, valid := s.validateFabricServiceChannelVPNRequest(w, r, clusterID, channelID, vpnConnectionID) if !valid { return true } w.Header().Set("X-RAP-Service-Channel-Accepted-By", decision.AcceptedBy) s.logFabricServiceChannelAccess(r, clusterID, channelID, vpnConnectionID, decision) s.preferVPNPacketIngressRoute(decision.PreferredRouteID) backendPath := "/api/v1/clusters/" + clusterID + "/vpn-connections/" + vpnConnectionID + "/tunnel/client/packets" return s.handleVPNPacketHTTP(w, r, clusterID, channelID, vpnConnectionID, backendPath, decision.ForceBackendFallback, decision.BackendFallbackAllowed(), decision.BackendRelayPolicy) } func (s Server) preferVPNPacketIngressRoute(routeID string) { routeID = strings.TrimSpace(routeID) if routeID == "" || s.VPNPacketIngress == nil { return } if preferred, ok := s.VPNPacketIngress.(VPNPacketIngressRoutePreference); ok { preferred.PreferClientRoute(routeID) } } func (s Server) handleVPNPacketHTTP(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string, backendFallbackPath string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) bool { switch r.Method { case http.MethodPost: body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, MaxProductionVPNPacketPayloadBytes)) if err != nil { http.Error(w, "invalid vpn packet payload", http.StatusBadRequest) return true } if r.URL.Query().Get("batch") != "true" && len(body) == 0 { http.Error(w, "empty vpn packet payload", http.StatusBadRequest) return true } packets := [][]byte{body} if r.URL.Query().Get("batch") == "true" { packets, err = decodeVPNIngressPacketBatch(body) if err != nil { http.Error(w, "invalid vpn packet batch", http.StatusBadRequest) return true } } packets = cleanVPNIngressPacketBatch(packets) if len(packets) == 0 { http.Error(w, "empty vpn packet batch", http.StatusBadRequest) return true } if forceBackendFallback { if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, body, backendFallbackPath) { return true } s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error()) http.Error(w, ErrRouteNotFound.Error(), vpnIngressStatusCode(ErrRouteNotFound)) return true } trafficClass := r.Header.Get("X-RAP-Traffic-Class") var sendErr error if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok { sendErr = classIngress.SendClientPacketBatchWithTrafficClass(r.Context(), clusterID, vpnConnectionID, trafficClass, packets) } else { sendErr = s.VPNPacketIngress.SendClientPacketBatch(r.Context(), clusterID, vpnConnectionID, packets) } if sendErr != nil { if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, body, backendFallbackPath) { return true } s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error()) http.Error(w, sendErr.Error(), vpnIngressStatusCode(sendErr)) return true } w.WriteHeader(http.StatusAccepted) return true case http.MethodGet: if forceBackendFallback { if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, nil, backendFallbackPath) { return true } s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error()) w.WriteHeader(http.StatusNoContent) return true } timeout := vpnIngressTimeout(r) packets, err := s.VPNPacketIngress.ReceiveClientPacketBatch(r.Context(), clusterID, vpnConnectionID, timeout) if err != nil { http.Error(w, err.Error(), vpnIngressStatusCode(err)) return true } packets = cleanVPNIngressPacketBatch(packets) if len(packets) == 0 { if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, nil, backendFallbackPath) { return true } w.WriteHeader(http.StatusNoContent) return true } if r.URL.Query().Get("batch") == "true" { w.Header().Set("Content-Type", "application/vnd.rap.vpn-packet-batch.v1") _, _ = w.Write(encodeVPNIngressPacketBatch(packets)) return true } w.Header().Set("Content-Type", "application/octet-stream") _, _ = w.Write(packets[0]) return true default: w.WriteHeader(http.StatusMethodNotAllowed) return true } } func (s Server) handleVPNPacketWebSocket(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } if s.VPNPacketIngress == nil { http.Error(w, ErrForwardRuntimeUnavailable.Error(), http.StatusServiceUnavailable) return } upgrader := websocket.Upgrader{ CheckOrigin: func(_ *http.Request) bool { return true }, } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return } defer conn.Close() conn.SetReadLimit(MaxProductionVPNPacketPayloadBytes) ctx, cancel := context.WithCancel(r.Context()) defer cancel() trafficClass := r.Header.Get("X-RAP-Traffic-Class") errCh := make(chan error, 2) go func() { errCh <- s.readVPNPacketWebSocket(ctx, conn, clusterID, channelID, vpnConnectionID, trafficClass, forceBackendFallback, backendFallbackAllowed, backendRelayPolicy) }() go func() { errCh <- s.writeVPNPacketWebSocket(ctx, conn, clusterID, channelID, vpnConnectionID, forceBackendFallback, backendFallbackAllowed, backendRelayPolicy) }() select { case <-ctx.Done(): case <-errCh: cancel() } } func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, trafficClass string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error { for { messageType, payload, err := conn.ReadMessage() if err != nil { return err } if messageType != websocket.BinaryMessage { continue } packets, err := decodeVPNIngressPacketBatch(payload) if err != nil { return err } packets = cleanVPNIngressPacketBatch(packets) if len(packets) == 0 { continue } if forceBackendFallback { if !backendFallbackAllowed { s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error()) return ErrRouteNotFound } if proxyErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, payload); proxyErr != nil { return proxyErr } continue } var sendErr error if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok { sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets) } else { sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets) } if sendErr != nil { if !backendFallbackAllowed { s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error()) return sendErr } if proxyErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, payload); proxyErr != nil { return sendErr } } } } func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error { lastPing := time.Now() for { select { case <-ctx.Done(): return ctx.Err() default: } var packets [][]byte var err error if !forceBackendFallback { packets, err = s.VPNPacketIngress.ReceiveClientPacketBatch(ctx, clusterID, vpnConnectionID, 50*time.Millisecond) } if forceBackendFallback && !backendFallbackAllowed { s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error()) return ErrRouteNotFound } if err != nil && !backendFallbackAllowed { s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_receive_failed_backend_fallback_blocked", err.Error()) return err } if backendFallbackAllowed && (forceBackendFallback || err != nil || len(packets) == 0) { backendPackets, proxyErr := s.backendVPNPacketGet(ctx, clusterID, vpnConnectionID, 50*time.Millisecond) if proxyErr != nil && err != nil { return err } if len(backendPackets) > 0 { packets = backendPackets } } if len(packets) > 0 { if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { return err } if err := conn.WriteMessage(websocket.BinaryMessage, encodeVPNIngressPacketBatch(packets)); err != nil { return err } continue } if time.Since(lastPing) >= 15*time.Second { if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { return err } if err := conn.WriteMessage(websocket.PingMessage, []byte("rap-vpn")); err != nil { return err } lastPing = time.Now() } } } func (s Server) backendVPNPacketPost(ctx context.Context, clusterID string, vpnConnectionID string, batchPayload []byte) error { target := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/") if target == "" { return ErrRouteNotFound } req, err := http.NewRequestWithContext(ctx, http.MethodPost, target+"/clusters/"+clusterID+"/vpn-connections/"+vpnConnectionID+"/tunnel/client/packets?batch=true", bytes.NewReader(batchPayload)) if err != nil { return err } req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID) req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID) resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("backend vpn packet post failed: status=%d", resp.StatusCode) } return nil } func (s Server) backendVPNPacketGet(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error) { target := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/") if target == "" { return nil, ErrRouteNotFound } if timeout <= 0 { timeout = 50 * time.Millisecond } req, err := http.NewRequestWithContext(ctx, http.MethodGet, target+"/clusters/"+clusterID+"/vpn-connections/"+vpnConnectionID+"/tunnel/client/packets?batch=true&timeout_ms="+strconv.FormatInt(timeout.Milliseconds(), 10), nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/vnd.rap.vpn-packet-batch.v1") req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID) req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent { return nil, nil } if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, fmt.Errorf("backend vpn packet get failed: status=%d", resp.StatusCode) } body, err := io.ReadAll(io.LimitReader(resp.Body, MaxProductionVPNPacketPayloadBytes)) if err != nil { return nil, err } if len(body) == 0 { return nil, nil } return decodeVPNIngressPacketBatch(body) } func (s Server) proxyVPNPacketIngressToBackend(w http.ResponseWriter, r *http.Request, body []byte) bool { return s.proxyVPNPacketIngressToBackendPath(w, r, body, "") } func (s Server) proxyVPNPacketIngressToBackendPath(w http.ResponseWriter, r *http.Request, body []byte, backendPath string) bool { if strings.TrimSpace(s.BackendProxyBaseURL) == "" { return false } target, err := url.Parse(s.BackendProxyBaseURL) if err != nil || target.Scheme == "" || target.Host == "" { return false } if strings.EqualFold(target.Host, r.Host) { return false } var reader io.Reader if body != nil { reader = bytes.NewReader(body) } requestURI := r.URL.RequestURI() if backendPath != "" { requestURI = backendPath if r.URL.RawQuery != "" { requestURI += "?" + r.URL.RawQuery } } req, err := http.NewRequestWithContext(r.Context(), r.Method, target.Scheme+"://"+target.Host+requestURI, reader) if err != nil { return false } for _, key := range []string{"Accept", "Content-Type"} { if value := r.Header.Get(key); value != "" { req.Header.Set(key, value) } } req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID) req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID) resp, err := http.DefaultClient.Do(req) if err != nil { return false } defer resp.Body.Close() for _, key := range []string{"Content-Type"} { if value := resp.Header.Get(key); value != "" { w.Header().Set(key, value) } } w.WriteHeader(resp.StatusCode) _, _ = io.Copy(w, resp.Body) return true } type fabricServiceChannelLeaseAuthorityPayload struct { SchemaVersion string `json:"schema_version"` ChannelID string `json:"channel_id"` ClusterID string `json:"cluster_id"` ResourceID string `json:"resource_id,omitempty"` ServiceClass string `json:"service_class"` Status string `json:"status"` SelectedEntryNodeID string `json:"selected_entry_node_id"` SelectedExitNodeID string `json:"selected_exit_node_id"` AllowedChannels []string `json:"allowed_channels"` RouteGeneration string `json:"route_generation"` FencingEpoch int64 `json:"fencing_epoch"` TokenHash string `json:"token_hash"` IssuedAt time.Time `json:"issued_at"` ExpiresAt time.Time `json:"expires_at"` DataPlane fabricServiceChannelDataPlaneContract `json:"data_plane,omitempty"` PrimaryRoute struct { RouteID string `json:"route_id"` Status string `json:"status"` } `json:"primary_route"` } type fabricServiceChannelDataPlaneContract struct { SchemaVersion string `json:"schema_version,omitempty"` Mode string `json:"mode,omitempty"` ControlPlaneTransport string `json:"control_plane_transport,omitempty"` WorkingDataTransport string `json:"working_data_transport,omitempty"` SteadyStateTransport string `json:"steady_state_transport,omitempty"` BackendRelayPolicy string `json:"backend_relay_policy,omitempty"` ProductionForwardingRequired bool `json:"production_forwarding_required,omitempty"` ServiceNeutral bool `json:"service_neutral,omitempty"` ProtocolAgnostic bool `json:"protocol_agnostic,omitempty"` LogicalFlowMode string `json:"logical_flow_mode,omitempty"` RequiredFlowIsolationClasses []string `json:"required_flow_isolation_classes,omitempty"` RouteSelectionStrategy string `json:"route_selection_strategy,omitempty"` EntryFailoverMode string `json:"entry_failover_mode,omitempty"` ExitFailoverMode string `json:"exit_failover_mode,omitempty"` RouteRebuildMode string `json:"route_rebuild_mode,omitempty"` FailureDetectionSource string `json:"failure_detection_source,omitempty"` DegradedFallbackVisibility string `json:"degraded_fallback_visibility,omitempty"` StableContractForServiceClass string `json:"stable_contract_for_service_class,omitempty"` } type fabricServiceChannelRequestDecision struct { ForceBackendFallback bool PreferredRouteID string AcceptedBy string ServiceClass string ChannelClass string DataPlane fabricServiceChannelDataPlaneContract DataPlaneValid bool DataPlaneMode string BackendRelayPolicy string } func (d fabricServiceChannelRequestDecision) BackendFallbackAllowed() bool { return strings.TrimSpace(d.BackendRelayPolicy) != "disabled" } func (s Server) validateFabricServiceChannelVPNRequest(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string) (fabricServiceChannelRequestDecision, bool) { return s.validateFabricServiceChannelRequest(w, r, clusterID, channelID, vpnConnectionID, FabricServiceClassVPNPackets, ProductionChannelVPNPacket) } func (s Server) validateFabricServiceChannelRequest(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, resourceID string, expectedServiceClass string, defaultChannelClass string) (fabricServiceChannelRequestDecision, bool) { var decision fabricServiceChannelRequestDecision expectedServiceClass = strings.TrimSpace(strings.ToLower(expectedServiceClass)) defaultChannelClass = strings.TrimSpace(strings.ToLower(defaultChannelClass)) if strings.TrimSpace(clusterID) == "" || strings.TrimSpace(channelID) == "" { http.Error(w, "invalid fabric service channel target", http.StatusBadRequest) return decision, false } if headerChannelID := strings.TrimSpace(r.Header.Get("X-RAP-Fabric-Channel-ID")); headerChannelID != "" && headerChannelID != channelID { http.Error(w, "fabric service channel mismatch", http.StatusForbidden) return decision, false } serviceClass := strings.TrimSpace(strings.ToLower(r.Header.Get("X-RAP-Service-Class"))) if serviceClass == "" { serviceClass = expectedServiceClass } if serviceClass != expectedServiceClass { http.Error(w, "unsupported fabric service class", http.StatusForbidden) return decision, false } channelClass := strings.TrimSpace(strings.ToLower(r.Header.Get("X-RAP-Channel-Class"))) if channelClass == "" { channelClass = defaultChannelClass } if !isAllowedFabricServiceChannelForClass(serviceClass, channelClass) { http.Error(w, "unsupported fabric service channel class", http.StatusForbidden) return decision, false } token := fabricServiceChannelBearerToken(r) if !strings.HasPrefix(token, "rap_fsc_") { http.Error(w, "fabric service channel token is required", http.StatusUnauthorized) return decision, false } payload, err := s.verifyFabricServiceChannelLeaseAuthority(r, clusterID, channelID, resourceID, serviceClass, channelClass, token) if err != nil { http.Error(w, err.Error(), http.StatusForbidden) return decision, false } decision.AcceptedBy = "legacy_unsigned" decision.ServiceClass = serviceClass decision.ChannelClass = channelClass if payload != nil && (payload.Status == "degraded_fallback" || payload.PrimaryRoute.Status == "missing_route_intent") { decision.ForceBackendFallback = true } if payload != nil { if err := validateFabricServiceChannelDataPlaneContract(payload.DataPlane, channelClass); err != nil { http.Error(w, err.Error(), http.StatusForbidden) return decision, false } decision.DataPlane = payload.DataPlane decision.DataPlaneValid = strings.TrimSpace(payload.DataPlane.SchemaVersion) != "" decision.DataPlaneMode = strings.TrimSpace(payload.DataPlane.Mode) decision.BackendRelayPolicy = strings.TrimSpace(payload.DataPlane.BackendRelayPolicy) if payload.DataPlane.Mode == "degraded_backend_fallback" { decision.ForceBackendFallback = true } } if payload != nil && !decision.ForceBackendFallback { decision.PreferredRouteID = strings.TrimSpace(payload.PrimaryRoute.RouteID) } if payload != nil && payload.SchemaVersion == "rap.fabric_service_channel_introspection.v1" { decision.AcceptedBy = "introspection" } else if payload != nil { decision.AcceptedBy = "signed" } return decision, true } type FabricServiceChannelAccessLogEntry struct { Event string `json:"event"` ClusterID string `json:"cluster_id,omitempty"` ChannelID string `json:"channel_id,omitempty"` ResourceID string `json:"resource_id,omitempty"` LocalNodeID string `json:"local_node_id,omitempty"` Method string `json:"method,omitempty"` Path string `json:"path,omitempty"` AcceptedBy string `json:"accepted_by,omitempty"` PreferredRouteID string `json:"preferred_route_id,omitempty"` ServiceClass string `json:"service_class,omitempty"` ChannelClass string `json:"channel_class,omitempty"` ForceBackendFallback bool `json:"force_backend_fallback,omitempty"` DataPlaneMode string `json:"data_plane_mode,omitempty"` WorkingDataTransport string `json:"working_data_transport,omitempty"` SteadyStateTransport string `json:"steady_state_transport,omitempty"` BackendRelayPolicy string `json:"backend_relay_policy,omitempty"` LogicalFlowMode string `json:"logical_flow_mode,omitempty"` DataPlaneValid bool `json:"data_plane_valid,omitempty"` ViolationStatus string `json:"violation_status,omitempty"` ViolationReason string `json:"violation_reason,omitempty"` OccurredAt time.Time `json:"occurred_at"` } func (s Server) logFabricServiceChannelAccess(r *http.Request, clusterID string, channelID string, resourceID string, decision fabricServiceChannelRequestDecision) { if s.FabricServiceChannelLogger == nil { return } entry := FabricServiceChannelAccessLogEntry{ Event: "fabric_service_channel_access_accepted", ClusterID: clusterID, ChannelID: channelID, ResourceID: resourceID, LocalNodeID: s.Local.NodeID, AcceptedBy: decision.AcceptedBy, PreferredRouteID: decision.PreferredRouteID, ServiceClass: decision.ServiceClass, ChannelClass: decision.ChannelClass, ForceBackendFallback: decision.ForceBackendFallback, DataPlaneMode: decision.DataPlaneMode, WorkingDataTransport: strings.TrimSpace(decision.DataPlane.WorkingDataTransport), SteadyStateTransport: strings.TrimSpace(decision.DataPlane.SteadyStateTransport), BackendRelayPolicy: decision.BackendRelayPolicy, LogicalFlowMode: strings.TrimSpace(decision.DataPlane.LogicalFlowMode), DataPlaneValid: decision.DataPlaneValid, OccurredAt: time.Now().UTC(), } if r != nil { entry.Method = r.Method if r.URL != nil { entry.Path = r.URL.Path } } s.FabricServiceChannelLogger(entry) } func (s Server) logFabricServiceChannelViolation(r *http.Request, clusterID string, channelID string, resourceID string, backendRelayPolicy string, status string, reason string) { if s.FabricServiceChannelLogger == nil || strings.TrimSpace(channelID) == "" { return } entry := FabricServiceChannelAccessLogEntry{ Event: "fabric_service_channel_data_plane_violation", ClusterID: clusterID, ChannelID: channelID, ResourceID: resourceID, LocalNodeID: s.Local.NodeID, BackendRelayPolicy: strings.TrimSpace(backendRelayPolicy), ViolationStatus: strings.TrimSpace(status), ViolationReason: strings.TrimSpace(reason), OccurredAt: time.Now().UTC(), } if r != nil { entry.Method = r.Method if r.URL != nil { entry.Path = r.URL.Path } } s.FabricServiceChannelLogger(entry) } func (s Server) verifyFabricServiceChannelLeaseAuthority(r *http.Request, clusterID string, channelID string, resourceID string, serviceClass string, channelClass string, token string) (*fabricServiceChannelLeaseAuthorityPayload, error) { publicKey := strings.TrimSpace(s.ClusterAuthorityPublicKey) payloadHeader := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Authority-Payload")) signatureHeader := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Authority-Signature")) if payloadHeader == "" && signatureHeader == "" { if publicKey != "" { if payload, ok, err := s.introspectFabricServiceChannelLease(r, clusterID, channelID, resourceID, serviceClass, channelClass, token); ok || err != nil { return payload, err } return nil, fmt.Errorf("%w: signed service channel authority is required", ErrUnauthorizedChannel) } return nil, nil } if publicKey == "" { return nil, ErrUnauthorizedChannel } if payloadHeader == "" || signatureHeader == "" { return nil, fmt.Errorf("%w: service channel authority payload and signature are required together", ErrUnauthorizedChannel) } payloadRaw, err := decodeHeaderJSON(payloadHeader) if err != nil { return nil, fmt.Errorf("%w: invalid service channel authority payload", ErrUnauthorizedChannel) } signatureRaw, err := decodeHeaderJSON(signatureHeader) if err != nil { return nil, fmt.Errorf("%w: invalid service channel authority signature", ErrUnauthorizedChannel) } var signature authority.Signature if err := json.Unmarshal(signatureRaw, &signature); err != nil { return nil, fmt.Errorf("%w: invalid service channel authority signature", ErrUnauthorizedChannel) } if err := authority.VerifyRaw(publicKey, payloadRaw, signature); err != nil { return nil, fmt.Errorf("%w: service channel authority signature rejected", ErrUnauthorizedChannel) } var payload fabricServiceChannelLeaseAuthorityPayload if err := json.Unmarshal(payloadRaw, &payload); err != nil { return nil, fmt.Errorf("%w: invalid service channel authority payload", ErrUnauthorizedChannel) } if payload.SchemaVersion != "rap.fabric_service_channel_lease_authority.v1" || payload.ClusterID != clusterID || payload.ChannelID != channelID || payload.ResourceID != resourceID || payload.ServiceClass != serviceClass || payload.TokenHash != fabricServiceChannelTokenHash(token) || !containsString(payload.AllowedChannels, channelClass) { return nil, fmt.Errorf("%w: service channel authority payload mismatch", ErrUnauthorizedChannel) } if payload.SelectedEntryNodeID != "" && s.Local.NodeID != "" && payload.SelectedEntryNodeID != s.Local.NodeID { return nil, fmt.Errorf("%w: service channel entry node mismatch", ErrUnauthorizedChannel) } if !payload.ExpiresAt.IsZero() && !payload.ExpiresAt.After(time.Now().UTC()) { return nil, fmt.Errorf("%w: service channel lease expired", ErrUnauthorizedChannel) } return &payload, nil } func validateFabricServiceChannelDataPlaneContract(contract fabricServiceChannelDataPlaneContract, requiredFlowClass string) error { if strings.TrimSpace(contract.SchemaVersion) == "" { return nil } requiredFlowClass = strings.TrimSpace(strings.ToLower(requiredFlowClass)) if contract.SchemaVersion != "rap.fabric_service_channel_data_plane.v1" || contract.WorkingDataTransport != "fabric_service_channel" || contract.SteadyStateTransport != "fabric_route" || (contract.BackendRelayPolicy != "degraded_fallback_only" && contract.BackendRelayPolicy != "disabled") || !contract.ServiceNeutral || !contract.ProtocolAgnostic || contract.LogicalFlowMode != "multi_flow_isolated" { return fmt.Errorf("%w: unsupported service channel data-plane contract", ErrUnauthorizedChannel) } if contract.Mode != "" && contract.Mode != "fabric_primary" && contract.Mode != "degraded_backend_fallback" { return fmt.Errorf("%w: unsupported service channel data-plane mode", ErrUnauthorizedChannel) } if requiredFlowClass != "" && len(contract.RequiredFlowIsolationClasses) > 0 && !containsString(contract.RequiredFlowIsolationClasses, requiredFlowClass) { return fmt.Errorf("%w: service channel data-plane missing required flow isolation", ErrUnauthorizedChannel) } return nil } type fabricServiceChannelIntrospectionResponse struct { Introspection fabricServiceChannelIntrospection `json:"fabric_service_channel_introspection"` } type fabricServiceChannelIntrospection struct { Allowed bool `json:"allowed"` Status string `json:"status"` Reason string `json:"reason"` SelectedEntryNodeID string `json:"selected_entry_node_id"` AllowedChannels []string `json:"allowed_channels"` PreferredRouteID string `json:"preferred_route_id"` ForceBackendFallback bool `json:"force_backend_fallback"` DataPlane fabricServiceChannelDataPlaneContract `json:"data_plane,omitempty"` LeaseStatus string `json:"lease_status"` PrimaryRoute struct { RouteID string `json:"route_id"` Status string `json:"status"` } `json:"primary_route"` ExpiresAt time.Time `json:"expires_at"` } func (s Server) introspectFabricServiceChannelLease(r *http.Request, clusterID string, channelID string, resourceID string, serviceClass string, channelClass string, token string) (*fabricServiceChannelLeaseAuthorityPayload, bool, error) { baseURL := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/") if baseURL == "" { return nil, false, nil } serviceClass = strings.TrimSpace(strings.ToLower(firstNonEmpty(serviceClass, r.Header.Get("X-RAP-Service-Class"), FabricServiceClassVPNPackets))) channelClass = strings.TrimSpace(strings.ToLower(firstNonEmpty(channelClass, r.Header.Get("X-RAP-Channel-Class"), ProductionChannelVPNPacket))) path := "/clusters/" + clusterID + "/fabric/service-channels/" + channelID + "/introspect" body, _ := json.Marshal(map[string]any{ "token": token, "resource_id": resourceID, "service_class": serviceClass, "channel_class": channelClass, "entry_node_id": s.Local.NodeID, }) ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+path, bytes.NewReader(body)) if err != nil { return nil, true, fmt.Errorf("%w: service channel introspection request failed", ErrUnauthorizedChannel) } req.Header.Set("Content-Type", "application/json") req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID) req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, true, fmt.Errorf("%w: service channel introspection unavailable", ErrUnauthorizedChannel) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, true, fmt.Errorf("%w: service channel introspection denied", ErrUnauthorizedChannel) } var decoded fabricServiceChannelIntrospectionResponse if err := json.NewDecoder(io.LimitReader(resp.Body, 64*1024)).Decode(&decoded); err != nil { return nil, true, fmt.Errorf("%w: invalid service channel introspection response", ErrUnauthorizedChannel) } item := decoded.Introspection if !item.Allowed { return nil, true, fmt.Errorf("%w: service channel introspection denied", ErrUnauthorizedChannel) } if item.SelectedEntryNodeID != "" && s.Local.NodeID != "" && item.SelectedEntryNodeID != s.Local.NodeID { return nil, true, fmt.Errorf("%w: service channel entry node mismatch", ErrUnauthorizedChannel) } if !item.ExpiresAt.IsZero() && !item.ExpiresAt.After(time.Now().UTC()) { return nil, true, fmt.Errorf("%w: service channel lease expired", ErrUnauthorizedChannel) } payload := &fabricServiceChannelLeaseAuthorityPayload{ SchemaVersion: "rap.fabric_service_channel_introspection.v1", ChannelID: channelID, ClusterID: clusterID, ResourceID: resourceID, ServiceClass: serviceClass, Status: item.LeaseStatus, SelectedEntryNodeID: item.SelectedEntryNodeID, AllowedChannels: item.AllowedChannels, DataPlane: item.DataPlane, ExpiresAt: item.ExpiresAt, } payload.PrimaryRoute.RouteID = strings.TrimSpace(firstNonEmpty(item.PreferredRouteID, item.PrimaryRoute.RouteID)) payload.PrimaryRoute.Status = item.PrimaryRoute.Status if item.ForceBackendFallback { payload.Status = "degraded_fallback" if payload.PrimaryRoute.Status == "" { payload.PrimaryRoute.Status = "missing_route_intent" } } return payload, true, nil } func decodeHeaderJSON(value string) (json.RawMessage, error) { if value == "" { return nil, fmt.Errorf("empty header") } if decoded, err := base64.RawURLEncoding.DecodeString(value); err == nil { return json.RawMessage(decoded), nil } if decoded, err := base64.StdEncoding.DecodeString(value); err == nil { return json.RawMessage(decoded), nil } return json.RawMessage(value), nil } func fabricServiceChannelTokenHash(token string) string { sum := sha256.Sum256([]byte(strings.TrimSpace(token))) return hex.EncodeToString(sum[:]) } func fabricServiceChannelBearerToken(r *http.Request) string { if r == nil { return "" } if token := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Token")); token != "" { return token } auth := strings.TrimSpace(r.Header.Get("Authorization")) if len(auth) > len("Bearer ") && strings.EqualFold(auth[:len("Bearer ")], "Bearer ") { return strings.TrimSpace(auth[len("Bearer "):]) } return strings.TrimSpace(r.URL.Query().Get("service_channel_token")) } func isAllowedFabricServiceVPNChannel(channel string) bool { return isAllowedFabricServiceChannelForClass(FabricServiceClassVPNPackets, channel) } func isAllowedFabricServiceChannelForClass(serviceClass string, channel string) bool { serviceClass = strings.TrimSpace(strings.ToLower(serviceClass)) channel = strings.TrimSpace(strings.ToLower(channel)) switch channel { case ProductionChannelVPNPacket, FabricServiceChannelBulk, FabricServiceChannelControl, FabricServiceChannelInteractive, FabricServiceChannelReliable, FabricServiceChannelDroppable: if serviceClass == FabricServiceClassRemoteWorkspace && channel == ProductionChannelVPNPacket { return false } return channel != "" } return false } func containsString(values []string, target string) bool { for _, value := range values { if value == target { return true } } return false } func parseFabricServiceChannelVPNPacketWebSocketPath(path string) (string, string, string, bool) { parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) != 11 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "clusters" || parts[4] != "fabric" || parts[5] != "service-channels" || parts[7] != "vpn-connections" || parts[9] != "packets" || parts[10] != "ws" { return "", "", "", false } if parts[3] == "" || parts[6] == "" || parts[8] == "" { return "", "", "", false } return parts[3], parts[6], parts[8], true } func parseFabricServiceChannelRemoteWorkspacePath(path string) (string, string, string, string, bool, bool) { parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) == 11 && parts[0] == "api" && parts[1] == "v1" && parts[2] == "clusters" && parts[4] == "fabric" && parts[5] == "service-channels" && parts[7] == "remote-workspaces" && parts[9] == "streams" && parts[10] == "ws" && parts[3] != "" && parts[6] != "" && parts[8] != "" { return parts[3], parts[6], parts[8], FabricServiceChannelInteractive, true, true } if len(parts) != 11 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "clusters" || parts[4] != "fabric" || parts[5] != "service-channels" || parts[7] != "remote-workspaces" || parts[9] != "streams" { return "", "", "", "", false, false } if parts[3] == "" || parts[6] == "" || parts[8] == "" || parts[10] == "" { return "", "", "", "", false, false } return parts[3], parts[6], parts[8], strings.TrimSpace(strings.ToLower(parts[10])), false, true } func parseFabricServiceChannelVPNPacketPath(path string) (string, string, string, bool) { parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) != 10 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "clusters" || parts[4] != "fabric" || parts[5] != "service-channels" || parts[7] != "vpn-connections" || parts[9] != "packets" { return "", "", "", false } if parts[3] == "" || parts[6] == "" || parts[8] == "" { return "", "", "", false } return parts[3], parts[6], parts[8], true } func parseVPNClientPacketWebSocketPath(path string) (string, string, bool) { parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) != 10 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "clusters" || parts[4] != "vpn-connections" || parts[6] != "tunnel" || parts[7] != "client" || parts[8] != "packets" || parts[9] != "ws" { return "", "", false } if parts[3] == "" || parts[5] == "" { return "", "", false } return parts[3], parts[5], true } func parseVPNClientPacketPath(path string) (string, string, bool) { parts := strings.Split(strings.Trim(path, "/"), "/") if len(parts) != 9 || parts[0] != "api" || parts[1] != "v1" || parts[2] != "clusters" || parts[4] != "vpn-connections" || parts[6] != "tunnel" || parts[7] != "client" || parts[8] != "packets" { return "", "", false } if parts[3] == "" || parts[5] == "" { return "", "", false } return parts[3], parts[5], true } func vpnIngressTimeout(r *http.Request) time.Duration { timeoutMs, _ := strconv.Atoi(r.URL.Query().Get("timeout_ms")) if timeoutMs <= 0 { timeoutMs = 25000 } if timeoutMs > 30000 { timeoutMs = 30000 } return time.Duration(timeoutMs) * time.Millisecond } func vpnIngressStatusCode(err error) int { switch err { case ErrForwardRuntimeUnavailable, ErrRouteNotFound, ErrForwardPeerUnavailable: return http.StatusServiceUnavailable case ErrUnauthorizedChannel, ErrClusterMismatch, ErrNodeMismatch: return http.StatusForbidden default: return http.StatusBadGateway } } func encodeVPNIngressPacketBatch(packets [][]byte) []byte { packets = cleanVPNIngressPacketBatch(packets) total := 0 for _, packet := range packets { total += 4 + len(packet) } out := make([]byte, total) offset := 0 for _, packet := range packets { binary.BigEndian.PutUint32(out[offset:offset+4], uint32(len(packet))) offset += 4 copy(out[offset:offset+len(packet)], packet) offset += len(packet) } return out } func cleanVPNIngressPacketBatch(packets [][]byte) [][]byte { if len(packets) == 0 { return nil } cleaned := make([][]byte, 0, len(packets)) for _, packet := range packets { if len(packet) == 0 { continue } cleaned = append(cleaned, packet) } return cleaned } func decodeVPNIngressPacketBatch(payload []byte) ([][]byte, error) { var packets [][]byte for offset := 0; offset < len(payload); { if offset+4 > len(payload) { return nil, fmt.Errorf("%w: truncated vpn packet batch header", ErrForwardEnvelopeInvalid) } size := int(binary.BigEndian.Uint32(payload[offset : offset+4])) offset += 4 if size <= 0 || size > 65535 { return nil, fmt.Errorf("%w: invalid vpn packet batch item size", ErrForwardEnvelopeInvalid) } if offset+size > len(payload) { return nil, fmt.Errorf("%w: truncated vpn packet batch item", ErrForwardEnvelopeInvalid) } packets = append(packets, append([]byte(nil), payload[offset:offset+size]...)) offset += size } if len(packets) == 0 { return nil, fmt.Errorf("%w: empty vpn packet batch", ErrForwardEnvelopeInvalid) } return packets, nil } func (s Server) backendProxy() http.Handler { target, err := url.Parse(s.BackendProxyBaseURL) if err != nil || target.Scheme == "" || target.Host == "" { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { http.Error(w, "backend proxy misconfigured", http.StatusServiceUnavailable) }) } proxy := &httputil.ReverseProxy{ Director: func(r *http.Request) { r.URL.Scheme = target.Scheme r.URL.Host = target.Host r.Host = target.Host r.Header.Set("X-RAP-Entry-Node", s.Local.NodeID) r.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID) }, } return proxy } func (s Server) handleHealth(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } var message HealthMessage if err := json.NewDecoder(r.Body).Decode(&message); err != nil { http.Error(w, "invalid health message", http.StatusBadRequest) return } if message.ProtocolVersion != ProtocolVersion { http.Error(w, "unsupported mesh protocol version", http.StatusBadRequest) return } if err := ValidatePeer(s.Local, message.From); err != nil { http.Error(w, err.Error(), http.StatusForbidden) return } if message.To.NodeID != "" && message.To.NodeID != s.Local.NodeID { http.Error(w, ErrNodeMismatch.Error(), http.StatusForbidden) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(HealthAck{ ProtocolVersion: ProtocolVersion, Accepted: true, By: s.Local, }) } func (s Server) handleForward(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } if !s.ProductionForwardingEnabled { s.logProductionForward(ProductionForwardLogEntry{ Event: "production_forward_rejected", ClusterID: s.Local.ClusterID, LocalNodeID: s.Local.NodeID, Reason: ErrForwardDisabled.Error(), StatusCode: http.StatusNotImplemented, OccurredAt: time.Now().UTC(), }) http.Error(w, ErrForwardDisabled.Error(), http.StatusNotImplemented) return } var envelope ProductionEnvelope if err := json.NewDecoder(r.Body).Decode(&envelope); err != nil { s.logProductionForward(ProductionForwardLogEntry{ Event: "production_forward_rejected", ClusterID: s.Local.ClusterID, LocalNodeID: s.Local.NodeID, Reason: "invalid production mesh envelope", StatusCode: http.StatusBadRequest, OccurredAt: time.Now().UTC(), }) http.Error(w, "invalid production mesh envelope", http.StatusBadRequest) return } if err := ValidateProductionEnvelope(s.Local, envelope, time.Now().UTC()); err != nil { s.rejectProductionForward(w, envelope, err, forwardStatusCode(err)) return } if err := ValidateProductionEnvelopeRouteConfig(s.Local, envelope, s.ProductionRoutes, time.Now().UTC()); err != nil { s.rejectProductionForward(w, envelope, err, forwardStatusCode(err)) return } s.logProductionForward(productionForwardLogEntry("production_forward_accepted", s.Local, envelope, "", 0)) if s.ProductionEnvelopeObserver != nil { observation := NewProductionEnvelopeObservation(envelope, time.Now().UTC()) if err := observeProductionEnvelope(r.Context(), s.ProductionEnvelopeObserver, observation); err != nil { s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardObservationFailed.Error(), http.StatusInternalServerError)) http.Error(w, ErrForwardObservationFailed.Error(), http.StatusInternalServerError) return } } if envelope.DestinationNodeID == s.Local.NodeID { if err := deliverProductionEnvelope(r.Context(), s.ProductionEnvelopeDelivery, envelope); err != nil { s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardDeliveryFailed.Error(), http.StatusInternalServerError)) http.Error(w, ErrForwardDeliveryFailed.Error(), http.StatusInternalServerError) return } s.logProductionForward(productionForwardLogEntry("production_forward_delivered", s.Local, envelope, "", http.StatusOK)) writeProductionForwardResult(w, ProductionForwardResult{ Accepted: true, Delivered: true, By: s.Local, MessageID: envelope.MessageID, RouteID: envelope.RouteID, }) return } if envelope.NextHopNodeID == s.Local.NodeID { s.rejectProductionForward(w, envelope, ErrLoopDetected, forwardStatusCode(ErrLoopDetected)) return } if len(envelope.RoutePath) == 0 && envelope.NextHopNodeID != envelope.DestinationNodeID { s.rejectProductionForward(w, envelope, ErrForwardRuntimeUnavailable, http.StatusNotImplemented) return } if s.ProductionForwardTransport == nil { s.rejectProductionForward(w, envelope, ErrForwardRuntimeUnavailable, http.StatusNotImplemented) return } if envelope.TTL <= 1 { s.rejectProductionForward(w, envelope, ErrTTLExhausted, forwardStatusCode(ErrTTLExhausted)) return } forwarded := envelope forwarded.CurrentHopNodeID = envelope.NextHopNodeID forwarded.NextHopNodeID = nextProductionHopAfter(envelope.RoutePath, envelope.NextHopNodeID, envelope.DestinationNodeID) forwarded.TTL = envelope.TTL - 1 forwarded.HopCount = envelope.HopCount + 1 forwarded.VisitedNodeIDs = append(append([]string{}, envelope.VisitedNodeIDs...), s.Local.NodeID) result, err := s.ProductionForwardTransport.SendProduction(r.Context(), envelope.NextHopNodeID, forwarded) if err != nil { s.rejectProductionForward(w, envelope, err, forwardStatusCode(err)) return } s.logProductionForward(productionForwardLogEntry("production_forward_forwarded", s.Local, envelope, "", http.StatusOK)) result.Accepted = true result.Forwarded = true result.By = s.Local result.MessageID = envelope.MessageID result.RouteID = envelope.RouteID result.NextNodeID = envelope.NextHopNodeID writeProductionForwardResult(w, result) } func (s Server) rejectProductionForward(w http.ResponseWriter, envelope ProductionEnvelope, err error, statusCode int) { s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, err.Error(), statusCode)) http.Error(w, err.Error(), statusCode) } func (s Server) logProductionForward(entry ProductionForwardLogEntry) { if s.ProductionForwardLogger == nil { return } if entry.OccurredAt.IsZero() { entry.OccurredAt = time.Now().UTC() } s.ProductionForwardLogger(entry) } func productionForwardLogEntry(event string, local PeerIdentity, envelope ProductionEnvelope, reason string, statusCode int) ProductionForwardLogEntry { return ProductionForwardLogEntry{ Event: event, RouteID: envelope.RouteID, MessageID: envelope.MessageID, ClusterID: envelope.ClusterID, LocalNodeID: local.NodeID, SourceNodeID: envelope.SourceNodeID, DestinationNodeID: envelope.DestinationNodeID, CurrentHopNodeID: envelope.CurrentHopNodeID, NextHopNodeID: envelope.NextHopNodeID, ChannelClass: envelope.ChannelClass, MessageType: envelope.MessageType, Reason: reason, StatusCode: statusCode, TTL: envelope.TTL, HopCount: envelope.HopCount, RoutePathLength: len(envelope.RoutePath), VisitedCount: len(envelope.VisitedNodeIDs), PayloadLength: envelope.PayloadLength, OccurredAt: time.Now().UTC(), } } func nextProductionHopAfter(routePath []string, currentNodeID string, destinationNodeID string) string { if len(routePath) == 0 { return destinationNodeID } for index, nodeID := range routePath { if nodeID == currentNodeID { if index >= len(routePath)-1 { return currentNodeID } return routePath[index+1] } } return destinationNodeID } func writeProductionForwardResult(w http.ResponseWriter, result ProductionForwardResult) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(result) } func observeProductionEnvelope(ctx context.Context, observer ProductionEnvelopeObserver, observation ProductionEnvelopeObservation) (err error) { if observer == nil { return nil } defer func() { if recover() != nil { err = ErrForwardObservationFailed } }() return observer(ctx, observation) } func deliverProductionEnvelope(ctx context.Context, delivery ProductionEnvelopeDelivery, envelope ProductionEnvelope) (err error) { if delivery == nil { return nil } defer func() { if recover() != nil { err = ErrForwardDeliveryFailed } }() return delivery(ctx, envelope) } func (s Server) handleSyntheticProbe(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } if s.SyntheticRuntime == nil { http.Error(w, ErrMeshRuntimeDisabled.Error(), http.StatusServiceUnavailable) return } var envelope SyntheticEnvelope if err := json.NewDecoder(r.Body).Decode(&envelope); err != nil { http.Error(w, "invalid synthetic mesh envelope", http.StatusBadRequest) return } ack, err := s.SyntheticRuntime.Receive(r.Context(), envelope) if err != nil { http.Error(w, err.Error(), syntheticStatusCode(err)) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(ack) } func NewHealthMessage(from, to PeerIdentity) HealthMessage { status := "reachable" return HealthMessage{ ProtocolVersion: ProtocolVersion, From: from, To: to, ObservedAt: time.Now().UTC(), LinkStatus: status, } } func syntheticStatusCode(err error) int { switch err { case ErrClusterMismatch, ErrNodeMismatch, ErrUnauthorizedChannel, ErrLoopDetected: return http.StatusForbidden case ErrMeshRuntimeDisabled: return http.StatusServiceUnavailable case ErrRouteExpired, ErrTTLExhausted, ErrInvalidRoutePath, ErrUnsupportedSyntheticMessage, ErrRouteIDRequired: return http.StatusBadRequest case ErrRouteNotFound, ErrSyntheticPeerUnavailable: return http.StatusNotFound default: return http.StatusBadRequest } } func forwardStatusCode(err error) int { switch err { case ErrClusterMismatch, ErrNodeMismatch, ErrUnauthorizedChannel, ErrLoopDetected: return http.StatusForbidden case ErrRouteExpired, ErrTTLExhausted, ErrInvalidRoutePath, ErrRouteIDRequired: return http.StatusBadRequest case ErrForwardRuntimeUnavailable: return http.StatusNotImplemented case ErrRouteNotFound: return http.StatusNotFound case ErrForwardPeerUnavailable: return http.StatusBadGateway default: return http.StatusBadRequest } }