Files
rdp-proxy/agents/rap-node-agent/internal/mesh/server.go
T

2118 lines
81 KiB
Go

package mesh
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/authority"
"github.com/gorilla/websocket"
)
// ProductionEnvelopeObserver is the signature of a callback that is handed a
// ProductionEnvelopeObservation and reports failure through its error result.
type ProductionEnvelopeObserver func(context.Context, ProductionEnvelopeObservation) error

// ProductionEnvelopeDelivery is the signature of a callback that is handed a
// ProductionEnvelope and reports failure through its error result.
type ProductionEnvelopeDelivery func(context.Context, ProductionEnvelope) error

// ProductionForwardLogger is the signature of a callback that receives a
// ProductionForwardLogEntry.
type ProductionForwardLogger func(ProductionForwardLogEntry)

// FabricServiceChannelAccessLogger is the signature of a callback that
// receives a FabricServiceChannelAccessLogEntry.
type FabricServiceChannelAccessLogger func(FabricServiceChannelAccessLogEntry)
// RemoteWorkspaceFrameSink is the destination for validated remote-workspace
// frame batch probes. The service-channel ingress handler calls
// AcceptRemoteWorkspaceFrameBatchProbe after validating a POSTed probe and
// embeds the returned receipt in its response. The server also type-asserts
// the sink against the optional RemoteWorkspaceFrameSinkSession* interfaces
// to discover additional capabilities.
type RemoteWorkspaceFrameSink interface {
AcceptRemoteWorkspaceFrameBatchProbe(context.Context, RemoteWorkspaceFrameBatchDelivery) (RemoteWorkspaceFrameBatchDeliveryReceipt, error)
}
// RemoteWorkspaceFrameSinkSessionControl is an optional sink capability used
// by the adapter-session control endpoint. The handler passes the action and
// reason decoded from the request body, the session ID parsed from the URL,
// and the current UTC time.
type RemoteWorkspaceFrameSinkSessionControl interface {
ControlAdapterSession(action string, adapterSessionID string, reason string, now time.Time) (RemoteWorkspaceAdapterSessionControlResult, error)
}
// RemoteWorkspaceFrameSinkSessionSnapshot is an optional sink capability used
// by the adapter-session list endpoint. includeTerminal and limit come from
// the include_terminal and limit query parameters.
type RemoteWorkspaceFrameSinkSessionSnapshot interface {
SnapshotAdapterSessions(includeTerminal bool, limit int, now time.Time) RemoteWorkspaceAdapterSessionSnapshot
}
// RemoteWorkspaceFrameSinkSessionMailbox is an optional sink capability used
// by the adapter-session mailbox endpoint to read a session's mailbox. The
// drain, limit and afterSequence arguments come from the request's query
// parameters (afterSequence may instead be a resolved resume cursor).
type RemoteWorkspaceFrameSinkSessionMailbox interface {
ReadAdapterSessionMailbox(adapterSessionID string, drain bool, limit int, afterSequence int64, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error)
}
// RemoteWorkspaceFrameSinkSessionMailboxTelemetry is an optional sink
// capability. When the sink implements it, the mailbox endpoint reports every
// successful mailbox read to RecordAdapterSessionMailboxRead.
type RemoteWorkspaceFrameSinkSessionMailboxTelemetry interface {
RecordAdapterSessionMailboxRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, now time.Time)
}
// RemoteWorkspaceFrameSinkSessionMailboxConsumer is an optional sink
// capability required by the mailbox endpoint whenever a consumer_id query
// parameter is supplied. The snapshot it returns replaces the one that was
// read and is what the endpoint sends to the client.
type RemoteWorkspaceFrameSinkSessionMailboxConsumer interface {
RecordAdapterSessionMailboxConsumerRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, consumerID string, ackSequence int64, reset bool, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error)
}
// RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot is an optional sink
// capability backing the ".../mailbox/consumers" endpoint.
type RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot interface {
SnapshotAdapterSessionMailboxConsumers(adapterSessionID string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxConsumerSnapshot, error)
}
// RemoteWorkspaceFrameSinkSessionMailboxConsumerResume is an optional sink
// capability required by the mailbox endpoint when resume_from is supplied.
// The handler passes resumeFrom as "ack" or "checkpoint" and uses the
// returned sequence as the after-sequence cursor for the read.
type RemoteWorkspaceFrameSinkSessionMailboxConsumerResume interface {
ResolveAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, now time.Time) (int64, error)
}
// RemoteWorkspaceFrameSinkSessionMailboxPreflight is an optional sink
// capability backing the ".../mailbox/preflight" endpoint. The handler passes
// resumeFrom as "ack" or "checkpoint" (defaulting to "checkpoint").
type RemoteWorkspaceFrameSinkSessionMailboxPreflight interface {
PreflightAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxPreflightSnapshot, error)
}
// VPNPacketIngress is the packet path used by the VPN client packet HTTP
// endpoint: POST requests hand their packets to SendClientPacketBatch and GET
// requests obtain packets from ReceiveClientPacketBatch, passing a timeout
// derived from the request.
type VPNPacketIngress interface {
SendClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, packets [][]byte) error
ReceiveClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error)
}
// VPNPacketIngressTrafficClass is an optional VPNPacketIngress capability.
// When the ingress implements it, the HTTP POST path calls it instead of
// SendClientPacketBatch, passing the X-RAP-Traffic-Class request header as
// trafficClass.
type VPNPacketIngressTrafficClass interface {
SendClientPacketBatchWithTrafficClass(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte) error
}
// VPNPacketIngressRoutePreference is an optional VPNPacketIngress capability.
// When the ingress implements it, the server forwards the non-empty preferred
// route ID of a service-channel decision to PreferClientRoute.
type VPNPacketIngressRoutePreference interface {
PreferClientRoute(routeID string)
}
// Server holds the configuration and collaborators of the mesh HTTP server.
// Handler turns a Server value into an http.Handler; which routes get
// registered depends on which optional fields are set.
type Server struct {
Local PeerIdentity
SyntheticRuntime *SyntheticRuntime
ProductionForwardingEnabled bool
ProductionEnvelopeObserver ProductionEnvelopeObserver
ProductionEnvelopeDelivery ProductionEnvelopeDelivery
ProductionForwardTransport ProductionForwardTransport
ProductionForwardLogger ProductionForwardLogger
FabricServiceChannelLogger FabricServiceChannelAccessLogger
// RemoteWorkspaceFrameSink, when non-nil, enables the adapter-session routes
// and receives validated remote-workspace frame batch probes.
RemoteWorkspaceFrameSink RemoteWorkspaceFrameSink
ProductionRoutes []SyntheticRoute
// VPNPacketIngress, when non-nil, enables the VPN packet ingress handling
// under /api/v1/clusters/.
VPNPacketIngress VPNPacketIngress
// BackendProxyBaseURL, when non-empty, enables proxying of /api/v1 and
// /downloads requests through backendProxy.
BackendProxyBaseURL string
ClusterAuthorityPublicKey string
ServiceChannelIntrospection bool
}
// Handler builds the HTTP routing table for the mesh server.
//
// The health, forward and synthetic-probe endpoints are always registered.
// The adapter-session routes are added only when a RemoteWorkspaceFrameSink
// is configured. Requests under /api/v1/clusters/ are offered, in order, to
// the remote-workspace service-channel ingress, the service-channel VPN
// ingress, the plain VPN ingress and finally the backend proxy; the first
// one that claims the request wins and anything unclaimed is a 404. When a
// backend proxy base URL is configured, /api/v1 and /downloads are proxied.
func (s Server) Handler() http.Handler {
	routes := http.NewServeMux()
	routes.HandleFunc("/mesh/v1/health", s.handleHealth)
	routes.HandleFunc("/mesh/v1/forward", s.handleForward)
	routes.HandleFunc("/mesh/v1/synthetic/probe", s.handleSyntheticProbe)

	if s.RemoteWorkspaceFrameSink != nil {
		routes.HandleFunc("/mesh/v1/remote-workspace/adapter-sessions/", s.handleRemoteWorkspaceAdapterSessionControl)
	}

	hasVPNIngress := s.VPNPacketIngress != nil
	hasBackendProxy := s.BackendProxyBaseURL != ""

	if hasVPNIngress || hasBackendProxy {
		routes.HandleFunc("/api/v1/clusters/", func(w http.ResponseWriter, r *http.Request) {
			// Cases are evaluated top to bottom; a handler returning true
			// has already written the response.
			switch {
			case s.handleFabricServiceChannelRemoteWorkspaceIngress(w, r):
			case hasVPNIngress && s.handleFabricServiceChannelVPNPacketIngress(w, r):
			case hasVPNIngress && s.handleVPNPacketIngress(w, r):
			case hasBackendProxy:
				s.backendProxy().ServeHTTP(w, r)
			default:
				http.NotFound(w, r)
			}
		})
	}

	if hasBackendProxy {
		backend := s.backendProxy()
		for _, pattern := range []string{"/api/v1/", "/api/v1", "/downloads/", "/downloads"} {
			routes.Handle(pattern, backend)
		}
	}
	return routes
}
// handleRemoteWorkspaceAdapterSessionControl is the entry point for every
// request under the adapter-sessions prefix.
//
// GET requests are dispatched by path: the list path goes to the session
// snapshot handler, and paths ending in /mailbox/consumers,
// /mailbox/preflight or /mailbox go to the matching mailbox handler. Any
// other request is treated as a control call: the sink must implement
// RemoteWorkspaceFrameSinkSessionControl (503 otherwise), the path must be a
// control path (404 otherwise), the method must be POST (405 otherwise) and
// the body, limited to 16 KiB, must decode as JSON with "action" and an
// optional "reason" (400 otherwise). Controller errors are returned as 400;
// on success the result is written as JSON.
func (s Server) handleRemoteWorkspaceAdapterSessionControl(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		// More specific suffixes must be tested before the bare "/mailbox".
		switch requestPath := r.URL.Path; {
		case isRemoteWorkspaceAdapterSessionListPath(requestPath):
			s.handleRemoteWorkspaceAdapterSessionSnapshot(w, r)
			return
		case strings.HasSuffix(requestPath, "/mailbox/consumers"):
			s.handleRemoteWorkspaceAdapterSessionMailboxConsumers(w, r)
			return
		case strings.HasSuffix(requestPath, "/mailbox/preflight"):
			s.handleRemoteWorkspaceAdapterSessionMailboxPreflight(w, r)
			return
		case strings.HasSuffix(requestPath, "/mailbox"):
			s.handleRemoteWorkspaceAdapterSessionMailbox(w, r)
			return
		}
	}

	controller, canControl := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionControl)
	if !canControl {
		http.Error(w, "remote workspace adapter session control unavailable", http.StatusServiceUnavailable)
		return
	}
	sessionID, matched := parseRemoteWorkspaceAdapterSessionControlPath(r.URL.Path)
	if !matched {
		http.NotFound(w, r)
		return
	}
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	var payload struct {
		Action string `json:"action"`
		Reason string `json:"reason,omitempty"`
	}
	limitedBody := http.MaxBytesReader(w, r.Body, 16*1024)
	if err := json.NewDecoder(limitedBody).Decode(&payload); err != nil {
		http.Error(w, "invalid remote workspace adapter session control payload", http.StatusBadRequest)
		return
	}

	outcome, err := controller.ControlAdapterSession(payload.Action, sessionID, payload.Reason, time.Now().UTC())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Best effort: the status line is already committed once encoding starts.
	_ = json.NewEncoder(w).Encode(outcome)
}
// handleRemoteWorkspaceAdapterSessionSnapshot writes a JSON snapshot of the
// adapter sessions known to the sink.
//
// It answers 503 when the sink does not implement
// RemoteWorkspaceFrameSinkSessionSnapshot. The include_terminal query
// parameter is true only for a case-insensitive "true"; limit defaults to 50
// and must be a positive integer (400 otherwise).
func (s Server) handleRemoteWorkspaceAdapterSessionSnapshot(w http.ResponseWriter, r *http.Request) {
	snapshotter, supported := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionSnapshot)
	if !supported {
		http.Error(w, "remote workspace adapter session snapshot unavailable", http.StatusServiceUnavailable)
		return
	}

	query := r.URL.Query()
	withTerminal := strings.EqualFold(query.Get("include_terminal"), "true")

	maxSessions := 50
	if raw := strings.TrimSpace(query.Get("limit")); raw != "" {
		requested, err := strconv.Atoi(raw)
		if err != nil || requested <= 0 {
			http.Error(w, "invalid remote workspace adapter session snapshot limit", http.StatusBadRequest)
			return
		}
		maxSessions = requested
	}

	w.Header().Set("Content-Type", "application/json")
	snapshot := snapshotter.SnapshotAdapterSessions(withTerminal, maxSessions, time.Now().UTC())
	// Best effort: nothing useful can be done if the client went away.
	_ = json.NewEncoder(w).Encode(snapshot)
}
// handleRemoteWorkspaceAdapterSessionMailbox serves GET requests for an
// adapter session's mailbox and writes the resulting snapshot as JSON.
//
// The sink must implement RemoteWorkspaceFrameSinkSessionMailbox (503
// otherwise) and the path must be a mailbox path (404 otherwise). Query
// parameters, all optional, are validated in this order and any violation is
// a 400:
//
//   - drain: a case-insensitive "true" requests a draining read.
//   - limit: positive integer, default 50.
//   - wait_ms: non-negative integer, silently capped at 30000.
//   - after_sequence: non-negative integer; cannot be combined with drain.
//   - consumer_id: must satisfy isValidRemoteWorkspaceAdapterMailboxConsumerID.
//   - resume_from: "ack" or "checkpoint" (case-insensitive); requires
//     consumer_id and excludes after_sequence, drain and reset_consumer.
//   - reset_consumer: "true" or "false"; "true" requires consumer_id.
//   - ack_sequence: non-negative integer; only used when consumer_id is set.
//
// All optional sink capabilities a request depends on (resume resolution and
// consumer recording) are verified before the mailbox is read, so that a
// request which can only end in 503 never performs a read first. This matters
// for drain=true, where the read itself may consume messages.
func (s Server) handleRemoteWorkspaceAdapterSessionMailbox(w http.ResponseWriter, r *http.Request) {
	reader, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailbox)
	if !ok {
		http.Error(w, "remote workspace adapter session mailbox unavailable", http.StatusServiceUnavailable)
		return
	}
	sessionID, ok := parseRemoteWorkspaceAdapterSessionMailboxPath(r.URL.Path)
	if !ok {
		http.NotFound(w, r)
		return
	}

	// Parse the query string once; every parameter below reads from it.
	query := r.URL.Query()

	drain := strings.EqualFold(query.Get("drain"), "true")
	limit := 50
	if rawLimit := strings.TrimSpace(query.Get("limit")); rawLimit != "" {
		parsed, err := strconv.Atoi(rawLimit)
		if err != nil || parsed <= 0 {
			http.Error(w, "invalid remote workspace adapter session mailbox limit", http.StatusBadRequest)
			return
		}
		limit = parsed
	}
	waitMs := 0
	if rawWait := strings.TrimSpace(query.Get("wait_ms")); rawWait != "" {
		parsed, err := strconv.Atoi(rawWait)
		if err != nil || parsed < 0 {
			http.Error(w, "invalid remote workspace adapter session mailbox wait", http.StatusBadRequest)
			return
		}
		// Bound the long-poll so a client cannot pin a handler indefinitely.
		if parsed > 30000 {
			parsed = 30000
		}
		waitMs = parsed
	}
	afterSequence := int64(0)
	if rawAfter := strings.TrimSpace(query.Get("after_sequence")); rawAfter != "" {
		parsed, err := strconv.ParseInt(rawAfter, 10, 64)
		if err != nil || parsed < 0 {
			http.Error(w, "invalid remote workspace adapter session mailbox after sequence", http.StatusBadRequest)
			return
		}
		afterSequence = parsed
	}
	if drain && afterSequence > 0 {
		http.Error(w, "remote workspace adapter session mailbox after sequence cannot drain", http.StatusBadRequest)
		return
	}

	consumerID := strings.TrimSpace(query.Get("consumer_id"))
	if consumerID != "" && !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) {
		http.Error(w, "invalid remote workspace adapter mailbox consumer", http.StatusBadRequest)
		return
	}

	resumeFrom := strings.TrimSpace(strings.ToLower(query.Get("resume_from")))
	if resumeFrom != "" {
		switch resumeFrom {
		case "ack", "checkpoint":
		default:
			http.Error(w, "invalid remote workspace adapter mailbox resume cursor", http.StatusBadRequest)
			return
		}
		if consumerID == "" {
			http.Error(w, "remote workspace adapter mailbox consumer required for resume", http.StatusBadRequest)
			return
		}
		if afterSequence > 0 {
			http.Error(w, "remote workspace adapter mailbox resume cannot combine with after sequence", http.StatusBadRequest)
			return
		}
		if drain {
			http.Error(w, "remote workspace adapter mailbox resume cannot drain", http.StatusBadRequest)
			return
		}
	}

	resetConsumer := false
	if rawReset := strings.TrimSpace(query.Get("reset_consumer")); rawReset != "" {
		switch strings.ToLower(rawReset) {
		case "true":
			resetConsumer = true
		case "false":
		default:
			http.Error(w, "invalid remote workspace adapter mailbox consumer reset", http.StatusBadRequest)
			return
		}
	}
	if resetConsumer && consumerID == "" {
		http.Error(w, "remote workspace adapter mailbox consumer required for reset", http.StatusBadRequest)
		return
	}
	if resetConsumer && resumeFrom != "" {
		http.Error(w, "remote workspace adapter mailbox resume cannot reset consumer", http.StatusBadRequest)
		return
	}

	ackSequence := int64(0)
	if rawAck := strings.TrimSpace(query.Get("ack_sequence")); rawAck != "" {
		parsed, err := strconv.ParseInt(rawAck, 10, 64)
		if err != nil || parsed < 0 {
			http.Error(w, "invalid remote workspace adapter mailbox ack sequence", http.StatusBadRequest)
			return
		}
		ackSequence = parsed
	}

	resumeSequence := int64(0)
	if resumeFrom != "" {
		resolver, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumerResume)
		if !ok {
			http.Error(w, "remote workspace adapter mailbox resume unavailable", http.StatusServiceUnavailable)
			return
		}
		resolved, err := resolver.ResolveAdapterSessionMailboxConsumerResume(sessionID, consumerID, resumeFrom, time.Now().UTC())
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		afterSequence = resolved
		resumeSequence = resolved
	}

	// Verify consumer-recording support BEFORE touching the mailbox. Doing it
	// after the read would let a drain=true request consume messages and
	// then answer 503, losing them for the caller.
	var consumerRecorder RemoteWorkspaceFrameSinkSessionMailboxConsumer
	if consumerID != "" {
		recorder, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumer)
		if !ok {
			http.Error(w, "remote workspace adapter mailbox consumer unavailable", http.StatusServiceUnavailable)
			return
		}
		consumerRecorder = recorder
	}

	mailbox, err := s.readRemoteWorkspaceAdapterSessionMailbox(r.Context(), reader, sessionID, drain, limit, afterSequence, waitMs)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if resumeFrom != "" {
		mailbox.ResumeFrom = resumeFrom
		mailbox.ResumeSequence = resumeSequence
	}
	if consumerID != "" {
		mailbox.ConsumerID = consumerID
	}
	if telemetry, ok := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxTelemetry); ok {
		telemetry.RecordAdapterSessionMailboxRead(mailbox, time.Now().UTC())
	}
	if consumerRecorder != nil {
		mailbox, err = consumerRecorder.RecordAdapterSessionMailboxConsumerRead(mailbox, consumerID, ackSequence, resetConsumer, time.Now().UTC())
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	w.Header().Set("Content-Type", "application/json")
	// Best effort: the response status is already committed at this point.
	_ = json.NewEncoder(w).Encode(mailbox)
}
// handleRemoteWorkspaceAdapterSessionMailboxConsumers writes a JSON snapshot
// of the mailbox consumers of one adapter session.
//
// It answers 503 when the sink does not implement
// RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot and 404 when the
// path is not a mailbox-consumers path. The limit query parameter defaults to
// 50 and must be a positive integer; a bad limit or a sink error is a 400.
func (s Server) handleRemoteWorkspaceAdapterSessionMailboxConsumers(w http.ResponseWriter, r *http.Request) {
	snapshotter, supported := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxConsumerSnapshot)
	if !supported {
		http.Error(w, "remote workspace adapter mailbox consumer snapshot unavailable", http.StatusServiceUnavailable)
		return
	}
	sessionID, matched := parseRemoteWorkspaceAdapterSessionMailboxConsumersPath(r.URL.Path)
	if !matched {
		http.NotFound(w, r)
		return
	}

	maxConsumers := 50
	if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
		requested, err := strconv.Atoi(raw)
		if err != nil || requested <= 0 {
			http.Error(w, "invalid remote workspace adapter mailbox consumer snapshot limit", http.StatusBadRequest)
			return
		}
		maxConsumers = requested
	}

	consumers, err := snapshotter.SnapshotAdapterSessionMailboxConsumers(sessionID, maxConsumers, time.Now().UTC())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Best effort: nothing useful can be done if the client went away.
	_ = json.NewEncoder(w).Encode(consumers)
}
// handleRemoteWorkspaceAdapterSessionMailboxPreflight writes the sink's
// preflight snapshot for a consumer resume as JSON.
//
// It answers 503 when the sink does not implement
// RemoteWorkspaceFrameSinkSessionMailboxPreflight and 404 when the path is
// not a mailbox-preflight path. consumer_id is mandatory and must be a valid
// consumer ID; resume_from defaults to "checkpoint" and otherwise must be
// "ack" or "checkpoint" (case-insensitive); limit defaults to 50 and must be
// a positive integer. Validation failures and sink errors are 400s.
func (s Server) handleRemoteWorkspaceAdapterSessionMailboxPreflight(w http.ResponseWriter, r *http.Request) {
	preflighter, supported := s.RemoteWorkspaceFrameSink.(RemoteWorkspaceFrameSinkSessionMailboxPreflight)
	if !supported {
		http.Error(w, "remote workspace adapter mailbox preflight unavailable", http.StatusServiceUnavailable)
		return
	}
	sessionID, matched := parseRemoteWorkspaceAdapterSessionMailboxPreflightPath(r.URL.Path)
	if !matched {
		http.NotFound(w, r)
		return
	}

	query := r.URL.Query()

	consumerID := strings.TrimSpace(query.Get("consumer_id"))
	switch {
	case consumerID == "":
		http.Error(w, "remote workspace adapter mailbox consumer required for preflight", http.StatusBadRequest)
		return
	case !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID):
		http.Error(w, "invalid remote workspace adapter mailbox consumer", http.StatusBadRequest)
		return
	}

	cursor := strings.TrimSpace(strings.ToLower(query.Get("resume_from")))
	switch cursor {
	case "":
		cursor = "checkpoint"
	case "ack", "checkpoint":
	default:
		http.Error(w, "invalid remote workspace adapter mailbox resume cursor", http.StatusBadRequest)
		return
	}

	maxEntries := 50
	if raw := strings.TrimSpace(query.Get("limit")); raw != "" {
		requested, err := strconv.Atoi(raw)
		if err != nil || requested <= 0 {
			http.Error(w, "invalid remote workspace adapter mailbox preflight limit", http.StatusBadRequest)
			return
		}
		maxEntries = requested
	}

	preflight, err := preflighter.PreflightAdapterSessionMailboxConsumerResume(sessionID, consumerID, cursor, maxEntries, time.Now().UTC())
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Best effort: nothing useful can be done if the client went away.
	_ = json.NewEncoder(w).Encode(preflight)
}
// readRemoteWorkspaceAdapterSessionMailbox reads a session mailbox, optionally
// long-polling until it has something to return.
//
// With waitMs <= 0 the mailbox is read exactly once. Otherwise the mailbox is
// re-read roughly every 25ms until a read reports ReturnedCount > 0, the wait
// budget of waitMs milliseconds is used up, or ctx is done. On a successful
// long-poll the snapshot's Waited and WaitMs fields are filled in; when the
// budget runs out WaitTimeout is set as well. Read errors are returned as-is
// together with whatever snapshot the reader produced; cancellation returns a
// zero snapshot and ctx.Err().
//
// The timestamp handed to the reader stays in UTC, but the deadline itself is
// deliberately kept as a plain time.Now() value: Time.UTC strips the
// monotonic clock reading, and without it a wall-clock step (for example an
// NTP correction) would stretch or shorten the wait.
func (s Server) readRemoteWorkspaceAdapterSessionMailbox(ctx context.Context, reader RemoteWorkspaceFrameSinkSessionMailbox, sessionID string, drain bool, limit int, afterSequence int64, waitMs int) (RemoteWorkspaceAdapterMailboxSnapshot, error) {
	if waitMs <= 0 {
		return reader.ReadAdapterSessionMailbox(sessionID, drain, limit, afterSequence, time.Now().UTC())
	}

	const pollInterval = 25 * time.Millisecond

	deadline := time.Now().Add(time.Duration(waitMs) * time.Millisecond)
	waited := false
	for {
		mailbox, err := reader.ReadAdapterSessionMailbox(sessionID, drain, limit, afterSequence, time.Now().UTC())
		if err != nil {
			return mailbox, err
		}
		if mailbox.ReturnedCount > 0 {
			mailbox.Waited = waited
			mailbox.WaitMs = waitMs
			return mailbox, nil
		}

		// One clock read decides both "timed out?" and "how long to sleep",
		// so the sleep can never be computed from a later, already-expired
		// instant.
		remaining := time.Until(deadline)
		if remaining <= 0 {
			mailbox.Waited = true
			mailbox.WaitTimeout = true
			mailbox.WaitMs = waitMs
			return mailbox, nil
		}
		waited = true

		sleepFor := pollInterval
		if remaining < sleepFor {
			sleepFor = remaining
		}
		timer := time.NewTimer(sleepFor)
		select {
		case <-ctx.Done():
			timer.Stop()
			return RemoteWorkspaceAdapterMailboxSnapshot{}, ctx.Err()
		case <-timer.C:
		}
	}
}
// isRemoteWorkspaceAdapterSessionListPath reports whether path is exactly the
// adapter-session collection path, with or without a trailing slash.
func isRemoteWorkspaceAdapterSessionListPath(path string) bool {
	switch path {
	case "/mesh/v1/remote-workspace/adapter-sessions",
		"/mesh/v1/remote-workspace/adapter-sessions/":
		return true
	}
	return false
}
// parseRemoteWorkspaceAdapterSessionMailboxPath extracts the adapter session
// ID from a path of the form
// /mesh/v1/remote-workspace/adapter-sessions/{sessionID}/mailbox.
//
// It returns ("", false) when the path does not have that shape, or when the
// session segment is empty, whitespace-only or contains a slash. Slashes
// surrounding the session segment are tolerated and trimmed.
//
// The "/mailbox" suffix is matched against the part of the path that follows
// the prefix, not against the whole path. The prefix ends with the same slash
// the suffix starts with, so matching both against the full path would accept
// ".../adapter-sessions/mailbox" (no session segment at all) and report
// "mailbox" as the session ID.
func parseRemoteWorkspaceAdapterSessionMailboxPath(path string) (string, bool) {
	const (
		prefix = "/mesh/v1/remote-workspace/adapter-sessions/"
		suffix = "/mailbox"
	)
	if !strings.HasPrefix(path, prefix) {
		return "", false
	}
	rest := strings.TrimPrefix(path, prefix)
	if !strings.HasSuffix(rest, suffix) {
		return "", false
	}
	sessionID := strings.Trim(strings.TrimSuffix(rest, suffix), "/")
	if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") {
		return "", false
	}
	return sessionID, true
}
// parseRemoteWorkspaceAdapterSessionMailboxConsumersPath extracts the adapter
// session ID from a path of the form
// /mesh/v1/remote-workspace/adapter-sessions/{sessionID}/mailbox/consumers.
//
// It returns ("", false) when the path does not have that shape, or when the
// session segment is empty, whitespace-only or contains a slash. Slashes
// surrounding the session segment are tolerated and trimmed.
func parseRemoteWorkspaceAdapterSessionMailboxConsumersPath(path string) (string, bool) {
	const (
		prefix = "/mesh/v1/remote-workspace/adapter-sessions/"
		suffix = "/mailbox/consumers"
	)
	if !strings.HasPrefix(path, prefix) {
		return "", false
	}
	rest := strings.TrimPrefix(path, prefix)
	if !strings.HasSuffix(rest, suffix) {
		return "", false
	}
	candidate := strings.Trim(strings.TrimSuffix(rest, suffix), "/")
	switch {
	case strings.TrimSpace(candidate) == "", strings.Contains(candidate, "/"):
		return "", false
	}
	return candidate, true
}
// parseRemoteWorkspaceAdapterSessionMailboxPreflightPath extracts the adapter
// session ID from a path of the form
// /mesh/v1/remote-workspace/adapter-sessions/{sessionID}/mailbox/preflight.
//
// It returns ("", false) when the path does not have that shape, or when the
// session segment is empty, whitespace-only or contains a slash. Slashes
// surrounding the session segment are tolerated and trimmed.
func parseRemoteWorkspaceAdapterSessionMailboxPreflightPath(path string) (string, bool) {
	const (
		prefix = "/mesh/v1/remote-workspace/adapter-sessions/"
		suffix = "/mailbox/preflight"
	)
	if !strings.HasPrefix(path, prefix) {
		return "", false
	}
	rest := strings.TrimPrefix(path, prefix)
	if !strings.HasSuffix(rest, suffix) {
		return "", false
	}
	candidate := strings.Trim(strings.TrimSuffix(rest, suffix), "/")
	switch {
	case strings.TrimSpace(candidate) == "", strings.Contains(candidate, "/"):
		return "", false
	}
	return candidate, true
}
// isValidRemoteWorkspaceAdapterMailboxConsumerID reports whether consumerID
// is 1 to 64 bytes long and consists solely of ASCII letters, ASCII digits
// and the punctuation characters '.', '_', ':' and '-'.
func isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID string) bool {
	if n := len(consumerID); n == 0 || n > 64 {
		return false
	}
	// Scanning bytes is sufficient: every byte of a non-ASCII rune is >= 0x80
	// and therefore outside all of the accepted ranges.
	for i := 0; i < len(consumerID); i++ {
		switch c := consumerID[i]; {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
		case c == '.', c == '_', c == ':', c == '-':
		default:
			return false
		}
	}
	return true
}
// parseRemoteWorkspaceAdapterSessionControlPath extracts the adapter session
// ID from a path of the form
// /mesh/v1/remote-workspace/adapter-sessions/{sessionID}/control.
//
// It returns ("", false) when the path does not have that shape, or when the
// session segment is empty, whitespace-only or contains a slash. Slashes
// surrounding the session segment are tolerated and trimmed.
//
// The "/control" suffix is matched against the part of the path that follows
// the prefix, not against the whole path. The prefix ends with the same slash
// the suffix starts with, so matching both against the full path would accept
// ".../adapter-sessions/control" (no session segment at all) and report
// "control" as the session ID.
func parseRemoteWorkspaceAdapterSessionControlPath(path string) (string, bool) {
	const (
		prefix = "/mesh/v1/remote-workspace/adapter-sessions/"
		suffix = "/control"
	)
	if !strings.HasPrefix(path, prefix) {
		return "", false
	}
	rest := strings.TrimPrefix(path, prefix)
	if !strings.HasSuffix(rest, suffix) {
		return "", false
	}
	sessionID := strings.Trim(strings.TrimSuffix(rest, suffix), "/")
	if strings.TrimSpace(sessionID) == "" || strings.Contains(sessionID, "/") {
		return "", false
	}
	return sessionID, true
}
// handleVPNPacketIngress serves the plain (non service-channel) VPN client
// packet endpoints. A WebSocket packet path is handed to
// handleVPNPacketWebSocket and an HTTP packet path to handleVPNPacketHTTP;
// both are called with an empty channel ID, no forced backend fallback,
// backend fallback allowed and an empty relay policy. It returns false,
// without writing anything, when the path matches neither form.
func (s Server) handleVPNPacketIngress(w http.ResponseWriter, r *http.Request) bool {
	requestPath := r.URL.Path

	if clusterID, connectionID, isWebSocket := parseVPNClientPacketWebSocketPath(requestPath); isWebSocket {
		s.handleVPNPacketWebSocket(w, r, clusterID, "", connectionID, false, true, "")
		return true
	}
	if clusterID, connectionID, isHTTP := parseVPNClientPacketPath(requestPath); isHTTP {
		return s.handleVPNPacketHTTP(w, r, clusterID, "", connectionID, "", false, true, "")
	}
	return false
}
// handleFabricServiceChannelRemoteWorkspaceIngress serves the remote-workspace
// service-channel ingress endpoint. It returns false, without writing
// anything, when the path is not a remote-workspace service-channel path;
// otherwise it writes a response and returns true.
//
// WebSocket paths are answered with 501. Other requests are first validated
// by validateFabricServiceChannelRequest (which is handed the response
// writer; on failure this handler writes nothing further). Accepted requests
// get the X-RAP-Service-Channel-Accepted-By header and are logged before the
// method is checked; only GET and POST are allowed (405 otherwise).
//
// A POST with a non-blank body must carry a valid frame batch probe (400
// otherwise). When a RemoteWorkspaceFrameSink is configured the probe is
// delivered to it (a sink error is a 503) and the receipt is included in the
// response. Every accepted request is answered with 202 and a JSON
// acknowledgement; GET requests and POSTs with a blank body report
// payload_flow "not_implemented".
func (s Server) handleFabricServiceChannelRemoteWorkspaceIngress(w http.ResponseWriter, r *http.Request) bool {
	clusterID, channelID, resourceID, channelClass, isWebSocket, matched := parseFabricServiceChannelRemoteWorkspacePath(r.URL.Path)
	if !matched {
		return false
	}
	if isWebSocket {
		http.Error(w, "remote workspace service-channel websocket forwarding is not implemented", http.StatusNotImplemented)
		return true
	}
	decision, accepted := s.validateFabricServiceChannelRequest(w, r, clusterID, channelID, resourceID, FabricServiceClassRemoteWorkspace, channelClass)
	if !accepted {
		return true
	}
	w.Header().Set("X-RAP-Service-Channel-Accepted-By", decision.AcceptedBy)
	s.logFabricServiceChannelAccess(r, clusterID, channelID, resourceID, decision)

	// Fields shared by both acknowledgement shapes; the probe branch below
	// overrides payload_flow and adds its own keys.
	ack := map[string]any{
		"schema_version": "rap.remote_workspace_service_channel_ingress_probe.v1",
		"accepted":       true,
		"service_class":  FabricServiceClassRemoteWorkspace,
		"channel_class":  decision.ChannelClass,
		"channel_id":     channelID,
		"resource_id":    resourceID,
		"data_plane":     "validated",
		"payload_flow":   "not_implemented",
	}

	switch r.Method {
	case http.MethodGet:
		// Nothing to read; acknowledged below.
	case http.MethodPost:
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, MaxProductionEnvelopePayloadBytes))
		if err != nil {
			http.Error(w, "invalid remote workspace probe payload", http.StatusBadRequest)
			return true
		}
		if len(bytes.TrimSpace(body)) == 0 {
			// A blank body is acknowledged exactly like a GET.
			break
		}
		probe, err := validateRemoteWorkspaceFrameBatchProbe(body, decision.ChannelClass)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return true
		}
		adapterSessionID := remoteWorkspaceAdapterSessionID(clusterID, channelID, resourceID, decision.PreferredRouteID)
		ack["adapter_session_id"] = adapterSessionID
		ack["payload_flow"] = "validated_probe_only"
		ack["frame_batch_schema"] = probe.SchemaVersion
		ack["frame_count"] = len(probe.Frames)
		ack["adapter_contract_id"] = probe.AdapterContractID

		if s.RemoteWorkspaceFrameSink != nil {
			receipt, err := s.RemoteWorkspaceFrameSink.AcceptRemoteWorkspaceFrameBatchProbe(r.Context(), RemoteWorkspaceFrameBatchDelivery{
				ClusterID:         clusterID,
				ChannelID:         channelID,
				ResourceID:        resourceID,
				AdapterSessionID:  adapterSessionID,
				ServiceClass:      FabricServiceClassRemoteWorkspace,
				ChannelClass:      decision.ChannelClass,
				AdapterContractID: probe.AdapterContractID,
				Frames:            probe.Frames,
				AcceptedBy:        decision.AcceptedBy,
				PreferredRouteID:  decision.PreferredRouteID,
			})
			if err != nil {
				http.Error(w, err.Error(), http.StatusServiceUnavailable)
				return true
			}
			ack["adapter_delivery"] = &receipt
			ack["payload_flow"] = "delivered_probe_only"
		}
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return true
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	// Best effort: the 202 status is already on the wire.
	_ = json.NewEncoder(w).Encode(ack)
	return true
}
// remoteWorkspaceFrameBatchProbe is the JSON body of a remote-workspace frame
// batch probe as decoded and checked by
// validateRemoteWorkspaceFrameBatchProbe.
type remoteWorkspaceFrameBatchProbe struct {
// SchemaVersion must be "rap.remote_workspace_frame_batch.v1".
SchemaVersion string `json:"schema_version"`
// ProbeOnly must be true; validation rejects anything else.
ProbeOnly bool `json:"probe_only"`
ServiceClass string `json:"service_class"`
ChannelClass string `json:"channel_class"`
AdapterContractID string `json:"adapter_contract_id,omitempty"`
// Frames must hold between 1 and 32 records.
Frames []RemoteWorkspaceFrameProbeRecord `json:"frames"`
}
// RemoteWorkspaceFrameProbeRecord describes a single frame inside a frame
// batch probe. The record carries a payload length and encoding name but no
// payload field.
type RemoteWorkspaceFrameProbeRecord struct {
// Channel and Direction are checked (after trimming and lower-casing) against
// the allowed adapter frame channels and their permitted directions.
Channel string `json:"channel"`
Direction string `json:"direction"`
// PayloadEncoding, when set, must be "none" or "base64" (case-insensitive).
PayloadEncoding string `json:"payload_encoding,omitempty"`
// PayloadLength must lie in [0, MaxProductionEnvelopePayloadBytes].
PayloadLength int `json:"payload_length,omitempty"`
Droppable bool `json:"droppable,omitempty"`
}
// RemoteWorkspaceFrameBatchDelivery is what the service-channel ingress
// handler passes to RemoteWorkspaceFrameSink.AcceptRemoteWorkspaceFrameBatchProbe
// for a validated probe: the identifiers parsed from the request path, the
// derived adapter session ID, the validated frames, and the AcceptedBy,
// ChannelClass and PreferredRouteID values of the service-channel decision.
type RemoteWorkspaceFrameBatchDelivery struct {
ClusterID string `json:"cluster_id"`
ChannelID string `json:"channel_id"`
ResourceID string `json:"resource_id"`
AdapterSessionID string `json:"adapter_session_id,omitempty"`
ServiceClass string `json:"service_class"`
ChannelClass string `json:"channel_class"`
AdapterContractID string `json:"adapter_contract_id,omitempty"`
Frames []RemoteWorkspaceFrameProbeRecord `json:"frames"`
AcceptedBy string `json:"accepted_by,omitempty"`
PreferredRouteID string `json:"preferred_route_id,omitempty"`
}
// RemoteWorkspaceFrameBatchDeliveryReceipt is returned by a
// RemoteWorkspaceFrameSink for an accepted frame batch probe. The ingress
// handler embeds it unchanged under the "adapter_delivery" key of its JSON
// response. All fields are populated by the sink implementation, which is not
// part of this file section.
type RemoteWorkspaceFrameBatchDeliveryReceipt struct {
SchemaVersion string `json:"schema_version"`
Sink string `json:"sink"`
Accepted bool `json:"accepted"`
ProbeOnly bool `json:"probe_only"`
ClusterID string `json:"cluster_id,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ServiceClass string `json:"service_class"`
ChannelClass string `json:"channel_class"`
AdapterContractID string `json:"adapter_contract_id,omitempty"`
AdapterSessionID string `json:"adapter_session_id,omitempty"`
AdapterRuntimeID string `json:"adapter_runtime_id,omitempty"`
SessionState string `json:"session_state,omitempty"`
// The session timestamps are carried as strings.
// NOTE(review): the timestamp format is not visible here — confirm with the sink.
SessionCreatedAt string `json:"session_created_at,omitempty"`
SessionBoundAt string `json:"session_bound_at,omitempty"`
SessionLastActive string `json:"session_last_activity_at,omitempty"`
SessionLifecycle string `json:"session_lifecycle_state,omitempty"`
SessionDeliveries int64 `json:"session_delivery_count,omitempty"`
SessionPressure int64 `json:"session_backpressure_count,omitempty"`
MailboxDepth int `json:"mailbox_depth,omitempty"`
MailboxEnqueued int64 `json:"mailbox_enqueued_total,omitempty"`
FrameCount int `json:"frame_count"`
QueueCapacity int `json:"queue_capacity"`
QueueDepth int `json:"queue_depth"`
AcceptedFrames int `json:"accepted_frames"`
DroppedFrames int `json:"dropped_frames"`
AckedFrames int `json:"acked_frames"`
Backpressure bool `json:"backpressure"`
DropPolicy string `json:"drop_policy,omitempty"`
DeliverySequence int64 `json:"delivery_sequence"`
DeliveredAt string `json:"delivered_at"`
}
// remoteWorkspaceAdapterSessionID derives a deterministic adapter session ID
// from the cluster, channel, resource and preferred-route identifiers.
//
// Each identifier is whitespace-trimmed, the four values are joined with NUL
// bytes and hashed with SHA-256; the ID is "rap-rw-adapter-session-" followed
// by the first 24 hex characters of the digest.
func remoteWorkspaceAdapterSessionID(clusterID string, channelID string, resourceID string, preferredRouteID string) string {
	parts := []string{clusterID, channelID, resourceID, preferredRouteID}
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	digest := sha256.Sum256([]byte(strings.Join(parts, "\x00")))
	// 12 digest bytes encode to exactly 24 hex characters.
	return "rap-rw-adapter-session-" + hex.EncodeToString(digest[:12])
}
// validateRemoteWorkspaceFrameBatchProbe decodes payload as a frame batch
// probe and checks it against the rules of the probe endpoint.
//
// A probe is accepted only if it is valid JSON, uses schema
// "rap.remote_workspace_frame_batch.v1", has probe_only set, names the
// remote-workspace service class and the required channel class (both
// compared after trimming and lower-casing), and carries 1 to 32 frames.
// Each frame must use an allowed channel with a direction permitted for that
// channel, an empty, "none" or "base64" payload encoding, and a payload
// length within [0, MaxProductionEnvelopePayloadBytes].
//
// The decoded probe is returned alongside any error, exactly as decoded:
// normalization is applied for the checks only, not to the returned value.
func validateRemoteWorkspaceFrameBatchProbe(payload []byte, requiredChannelClass string) (remoteWorkspaceFrameBatchProbe, error) {
	normalize := func(value string) string {
		return strings.TrimSpace(strings.ToLower(value))
	}

	var probe remoteWorkspaceFrameBatchProbe
	if err := json.Unmarshal(payload, &probe); err != nil {
		return probe, fmt.Errorf("invalid remote workspace frame batch probe")
	}

	switch {
	case probe.SchemaVersion != "rap.remote_workspace_frame_batch.v1":
		return probe, fmt.Errorf("unsupported remote workspace frame batch schema")
	case !probe.ProbeOnly:
		return probe, fmt.Errorf("remote workspace payload forwarding is not implemented")
	case normalize(probe.ServiceClass) != FabricServiceClassRemoteWorkspace:
		return probe, fmt.Errorf("remote workspace frame batch service class mismatch")
	case normalize(probe.ChannelClass) != normalize(requiredChannelClass):
		return probe, fmt.Errorf("remote workspace frame batch channel class mismatch")
	}

	if count := len(probe.Frames); count < 1 || count > 32 {
		return probe, fmt.Errorf("remote workspace frame batch probe must contain 1..32 frames")
	}

	for i := range probe.Frames {
		frame := &probe.Frames[i]
		channel := normalize(frame.Channel)
		if !isAllowedRemoteWorkspaceAdapterFrameChannel(channel) {
			return probe, fmt.Errorf("unsupported remote workspace adapter frame channel")
		}
		if !isAllowedRemoteWorkspaceAdapterFrameDirection(channel, normalize(frame.Direction)) {
			return probe, fmt.Errorf("unsupported remote workspace adapter frame direction")
		}
		switch normalize(frame.PayloadEncoding) {
		case "", "none", "base64":
		default:
			return probe, fmt.Errorf("unsupported remote workspace frame payload encoding")
		}
		if frame.PayloadLength < 0 || frame.PayloadLength > MaxProductionEnvelopePayloadBytes {
			return probe, fmt.Errorf("remote workspace frame payload length out of bounds")
		}
	}
	return probe, nil
}
// isAllowedRemoteWorkspaceAdapterFrameChannel reports whether channel is one
// of the adapter frame channels accepted in a probe. The comparison is exact;
// callers are expected to normalize the value first.
func isAllowedRemoteWorkspaceAdapterFrameChannel(channel string) bool {
	allowed := [...]string{
		"input",
		"control",
		"display",
		"cursor",
		"clipboard",
		"file_transfer",
		"audio",
		"device",
		"telemetry",
	}
	for _, name := range allowed {
		if channel == name {
			return true
		}
	}
	return false
}
// isAllowedRemoteWorkspaceAdapterFrameDirection reports whether direction is
// permitted for the given adapter frame channel:
//
//   - "input" only flows client_to_adapter;
//   - "display", "cursor", "audio" and "telemetry" only flow adapter_to_client;
//   - "control", "clipboard", "file_transfer" and "device" accept
//     client_to_adapter, adapter_to_client and bidirectional.
//
// Unknown channels permit no direction. Comparisons are exact.
func isAllowedRemoteWorkspaceAdapterFrameDirection(channel string, direction string) bool {
	towardAdapter := direction == "client_to_adapter"
	towardClient := direction == "adapter_to_client"

	switch channel {
	case "input":
		return towardAdapter
	case "display", "cursor", "audio", "telemetry":
		return towardClient
	case "control", "clipboard", "file_transfer", "device":
		return towardAdapter || towardClient || direction == "bidirectional"
	}
	return false
}
// handleFabricServiceChannelVPNPacketIngress serves the service-channel VPN
// packet endpoints. It returns false, without writing anything, when the path
// is neither the WebSocket nor the HTTP service-channel packet path.
//
// In both cases the request is validated through
// validateFabricServiceChannelVPNRequest (which is handed the response
// writer; on failure nothing further is written and true is returned), the
// access is logged and the decision's preferred route is forwarded to the
// ingress. WebSocket requests continue in handleVPNPacketWebSocket. HTTP
// requests additionally get the X-RAP-Service-Channel-Accepted-By header and
// continue in handleVPNPacketHTTP with the plain client-packets path of the
// same VPN connection as backend fallback path.
func (s Server) handleFabricServiceChannelVPNPacketIngress(w http.ResponseWriter, r *http.Request) bool {
	requestPath := r.URL.Path

	if clusterID, channelID, connectionID, isWebSocket := parseFabricServiceChannelVPNPacketWebSocketPath(requestPath); isWebSocket {
		decision, accepted := s.validateFabricServiceChannelVPNRequest(w, r, clusterID, channelID, connectionID)
		if accepted {
			s.logFabricServiceChannelAccess(r, clusterID, channelID, connectionID, decision)
			s.preferVPNPacketIngressRoute(decision.PreferredRouteID)
			s.handleVPNPacketWebSocket(w, r, clusterID, channelID, connectionID, decision.ForceBackendFallback, decision.BackendFallbackAllowed(), decision.BackendRelayPolicy)
		}
		return true
	}

	clusterID, channelID, connectionID, isHTTP := parseFabricServiceChannelVPNPacketPath(requestPath)
	if !isHTTP {
		return false
	}
	decision, accepted := s.validateFabricServiceChannelVPNRequest(w, r, clusterID, channelID, connectionID)
	if !accepted {
		return true
	}
	w.Header().Set("X-RAP-Service-Channel-Accepted-By", decision.AcceptedBy)
	s.logFabricServiceChannelAccess(r, clusterID, channelID, connectionID, decision)
	s.preferVPNPacketIngressRoute(decision.PreferredRouteID)

	fallbackPath := "/api/v1/clusters/" + clusterID + "/vpn-connections/" + connectionID + "/tunnel/client/packets"
	return s.handleVPNPacketHTTP(w, r, clusterID, channelID, connectionID, fallbackPath, decision.ForceBackendFallback, decision.BackendFallbackAllowed(), decision.BackendRelayPolicy)
}
// preferVPNPacketIngressRoute forwards a preferred route ID to the VPN packet
// ingress. It does nothing when the trimmed route ID is empty, when no
// ingress is configured, or when the ingress does not implement
// VPNPacketIngressRoutePreference.
func (s Server) preferVPNPacketIngressRoute(routeID string) {
	trimmed := strings.TrimSpace(routeID)
	if trimmed == "" {
		return
	}
	// The comma-ok assertion also covers a nil ingress: asserting on a nil
	// interface value simply yields ok == false.
	preference, supported := s.VPNPacketIngress.(VPNPacketIngressRoutePreference)
	if !supported {
		return
	}
	preference.PreferClientRoute(trimmed)
}
// handleVPNPacketHTTP serves the HTTP (non-WebSocket) VPN client packet
// endpoint. Every branch writes a response and returns true.
//
// POST uploads client packets: the body is either one raw packet or, with the
// query parameter batch=true, an encoded packet batch. GET retrieves packets
// from the ingress. Any other method is answered with 405.
//
// clusterID, channelID and vpnConnectionID identify the connection; channelID
// and backendRelayPolicy are only used here when logging policy violations.
// backendFallbackPath is passed to proxyVPNPacketIngressToBackendPath when a
// request is relayed to the backend. forceBackendFallback skips the ingress
// entirely, and backendFallbackAllowed gates every relay attempt.
func (s Server) handleVPNPacketHTTP(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string, backendFallbackPath string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) bool {
switch r.Method {
case http.MethodPost:
// The upload is capped at MaxProductionVPNPacketPayloadBytes; a read failure
// (including exceeding the cap) is reported as 400.
body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, MaxProductionVPNPacketPayloadBytes))
if err != nil {
http.Error(w, "invalid vpn packet payload", http.StatusBadRequest)
return true
}
if r.URL.Query().Get("batch") != "true" && len(body) == 0 {
http.Error(w, "empty vpn packet payload", http.StatusBadRequest)
return true
}
// Without batch=true the whole body is treated as a single packet.
packets := [][]byte{body}
if r.URL.Query().Get("batch") == "true" {
packets, err = decodeVPNIngressPacketBatch(body)
if err != nil {
http.Error(w, "invalid vpn packet batch", http.StatusBadRequest)
return true
}
}
packets = cleanVPNIngressPacketBatch(packets)
if len(packets) == 0 {
http.Error(w, "empty vpn packet batch", http.StatusBadRequest)
return true
}
if forceBackendFallback {
// Forced fallback: relay the original body to the backend if policy allows
// it and the relay handles the request; otherwise record a violation and
// fail with the route-not-found error.
if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, body, backendFallbackPath) {
return true
}
s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error())
http.Error(w, ErrRouteNotFound.Error(), vpnIngressStatusCode(ErrRouteNotFound))
return true
}
trafficClass := r.Header.Get("X-RAP-Traffic-Class")
var sendErr error
// Prefer the traffic-class aware send when the ingress offers it.
if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
sendErr = classIngress.SendClientPacketBatchWithTrafficClass(r.Context(), clusterID, vpnConnectionID, trafficClass, packets)
} else {
sendErr = s.VPNPacketIngress.SendClientPacketBatch(r.Context(), clusterID, vpnConnectionID, packets)
}
if sendErr != nil {
// A failed send is retried through the backend when policy allows it;
// otherwise the violation is logged and the send error is returned.
if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, body, backendFallbackPath) {
return true
}
s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
http.Error(w, sendErr.Error(), vpnIngressStatusCode(sendErr))
return true
}
w.WriteHeader(http.StatusAccepted)
return true
case http.MethodGet:
if forceBackendFallback {
if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, nil, backendFallbackPath) {
return true
}
// Unlike POST, a blocked forced fallback on GET is answered with 204 rather
// than an error status.
s.logFabricServiceChannelViolation(r, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error())
w.WriteHeader(http.StatusNoContent)
return true
}
timeout := vpnIngressTimeout(r)
packets, err := s.VPNPacketIngress.ReceiveClientPacketBatch(r.Context(), clusterID, vpnConnectionID, timeout)
if err != nil {
// NOTE(review): unlike a failed send on POST, a receive error is returned
// without attempting the backend fallback — confirm this asymmetry is
// intended.
http.Error(w, err.Error(), vpnIngressStatusCode(err))
return true
}
packets = cleanVPNIngressPacketBatch(packets)
if len(packets) == 0 {
// Nothing available from the ingress: let the backend answer if allowed,
// otherwise report 204.
if backendFallbackAllowed && s.proxyVPNPacketIngressToBackendPath(w, r, nil, backendFallbackPath) {
return true
}
w.WriteHeader(http.StatusNoContent)
return true
}
if r.URL.Query().Get("batch") == "true" {
w.Header().Set("Content-Type", "application/vnd.rap.vpn-packet-batch.v1")
_, _ = w.Write(encodeVPNIngressPacketBatch(packets))
return true
}
// NOTE(review): without batch=true only packets[0] is written; any further
// packets returned by ReceiveClientPacketBatch are not sent to the client
// by this handler — confirm the ingress never returns more than one packet
// for non-batch callers.
w.Header().Set("Content-Type", "application/octet-stream")
_, _ = w.Write(packets[0])
return true
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return true
}
}
// handleVPNPacketWebSocket upgrades a GET request to a WebSocket and runs two
// pumps over it: readVPNPacketWebSocket (client to ingress) and
// writeVPNPacketWebSocket (ingress to client). It returns once either pump
// finishes or the request context ends; the connection is closed on return.
//
// Non-GET requests get 405, and a nil s.VPNPacketIngress gets 503 with
// ErrForwardRuntimeUnavailable. The upgrader accepts every Origin.
func (s Server) handleVPNPacketWebSocket(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) {
	switch {
	case r.Method != http.MethodGet:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	case s.VPNPacketIngress == nil:
		http.Error(w, ErrForwardRuntimeUnavailable.Error(), http.StatusServiceUnavailable)
		return
	}

	upgrader := websocket.Upgrader{
		CheckOrigin: func(*http.Request) bool { return true },
	}
	conn, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		// Upgrade has already written the failure response.
		return
	}
	defer conn.Close()
	conn.SetReadLimit(MaxProductionVPNPacketPayloadBytes)

	pumpCtx, stopPumps := context.WithCancel(r.Context())
	defer stopPumps()

	trafficClass := r.Header.Get("X-RAP-Traffic-Class")

	// Buffered for both pumps so neither blocks on send after this function
	// has stopped listening.
	finished := make(chan error, 2)
	go func() {
		finished <- s.readVPNPacketWebSocket(pumpCtx, conn, clusterID, channelID, vpnConnectionID, trafficClass, forceBackendFallback, backendFallbackAllowed, backendRelayPolicy)
	}()
	go func() {
		finished <- s.writeVPNPacketWebSocket(pumpCtx, conn, clusterID, channelID, vpnConnectionID, forceBackendFallback, backendFallbackAllowed, backendRelayPolicy)
	}()

	select {
	case <-pumpCtx.Done():
	case <-finished:
		stopPumps()
	}
}
// readVPNPacketWebSocket is the client-to-ingress pump of a VPN packet
// WebSocket. It loops reading messages from conn until an error ends it.
//
// Per message: non-binary frames and batches that are empty after cleaning are
// skipped; a decode failure ends the pump. With forceBackendFallback the raw
// frame is posted to the backend (or the pump ends with ErrRouteNotFound when
// fallback is not allowed). Otherwise the batch goes to the ingress; if that
// fails and fallback is allowed the raw frame is posted to the backend, and if
// the backend also fails the original send error is returned.
func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, trafficClass string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error {
	for {
		kind, frame, readErr := conn.ReadMessage()
		if readErr != nil {
			return readErr
		}
		if kind != websocket.BinaryMessage {
			continue
		}

		batch, decodeErr := decodeVPNIngressPacketBatch(frame)
		if decodeErr != nil {
			return decodeErr
		}
		if batch = cleanVPNIngressPacketBatch(batch); len(batch) == 0 {
			continue
		}

		switch {
		case forceBackendFallback && !backendFallbackAllowed:
			s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error())
			return ErrRouteNotFound
		case forceBackendFallback:
			if postErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, frame); postErr != nil {
				return postErr
			}
			continue
		}

		// Route errors are only retried when there is no backend to fall back to.
		sendErr := s.sendVPNPacketWebSocketBatch(ctx, clusterID, vpnConnectionID, trafficClass, batch, !backendFallbackAllowed)
		if sendErr == nil {
			continue
		}
		if !backendFallbackAllowed {
			s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
			return sendErr
		}
		if s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, frame) != nil {
			// Report the fabric failure, not the backend one.
			return sendErr
		}
	}
}
// sendVPNPacketWebSocketBatch sends packets to s.VPNPacketIngress, using the
// traffic-class-aware method when the ingress implements
// VPNPacketIngressTrafficClass.
//
// When retryRouteErrors is true and the failure satisfies
// isRetryableVPNPacketIngressError, the send is attempted up to maxAttempts
// times with a growing pause (75ms, 125ms, ...) between attempts. Any other
// error is returned immediately. A cancelled ctx ends the call with ctx.Err().
// After the final attempt the last send error is returned.
func (s Server) sendVPNPacketWebSocketBatch(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte, retryRouteErrors bool) error {
	const maxAttempts = 6
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		var sendErr error
		if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
			sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets)
		} else {
			sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets)
		}
		if sendErr == nil {
			return nil
		}
		lastErr = sendErr
		if !retryRouteErrors || !isRetryableVPNPacketIngressError(sendErr) {
			return sendErr
		}
		if attempt == maxAttempts-1 {
			// No retry follows the final attempt, so waiting out another
			// backoff interval would only delay reporting the failure.
			break
		}
		timer := time.NewTimer(time.Duration(75+attempt*50) * time.Millisecond)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return lastErr
}
// isRetryableVPNPacketIngressError reports whether err matches (via errors.Is)
// one of ErrRouteNotFound, ErrForwardRuntimeUnavailable,
// ErrForwardPeerUnavailable, or ErrSyntheticPeerUnavailable.
func isRetryableVPNPacketIngressError(err error) bool {
	switch {
	case errors.Is(err, ErrRouteNotFound),
		errors.Is(err, ErrForwardRuntimeUnavailable),
		errors.Is(err, ErrForwardPeerUnavailable),
		errors.Is(err, ErrSyntheticPeerUnavailable):
		return true
	default:
		return false
	}
}
// receiveVPNPacketWebSocketBatch pulls a packet batch from s.VPNPacketIngress,
// passing timeout through to ReceiveClientPacketBatch.
//
// When retryRouteErrors is true and the failure satisfies
// isRetryableVPNPacketIngressError, the receive is attempted up to maxAttempts
// times with a growing pause (75ms, 125ms, ...) between attempts; if every
// attempt fails that way the result is (nil, nil), i.e. "no packets" rather
// than an error. Any other error is returned immediately. A cancelled ctx ends
// the call with ctx.Err().
func (s Server) receiveVPNPacketWebSocketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration, retryRouteErrors bool) ([][]byte, error) {
	const maxAttempts = 4
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		packets, err := s.VPNPacketIngress.ReceiveClientPacketBatch(ctx, clusterID, vpnConnectionID, timeout)
		if err == nil {
			return packets, nil
		}
		lastErr = err
		if !retryRouteErrors || !isRetryableVPNPacketIngressError(err) {
			return nil, err
		}
		if attempt == maxAttempts-1 {
			// No retry follows the final attempt, so there is nothing to
			// wait for; fall through to the exhausted-retries result.
			break
		}
		timer := time.NewTimer(time.Duration(75+attempt*50) * time.Millisecond)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}
	}
	// Exhausted retries on a retryable route error are reported as an empty
	// result so the caller keeps the connection open.
	if retryRouteErrors && isRetryableVPNPacketIngressError(lastErr) {
		return nil, nil
	}
	return nil, lastErr
}
// writeVPNPacketWebSocket is the ingress-to-client pump of a VPN packet
// WebSocket. Each iteration it gathers packets and writes them to conn as one
// binary batch message; when there is nothing to write it sends a ping every
// 15 seconds. It returns when ctx is cancelled, a write fails, or policy
// forbids the only available source.
//
// Packet sources per iteration:
//   - unless forceBackendFallback, the fabric ingress (route errors are retried
//     only when backend fallback is not allowed);
//   - if backend fallback is allowed and the fabric was skipped, failed, or
//     returned nothing, the backend. A backend failure ends the pump only when
//     the fabric receive failed as well, and then the fabric error is returned.
//
// With forceBackendFallback the backend is the only source, so after a failed
// backend poll the loop pauses briefly (honouring ctx) instead of retrying
// immediately; without the pause an unreachable backend turns this loop into
// a hot spin of failed requests.
func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error {
	const backendRetryPause = 50 * time.Millisecond
	lastPing := time.Now()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		var packets [][]byte
		var err error
		if !forceBackendFallback {
			packets, err = s.receiveVPNPacketWebSocketBatch(ctx, clusterID, vpnConnectionID, 50*time.Millisecond, !backendFallbackAllowed)
		}
		if forceBackendFallback && !backendFallbackAllowed {
			s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error())
			return ErrRouteNotFound
		}
		if err != nil && !backendFallbackAllowed {
			s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_receive_failed_backend_fallback_blocked", err.Error())
			return err
		}
		pauseBeforeRetry := false
		if backendFallbackAllowed && (forceBackendFallback || err != nil || len(packets) == 0) {
			backendPackets, proxyErr := s.backendVPNPacketGet(ctx, clusterID, vpnConnectionID, 50*time.Millisecond)
			if proxyErr != nil && err != nil {
				// Both sources failed: report the fabric error.
				return err
			}
			// Only throttle when the fabric receive (which has its own
			// timeout) was skipped; otherwise a pause would just add latency
			// to fabric packets.
			pauseBeforeRetry = forceBackendFallback && proxyErr != nil
			if len(backendPackets) > 0 {
				packets = backendPackets
			}
		}
		if len(packets) > 0 {
			if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
				return err
			}
			if err := conn.WriteMessage(websocket.BinaryMessage, encodeVPNIngressPacketBatch(packets)); err != nil {
				return err
			}
			continue
		}
		if time.Since(lastPing) >= 15*time.Second {
			if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
				return err
			}
			if err := conn.WriteMessage(websocket.PingMessage, []byte("rap-vpn")); err != nil {
				return err
			}
			lastPing = time.Now()
		}
		if pauseBeforeRetry {
			timer := time.NewTimer(backendRetryPause)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
	}
}
// backendVPNPacketPost posts an already-encoded packet batch to the backend's
// client packet endpoint (with ?batch=true) under s.BackendProxyBaseURL.
//
// It returns ErrRouteNotFound when no backend base URL is configured, the
// transport error when the request fails, and an error carrying the status
// code for any non-2xx response. The request is bound to ctx.
func (s Server) backendVPNPacketPost(ctx context.Context, clusterID string, vpnConnectionID string, batchPayload []byte) error {
	target := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/")
	if target == "" {
		return ErrRouteNotFound
	}
	// Escape the identifiers so characters such as '?', '#' or '%' inside an
	// ID cannot change which backend path or query is addressed.
	endpoint := target + "/clusters/" + url.PathEscape(clusterID) + "/vpn-connections/" + url.PathEscape(vpnConnectionID) + "/tunnel/client/packets?batch=true"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(batchPayload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/octet-stream")
	req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID)
	req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain a bounded amount of the body so the transport can reuse the
	// connection; the content itself is not needed, so a read error is ignored.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64*1024))
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("backend vpn packet post failed: status=%d", resp.StatusCode)
	}
	return nil
}
// backendVPNPacketGet polls the backend's client packet endpoint (with
// ?batch=true and timeout_ms) under s.BackendProxyBaseURL and decodes the
// returned batch.
//
// A non-positive timeout is replaced by 50ms. The result is (nil, nil) for a
// 204 response or an empty body, ErrRouteNotFound when no backend base URL is
// configured, and an error carrying the status code for any other non-2xx
// response. The body read is capped at MaxProductionVPNPacketPayloadBytes.
func (s Server) backendVPNPacketGet(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error) {
	target := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/")
	if target == "" {
		return nil, ErrRouteNotFound
	}
	if timeout <= 0 {
		timeout = 50 * time.Millisecond
	}
	// Escape the identifiers so characters such as '?', '#' or '%' inside an
	// ID cannot change which backend path or query is addressed.
	endpoint := target + "/clusters/" + url.PathEscape(clusterID) + "/vpn-connections/" + url.PathEscape(vpnConnectionID) + "/tunnel/client/packets?batch=true&timeout_ms=" + strconv.FormatInt(timeout.Milliseconds(), 10)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "application/vnd.rap.vpn-packet-batch.v1")
	req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID)
	req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNoContent {
		return nil, nil
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Drain a bounded amount of the error body so the transport can reuse
		// the connection; a read error here is irrelevant.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64*1024))
		return nil, fmt.Errorf("backend vpn packet get failed: status=%d", resp.StatusCode)
	}
	body, err := io.ReadAll(io.LimitReader(resp.Body, MaxProductionVPNPacketPayloadBytes))
	if err != nil {
		return nil, err
	}
	if len(body) == 0 {
		return nil, nil
	}
	return decodeVPNIngressPacketBatch(body)
}
// proxyVPNPacketIngressToBackend proxies the request to the backend without a
// path override, so the incoming request URI is reused as-is. It reports
// whether a backend response was written to w.
func (s Server) proxyVPNPacketIngressToBackend(w http.ResponseWriter, r *http.Request, body []byte) bool {
	const noPathOverride = ""
	proxied := s.proxyVPNPacketIngressToBackendPath(w, r, body, noPathOverride)
	return proxied
}
// proxyVPNPacketIngressToBackendPath replays the incoming request against the
// backend and copies the backend's status, Content-Type and body to w.
//
// The upstream URL is built from the scheme and host of s.BackendProxyBaseURL
// plus either the original request URI or, when backendPath is non-empty,
// backendPath with the original raw query appended. Only the Accept and
// Content-Type request headers are forwarded, alongside the entry node and
// cluster headers.
//
// It returns false, having written nothing to w, when no usable backend URL is
// configured, when the backend host equals the request host (case-insensitive),
// or when the upstream request cannot be built or sent. It returns true once a
// backend response has been relayed.
func (s Server) proxyVPNPacketIngressToBackendPath(w http.ResponseWriter, r *http.Request, body []byte, backendPath string) bool {
	if strings.TrimSpace(s.BackendProxyBaseURL) == "" {
		return false
	}
	backend, parseErr := url.Parse(s.BackendProxyBaseURL)
	if parseErr != nil || backend.Scheme == "" || backend.Host == "" {
		return false
	}
	if strings.EqualFold(backend.Host, r.Host) {
		return false
	}

	upstreamURI := r.URL.RequestURI()
	if backendPath != "" {
		upstreamURI = backendPath
		if query := r.URL.RawQuery; query != "" {
			upstreamURI = backendPath + "?" + query
		}
	}

	// Keep the reader a true nil interface when there is no body.
	var payload io.Reader
	if body != nil {
		payload = bytes.NewReader(body)
	}

	upstreamReq, buildErr := http.NewRequestWithContext(r.Context(), r.Method, backend.Scheme+"://"+backend.Host+upstreamURI, payload)
	if buildErr != nil {
		return false
	}
	if accept := r.Header.Get("Accept"); accept != "" {
		upstreamReq.Header.Set("Accept", accept)
	}
	if contentType := r.Header.Get("Content-Type"); contentType != "" {
		upstreamReq.Header.Set("Content-Type", contentType)
	}
	upstreamReq.Header.Set("X-RAP-Entry-Node", s.Local.NodeID)
	upstreamReq.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID)

	upstreamResp, sendErr := http.DefaultClient.Do(upstreamReq)
	if sendErr != nil {
		return false
	}
	defer upstreamResp.Body.Close()

	if contentType := upstreamResp.Header.Get("Content-Type"); contentType != "" {
		w.Header().Set("Content-Type", contentType)
	}
	w.WriteHeader(upstreamResp.StatusCode)
	_, _ = io.Copy(w, upstreamResp.Body)
	return true
}
// fabricServiceChannelLeaseAuthorityPayload is the lease description a request
// is authorised against. It is either unmarshalled from the signed authority
// payload header (verifyFabricServiceChannelLeaseAuthority) or synthesised
// from a backend introspection response (introspectFabricServiceChannelLease).
type fabricServiceChannelLeaseAuthorityPayload struct {
// SchemaVersion distinguishes signed payloads
// ("rap.fabric_service_channel_lease_authority.v1") from introspection-derived
// ones ("rap.fabric_service_channel_introspection.v1").
SchemaVersion string `json:"schema_version"`
ChannelID string `json:"channel_id"`
ClusterID string `json:"cluster_id"`
ResourceID string `json:"resource_id,omitempty"`
ServiceClass string `json:"service_class"`
// Status "degraded_fallback" forces backend fallback in
// validateFabricServiceChannelRequest.
Status string `json:"status"`
// SelectedEntryNodeID, when set, must match the local node ID.
SelectedEntryNodeID string `json:"selected_entry_node_id"`
SelectedExitNodeID string `json:"selected_exit_node_id"`
// AllowedChannels must contain the request's channel class for signed payloads.
AllowedChannels []string `json:"allowed_channels"`
RouteGeneration string `json:"route_generation"`
FencingEpoch int64 `json:"fencing_epoch"`
// TokenHash is compared with fabricServiceChannelTokenHash of the presented token.
TokenHash string `json:"token_hash"`
IssuedAt time.Time `json:"issued_at"`
// ExpiresAt, when non-zero, must be in the future.
ExpiresAt time.Time `json:"expires_at"`
DataPlane fabricServiceChannelDataPlaneContract `json:"data_plane,omitempty"`
// PrimaryRoute.Status "missing_route_intent" also forces backend fallback;
// PrimaryRoute.RouteID becomes the preferred route otherwise.
PrimaryRoute struct {
RouteID string `json:"route_id"`
Status string `json:"status"`
} `json:"primary_route"`
}
// fabricServiceChannelDataPlaneContract is the data-plane section of a service
// channel lease. An empty SchemaVersion means "no contract supplied"; a
// non-empty one is checked by validateFabricServiceChannelDataPlaneContract,
// which constrains SchemaVersion, Mode, WorkingDataTransport,
// SteadyStateTransport, BackendRelayPolicy, ServiceNeutral, ProtocolAgnostic,
// LogicalFlowMode and RequiredFlowIsolationClasses. The remaining fields are
// carried but not inspected in this part of the file.
type fabricServiceChannelDataPlaneContract struct {
SchemaVersion string `json:"schema_version,omitempty"`
// Mode "degraded_backend_fallback" forces backend fallback for the request.
Mode string `json:"mode,omitempty"`
ControlPlaneTransport string `json:"control_plane_transport,omitempty"`
WorkingDataTransport string `json:"working_data_transport,omitempty"`
SteadyStateTransport string `json:"steady_state_transport,omitempty"`
// BackendRelayPolicy "disabled" turns off backend fallback
// (see fabricServiceChannelRequestDecision.BackendFallbackAllowed).
BackendRelayPolicy string `json:"backend_relay_policy,omitempty"`
ProductionForwardingRequired bool `json:"production_forwarding_required,omitempty"`
ServiceNeutral bool `json:"service_neutral,omitempty"`
ProtocolAgnostic bool `json:"protocol_agnostic,omitempty"`
LogicalFlowMode string `json:"logical_flow_mode,omitempty"`
// RequiredFlowIsolationClasses, when non-empty, must include the request's
// channel class.
RequiredFlowIsolationClasses []string `json:"required_flow_isolation_classes,omitempty"`
RouteSelectionStrategy string `json:"route_selection_strategy,omitempty"`
EntryFailoverMode string `json:"entry_failover_mode,omitempty"`
ExitFailoverMode string `json:"exit_failover_mode,omitempty"`
RouteRebuildMode string `json:"route_rebuild_mode,omitempty"`
FailureDetectionSource string `json:"failure_detection_source,omitempty"`
DegradedFallbackVisibility string `json:"degraded_fallback_visibility,omitempty"`
StableContractForServiceClass string `json:"stable_contract_for_service_class,omitempty"`
}
// fabricServiceChannelRequestDecision is the outcome of
// validateFabricServiceChannelRequest for an accepted request.
type fabricServiceChannelRequestDecision struct {
// ForceBackendFallback is set when the lease or its data plane is in a
// degraded/fallback state.
ForceBackendFallback bool
// PreferredRouteID is the lease's primary route ID; left empty when
// ForceBackendFallback is set.
PreferredRouteID string
// AcceptedBy is "legacy_unsigned", "introspection" or "signed".
AcceptedBy string
ServiceClass string
ChannelClass string
DataPlane fabricServiceChannelDataPlaneContract
// DataPlaneValid is true when the lease carried a data-plane contract with a
// non-empty schema version.
DataPlaneValid bool
DataPlaneMode string
// BackendRelayPolicy is the trimmed policy from the data-plane contract.
BackendRelayPolicy string
}
// BackendFallbackAllowed reports whether traffic may be relayed through the
// backend: true unless the trimmed BackendRelayPolicy is exactly "disabled"
// (an empty policy therefore allows fallback).
func (d fabricServiceChannelRequestDecision) BackendFallbackAllowed() bool {
	policy := strings.TrimSpace(d.BackendRelayPolicy)
	if policy == "disabled" {
		return false
	}
	return true
}
// validateFabricServiceChannelVPNRequest validates a VPN packet request on a
// fabric service channel. It delegates to validateFabricServiceChannelRequest
// with the VPN connection ID as the resource, FabricServiceClassVPNPackets as
// the expected service class and ProductionChannelVPNPacket as the default
// channel class.
func (s Server) validateFabricServiceChannelVPNRequest(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, vpnConnectionID string) (fabricServiceChannelRequestDecision, bool) {
	decision, accepted := s.validateFabricServiceChannelRequest(
		w, r,
		clusterID, channelID, vpnConnectionID,
		FabricServiceClassVPNPackets,
		ProductionChannelVPNPacket,
	)
	return decision, accepted
}
// validateFabricServiceChannelRequest authorises a request against a fabric
// service channel and derives how it should be routed.
//
// On rejection it writes the error response to w and returns false:
//   - 400 when clusterID or channelID is blank;
//   - 403 when the X-RAP-Fabric-Channel-ID header disagrees with channelID, the
//     service class differs from expectedServiceClass, the channel class is not
//     allowed for the service class, the lease authority check fails, or the
//     lease's data-plane contract is rejected;
//   - 401 when the presented token does not start with "rap_fsc_".
//
// Missing X-RAP-Service-Class / X-RAP-Channel-Class headers default to
// expectedServiceClass / defaultChannelClass. On success the decision records
// how the request was accepted ("legacy_unsigned" when no lease payload was
// produced, otherwise "introspection" or "signed"), whether backend fallback
// is forced, the preferred route and the data-plane details.
func (s Server) validateFabricServiceChannelRequest(w http.ResponseWriter, r *http.Request, clusterID string, channelID string, resourceID string, expectedServiceClass string, defaultChannelClass string) (fabricServiceChannelRequestDecision, bool) {
	var decision fabricServiceChannelRequestDecision

	// reject writes the error and returns the decision as filled in so far.
	reject := func(message string, status int) (fabricServiceChannelRequestDecision, bool) {
		http.Error(w, message, status)
		return decision, false
	}
	normalize := func(value string) string {
		return strings.TrimSpace(strings.ToLower(value))
	}

	expectedServiceClass = normalize(expectedServiceClass)
	defaultChannelClass = normalize(defaultChannelClass)

	if strings.TrimSpace(clusterID) == "" || strings.TrimSpace(channelID) == "" {
		return reject("invalid fabric service channel target", http.StatusBadRequest)
	}
	if claimed := strings.TrimSpace(r.Header.Get("X-RAP-Fabric-Channel-ID")); claimed != "" && claimed != channelID {
		return reject("fabric service channel mismatch", http.StatusForbidden)
	}

	serviceClass := normalize(r.Header.Get("X-RAP-Service-Class"))
	if serviceClass == "" {
		serviceClass = expectedServiceClass
	}
	if serviceClass != expectedServiceClass {
		return reject("unsupported fabric service class", http.StatusForbidden)
	}

	channelClass := normalize(r.Header.Get("X-RAP-Channel-Class"))
	if channelClass == "" {
		channelClass = defaultChannelClass
	}
	if !isAllowedFabricServiceChannelForClass(serviceClass, channelClass) {
		return reject("unsupported fabric service channel class", http.StatusForbidden)
	}

	token := fabricServiceChannelBearerToken(r)
	if !strings.HasPrefix(token, "rap_fsc_") {
		return reject("fabric service channel token is required", http.StatusUnauthorized)
	}

	payload, authErr := s.verifyFabricServiceChannelLeaseAuthority(r, clusterID, channelID, resourceID, serviceClass, channelClass, token)
	if authErr != nil {
		return reject(authErr.Error(), http.StatusForbidden)
	}

	decision.AcceptedBy = "legacy_unsigned"
	decision.ServiceClass = serviceClass
	decision.ChannelClass = channelClass
	if payload == nil {
		// No lease payload: accepted without any routing hints.
		return decision, true
	}

	if payload.Status == "degraded_fallback" || payload.PrimaryRoute.Status == "missing_route_intent" {
		decision.ForceBackendFallback = true
	}
	if contractErr := validateFabricServiceChannelDataPlaneContract(payload.DataPlane, channelClass); contractErr != nil {
		return reject(contractErr.Error(), http.StatusForbidden)
	}
	decision.DataPlane = payload.DataPlane
	decision.DataPlaneValid = strings.TrimSpace(payload.DataPlane.SchemaVersion) != ""
	decision.DataPlaneMode = strings.TrimSpace(payload.DataPlane.Mode)
	decision.BackendRelayPolicy = strings.TrimSpace(payload.DataPlane.BackendRelayPolicy)
	if payload.DataPlane.Mode == "degraded_backend_fallback" {
		decision.ForceBackendFallback = true
	}

	if !decision.ForceBackendFallback {
		decision.PreferredRouteID = strings.TrimSpace(payload.PrimaryRoute.RouteID)
	}

	decision.AcceptedBy = "signed"
	if payload.SchemaVersion == "rap.fabric_service_channel_introspection.v1" {
		decision.AcceptedBy = "introspection"
	}
	return decision, true
}
// FabricServiceChannelAccessLogEntry is the record handed to the server's
// FabricServiceChannelLogger. It is emitted for accepted requests
// (logFabricServiceChannelAccess) and for data-plane violations
// (logFabricServiceChannelViolation).
type FabricServiceChannelAccessLogEntry struct {
// Event is "fabric_service_channel_access_accepted" or
// "fabric_service_channel_data_plane_violation".
Event string `json:"event"`
ClusterID string `json:"cluster_id,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
LocalNodeID string `json:"local_node_id,omitempty"`
// Method and Path are copied from the HTTP request when one is available.
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
AcceptedBy string `json:"accepted_by,omitempty"`
PreferredRouteID string `json:"preferred_route_id,omitempty"`
ServiceClass string `json:"service_class,omitempty"`
ChannelClass string `json:"channel_class,omitempty"`
ForceBackendFallback bool `json:"force_backend_fallback,omitempty"`
DataPlaneMode string `json:"data_plane_mode,omitempty"`
WorkingDataTransport string `json:"working_data_transport,omitempty"`
SteadyStateTransport string `json:"steady_state_transport,omitempty"`
BackendRelayPolicy string `json:"backend_relay_policy,omitempty"`
LogicalFlowMode string `json:"logical_flow_mode,omitempty"`
DataPlaneValid bool `json:"data_plane_valid,omitempty"`
// ViolationStatus and ViolationReason are only populated for violation events.
ViolationStatus string `json:"violation_status,omitempty"`
ViolationReason string `json:"violation_reason,omitempty"`
// OccurredAt is the UTC time at which the entry was created.
OccurredAt time.Time `json:"occurred_at"`
}
// logFabricServiceChannelAccess emits a
// "fabric_service_channel_access_accepted" entry describing an accepted
// request and the decision reached for it. It does nothing when no
// FabricServiceChannelLogger is configured. r may be nil, in which case the
// method and path are left empty.
func (s Server) logFabricServiceChannelAccess(r *http.Request, clusterID string, channelID string, resourceID string, decision fabricServiceChannelRequestDecision) {
	emit := s.FabricServiceChannelLogger
	if emit == nil {
		return
	}

	var method, path string
	if r != nil {
		method = r.Method
		if r.URL != nil {
			path = r.URL.Path
		}
	}

	contract := decision.DataPlane
	emit(FabricServiceChannelAccessLogEntry{
		Event:                "fabric_service_channel_access_accepted",
		ClusterID:            clusterID,
		ChannelID:            channelID,
		ResourceID:           resourceID,
		LocalNodeID:          s.Local.NodeID,
		Method:               method,
		Path:                 path,
		AcceptedBy:           decision.AcceptedBy,
		PreferredRouteID:     decision.PreferredRouteID,
		ServiceClass:         decision.ServiceClass,
		ChannelClass:         decision.ChannelClass,
		ForceBackendFallback: decision.ForceBackendFallback,
		DataPlaneMode:        decision.DataPlaneMode,
		WorkingDataTransport: strings.TrimSpace(contract.WorkingDataTransport),
		SteadyStateTransport: strings.TrimSpace(contract.SteadyStateTransport),
		BackendRelayPolicy:   decision.BackendRelayPolicy,
		LogicalFlowMode:      strings.TrimSpace(contract.LogicalFlowMode),
		DataPlaneValid:       decision.DataPlaneValid,
		OccurredAt:           time.Now().UTC(),
	})
}
// logFabricServiceChannelViolation emits a
// "fabric_service_channel_data_plane_violation" entry with the given status
// and reason (both trimmed). It does nothing when no
// FabricServiceChannelLogger is configured or channelID is blank. r may be
// nil, in which case the method and path are left empty.
func (s Server) logFabricServiceChannelViolation(r *http.Request, clusterID string, channelID string, resourceID string, backendRelayPolicy string, status string, reason string) {
	emit := s.FabricServiceChannelLogger
	if emit == nil {
		return
	}
	if strings.TrimSpace(channelID) == "" {
		return
	}

	var method, path string
	if r != nil {
		method = r.Method
		if r.URL != nil {
			path = r.URL.Path
		}
	}

	emit(FabricServiceChannelAccessLogEntry{
		Event:              "fabric_service_channel_data_plane_violation",
		ClusterID:          clusterID,
		ChannelID:          channelID,
		ResourceID:         resourceID,
		LocalNodeID:        s.Local.NodeID,
		Method:             method,
		Path:               path,
		BackendRelayPolicy: strings.TrimSpace(backendRelayPolicy),
		ViolationStatus:    strings.TrimSpace(status),
		ViolationReason:    strings.TrimSpace(reason),
		OccurredAt:         time.Now().UTC(),
	})
}
// verifyFabricServiceChannelLeaseAuthority checks the lease authority attached
// to a request and returns the verified lease payload.
//
// Outcomes:
//   - (nil, nil): neither authority header is present and no cluster authority
//     public key is configured (the unsigned legacy path).
//   - introspection result: neither header is present but a public key is
//     configured; the lease is looked up via introspectFabricServiceChannelLease,
//     and if that is not available the request is rejected because a signed
//     authority is required.
//   - verified payload: both headers are present, the signature verifies
//     against the public key, and the payload matches the request.
//   - an error wrapping ErrUnauthorizedChannel (or the bare sentinel) otherwise.
func (s Server) verifyFabricServiceChannelLeaseAuthority(r *http.Request, clusterID string, channelID string, resourceID string, serviceClass string, channelClass string, token string) (*fabricServiceChannelLeaseAuthorityPayload, error) {
publicKey := strings.TrimSpace(s.ClusterAuthorityPublicKey)
payloadHeader := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Authority-Payload"))
signatureHeader := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Authority-Signature"))
if payloadHeader == "" && signatureHeader == "" {
if publicKey != "" {
// ok means introspection was actually attempted; a false ok with a nil
// error means it was not available, which is not acceptable once a public
// key is configured.
if payload, ok, err := s.introspectFabricServiceChannelLease(r, clusterID, channelID, resourceID, serviceClass, channelClass, token); ok || err != nil {
return payload, err
}
return nil, fmt.Errorf("%w: signed service channel authority is required", ErrUnauthorizedChannel)
}
return nil, nil
}
// Authority headers cannot be verified without a public key.
if publicKey == "" {
return nil, ErrUnauthorizedChannel
}
if payloadHeader == "" || signatureHeader == "" {
return nil, fmt.Errorf("%w: service channel authority payload and signature are required together", ErrUnauthorizedChannel)
}
payloadRaw, err := decodeHeaderJSON(payloadHeader)
if err != nil {
return nil, fmt.Errorf("%w: invalid service channel authority payload", ErrUnauthorizedChannel)
}
signatureRaw, err := decodeHeaderJSON(signatureHeader)
if err != nil {
return nil, fmt.Errorf("%w: invalid service channel authority signature", ErrUnauthorizedChannel)
}
var signature authority.Signature
if err := json.Unmarshal(signatureRaw, &signature); err != nil {
return nil, fmt.Errorf("%w: invalid service channel authority signature", ErrUnauthorizedChannel)
}
// The signature is verified over the raw payload bytes before they are
// unmarshalled and trusted.
if err := authority.VerifyRaw(publicKey, payloadRaw, signature); err != nil {
return nil, fmt.Errorf("%w: service channel authority signature rejected", ErrUnauthorizedChannel)
}
var payload fabricServiceChannelLeaseAuthorityPayload
if err := json.Unmarshal(payloadRaw, &payload); err != nil {
return nil, fmt.Errorf("%w: invalid service channel authority payload", ErrUnauthorizedChannel)
}
// The signed payload must describe exactly this request: same schema,
// cluster, channel, resource, service class and token, and it must list the
// requested channel class.
if payload.SchemaVersion != "rap.fabric_service_channel_lease_authority.v1" ||
payload.ClusterID != clusterID ||
payload.ChannelID != channelID ||
payload.ResourceID != resourceID ||
payload.ServiceClass != serviceClass ||
payload.TokenHash != fabricServiceChannelTokenHash(token) ||
!containsString(payload.AllowedChannels, channelClass) {
return nil, fmt.Errorf("%w: service channel authority payload mismatch", ErrUnauthorizedChannel)
}
// The entry-node check is skipped when either side is unset.
if payload.SelectedEntryNodeID != "" && s.Local.NodeID != "" && payload.SelectedEntryNodeID != s.Local.NodeID {
return nil, fmt.Errorf("%w: service channel entry node mismatch", ErrUnauthorizedChannel)
}
// A zero ExpiresAt is treated as "no expiry".
if !payload.ExpiresAt.IsZero() && !payload.ExpiresAt.After(time.Now().UTC()) {
return nil, fmt.Errorf("%w: service channel lease expired", ErrUnauthorizedChannel)
}
return &payload, nil
}
// validateFabricServiceChannelDataPlaneContract checks a lease's data-plane
// contract. A contract with a blank schema version is treated as absent and
// accepted. Otherwise the contract must use the v1 schema, the expected
// transports, a known backend relay policy, be service-neutral and
// protocol-agnostic, and use isolated multi-flow mode; its mode must be empty,
// "fabric_primary" or "degraded_backend_fallback"; and, when both a required
// flow class and a non-empty isolation list are given, the list must contain
// that class. Every failure wraps ErrUnauthorizedChannel.
func validateFabricServiceChannelDataPlaneContract(contract fabricServiceChannelDataPlaneContract, requiredFlowClass string) error {
	if strings.TrimSpace(contract.SchemaVersion) == "" {
		return nil
	}

	relayPolicyKnown := contract.BackendRelayPolicy == "degraded_fallback_only" ||
		contract.BackendRelayPolicy == "disabled"
	shapeSupported := contract.SchemaVersion == "rap.fabric_service_channel_data_plane.v1" &&
		contract.WorkingDataTransport == "fabric_service_channel" &&
		contract.SteadyStateTransport == "fabric_route" &&
		relayPolicyKnown &&
		contract.ServiceNeutral &&
		contract.ProtocolAgnostic &&
		contract.LogicalFlowMode == "multi_flow_isolated"
	if !shapeSupported {
		return fmt.Errorf("%w: unsupported service channel data-plane contract", ErrUnauthorizedChannel)
	}

	switch contract.Mode {
	case "", "fabric_primary", "degraded_backend_fallback":
		// supported
	default:
		return fmt.Errorf("%w: unsupported service channel data-plane mode", ErrUnauthorizedChannel)
	}

	flowClass := strings.TrimSpace(strings.ToLower(requiredFlowClass))
	declared := contract.RequiredFlowIsolationClasses
	if flowClass == "" || len(declared) == 0 || containsString(declared, flowClass) {
		return nil
	}
	return fmt.Errorf("%w: service channel data-plane missing required flow isolation", ErrUnauthorizedChannel)
}
// fabricServiceChannelIntrospectionResponse is the JSON envelope decoded from
// the backend's service channel introspection endpoint.
type fabricServiceChannelIntrospectionResponse struct {
Introspection fabricServiceChannelIntrospection `json:"fabric_service_channel_introspection"`
}
// fabricServiceChannelIntrospection is the backend's verdict on a service
// channel token, as consumed by introspectFabricServiceChannelLease.
type fabricServiceChannelIntrospection struct {
// Allowed must be true for the lease to be accepted.
Allowed bool `json:"allowed"`
Status string `json:"status"`
Reason string `json:"reason"`
// SelectedEntryNodeID, when set, must match the local node ID.
SelectedEntryNodeID string `json:"selected_entry_node_id"`
AllowedChannels []string `json:"allowed_channels"`
// PreferredRouteID takes precedence over PrimaryRoute.RouteID when building
// the lease payload.
PreferredRouteID string `json:"preferred_route_id"`
// ForceBackendFallback marks the resulting lease as "degraded_fallback".
ForceBackendFallback bool `json:"force_backend_fallback"`
DataPlane fabricServiceChannelDataPlaneContract `json:"data_plane,omitempty"`
LeaseStatus string `json:"lease_status"`
PrimaryRoute struct {
RouteID string `json:"route_id"`
Status string `json:"status"`
} `json:"primary_route"`
// ExpiresAt, when non-zero, must be in the future.
ExpiresAt time.Time `json:"expires_at"`
}
// introspectFabricServiceChannelLease asks the backend whether token is a
// valid lease for the given channel and converts the answer into a lease
// payload tagged with schema "rap.fabric_service_channel_introspection.v1".
//
// Returns:
//   - (nil, false, nil) when no backend base URL is configured, i.e.
//     introspection was not attempted;
//   - (payload, true, nil) when the backend allows the lease, the entry node
//     matches (when both sides are set) and the lease has not expired;
//   - (nil, true, err) with err wrapping ErrUnauthorizedChannel for every
//     failure: request could not be built or sent, non-200 status, undecodable
//     response, denied, entry node mismatch, or expired.
//
// The call is limited to 3 seconds on top of the request context, and at most
// 64 KiB of the response is decoded.
func (s Server) introspectFabricServiceChannelLease(r *http.Request, clusterID string, channelID string, resourceID string, serviceClass string, channelClass string, token string) (*fabricServiceChannelLeaseAuthorityPayload, bool, error) {
	baseURL := strings.TrimRight(strings.TrimSpace(s.BackendProxyBaseURL), "/")
	if baseURL == "" {
		return nil, false, nil
	}
	serviceClass = strings.TrimSpace(strings.ToLower(firstNonEmpty(serviceClass, r.Header.Get("X-RAP-Service-Class"), FabricServiceClassVPNPackets)))
	channelClass = strings.TrimSpace(strings.ToLower(firstNonEmpty(channelClass, r.Header.Get("X-RAP-Channel-Class"), ProductionChannelVPNPacket)))
	// Escape the identifiers: this URL decides authorisation, so characters
	// such as '?', '#' or '%' inside an ID must not be able to redirect the
	// call to a different backend path or query.
	path := "/clusters/" + url.PathEscape(clusterID) + "/fabric/service-channels/" + url.PathEscape(channelID) + "/introspect"
	body, err := json.Marshal(map[string]any{
		"token":         token,
		"resource_id":   resourceID,
		"service_class": serviceClass,
		"channel_class": channelClass,
		"entry_node_id": s.Local.NodeID,
	})
	if err != nil {
		return nil, true, fmt.Errorf("%w: service channel introspection request failed", ErrUnauthorizedChannel)
	}
	ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+path, bytes.NewReader(body))
	if err != nil {
		return nil, true, fmt.Errorf("%w: service channel introspection request failed", ErrUnauthorizedChannel)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-RAP-Entry-Node", s.Local.NodeID)
	req.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, true, fmt.Errorf("%w: service channel introspection unavailable", ErrUnauthorizedChannel)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, true, fmt.Errorf("%w: service channel introspection denied", ErrUnauthorizedChannel)
	}
	var decoded fabricServiceChannelIntrospectionResponse
	if err := json.NewDecoder(io.LimitReader(resp.Body, 64*1024)).Decode(&decoded); err != nil {
		return nil, true, fmt.Errorf("%w: invalid service channel introspection response", ErrUnauthorizedChannel)
	}
	item := decoded.Introspection
	if !item.Allowed {
		return nil, true, fmt.Errorf("%w: service channel introspection denied", ErrUnauthorizedChannel)
	}
	// The entry-node check is skipped when either side is unset.
	if item.SelectedEntryNodeID != "" && s.Local.NodeID != "" && item.SelectedEntryNodeID != s.Local.NodeID {
		return nil, true, fmt.Errorf("%w: service channel entry node mismatch", ErrUnauthorizedChannel)
	}
	// A zero ExpiresAt is treated as "no expiry".
	if !item.ExpiresAt.IsZero() && !item.ExpiresAt.After(time.Now().UTC()) {
		return nil, true, fmt.Errorf("%w: service channel lease expired", ErrUnauthorizedChannel)
	}
	payload := &fabricServiceChannelLeaseAuthorityPayload{
		SchemaVersion:       "rap.fabric_service_channel_introspection.v1",
		ChannelID:           channelID,
		ClusterID:           clusterID,
		ResourceID:          resourceID,
		ServiceClass:        serviceClass,
		Status:              item.LeaseStatus,
		SelectedEntryNodeID: item.SelectedEntryNodeID,
		AllowedChannels:     item.AllowedChannels,
		DataPlane:           item.DataPlane,
		ExpiresAt:           item.ExpiresAt,
	}
	payload.PrimaryRoute.RouteID = strings.TrimSpace(firstNonEmpty(item.PreferredRouteID, item.PrimaryRoute.RouteID))
	payload.PrimaryRoute.Status = item.PrimaryRoute.Status
	if item.ForceBackendFallback {
		// Express the forced fallback in the same terms a signed lease would.
		payload.Status = "degraded_fallback"
		if payload.PrimaryRoute.Status == "" {
			payload.PrimaryRoute.Status = "missing_route_intent"
		}
	}
	return payload, true, nil
}
// decodeHeaderJSON turns a header value into raw JSON bytes. The value may be
// base64 in any of the four standard variants (URL-safe or standard alphabet,
// with or without padding) or plain JSON text. The variants are tried in a
// fixed order (unpadded URL-safe and padded standard first) and the first
// successful decode wins. A value that is not valid base64 in any variant is
// returned unchanged.
//
// It returns an error only for an empty value; the result is not checked for
// being well-formed JSON.
func decodeHeaderJSON(value string) (json.RawMessage, error) {
	if value == "" {
		return nil, fmt.Errorf("empty header")
	}
	encodings := [...]*base64.Encoding{
		base64.RawURLEncoding,
		base64.StdEncoding,
		base64.URLEncoding,
		base64.RawStdEncoding,
	}
	for _, encoding := range encodings {
		if decoded, err := encoding.DecodeString(value); err == nil {
			return json.RawMessage(decoded), nil
		}
	}
	return json.RawMessage(value), nil
}
// fabricServiceChannelTokenHash returns the lowercase hex SHA-256 digest of
// token after trimming surrounding whitespace.
func fabricServiceChannelTokenHash(token string) string {
	hasher := sha256.New()
	hasher.Write([]byte(strings.TrimSpace(token)))
	return hex.EncodeToString(hasher.Sum(nil))
}
// fabricServiceChannelBearerToken extracts the service channel token from a
// request, trying in order: the X-RAP-Service-Channel-Token header, an
// Authorization header using the (case-insensitive) "Bearer " scheme, and the
// service_channel_token query parameter. The result is trimmed; a nil request
// yields "".
func fabricServiceChannelBearerToken(r *http.Request) string {
	if r == nil {
		return ""
	}

	fromHeader := strings.TrimSpace(r.Header.Get("X-RAP-Service-Channel-Token"))
	if fromHeader != "" {
		return fromHeader
	}

	const scheme = "Bearer "
	authorization := strings.TrimSpace(r.Header.Get("Authorization"))
	if len(authorization) > len(scheme) {
		prefix, credentials := authorization[:len(scheme)], authorization[len(scheme):]
		if strings.EqualFold(prefix, scheme) {
			return strings.TrimSpace(credentials)
		}
	}

	fromQuery := r.URL.Query().Get("service_channel_token")
	return strings.TrimSpace(fromQuery)
}
// isAllowedFabricServiceVPNChannel reports whether channel is an allowed
// channel class for the VPN packets service class.
func isAllowedFabricServiceVPNChannel(channel string) bool {
	allowed := isAllowedFabricServiceChannelForClass(FabricServiceClassVPNPackets, channel)
	return allowed
}
// isAllowedFabricServiceChannelForClass reports whether channel (compared
// trimmed and lower-cased) is one of the known fabric channel classes and is
// permitted for serviceClass. The VPN packet channel is not permitted for the
// remote workspace service class; unknown or empty channels are never allowed.
func isAllowedFabricServiceChannelForClass(serviceClass string, channel string) bool {
	normalizedClass := strings.TrimSpace(strings.ToLower(serviceClass))
	normalizedChannel := strings.TrimSpace(strings.ToLower(channel))

	known := false
	for _, candidate := range []string{
		ProductionChannelVPNPacket,
		FabricServiceChannelBulk,
		FabricServiceChannelControl,
		FabricServiceChannelInteractive,
		FabricServiceChannelReliable,
		FabricServiceChannelDroppable,
	} {
		if normalizedChannel == candidate {
			known = true
			break
		}
	}
	if !known {
		return false
	}
	if normalizedClass == FabricServiceClassRemoteWorkspace && normalizedChannel == ProductionChannelVPNPacket {
		return false
	}
	return normalizedChannel != ""
}
// containsString reports whether target is exactly equal to any element of
// values. A nil or empty slice never contains anything.
func containsString(values []string, target string) bool {
	for i := 0; i < len(values); i++ {
		if values[i] == target {
			return true
		}
	}
	return false
}
// parseFabricServiceChannelVPNPacketWebSocketPath matches paths of the form
//
//	/api/v1/clusters/{cluster}/fabric/service-channels/{channel}/vpn-connections/{connection}/packets/ws
//
// and returns the cluster, channel and VPN connection IDs. Leading and
// trailing slashes are ignored. The final result is false, with empty IDs,
// when the path does not match or any ID segment is empty.
func parseFabricServiceChannelVPNPacketWebSocketPath(path string) (string, string, string, bool) {
	// An empty layout entry marks a variable, non-empty ID segment.
	layout := [...]string{"api", "v1", "clusters", "", "fabric", "service-channels", "", "vpn-connections", "", "packets", "ws"}

	segments := strings.Split(strings.Trim(path, "/"), "/")
	if len(segments) != len(layout) {
		return "", "", "", false
	}
	for i, want := range layout {
		if want == "" {
			if segments[i] == "" {
				return "", "", "", false
			}
			continue
		}
		if segments[i] != want {
			return "", "", "", false
		}
	}
	return segments[3], segments[6], segments[8], true
}
// parseFabricServiceChannelRemoteWorkspacePath matches paths of the form
//
//	/api/v1/clusters/{cluster}/fabric/service-channels/{channel}/remote-workspaces/{workspace}/streams/{stream}
//
// and returns the cluster, channel and workspace IDs, the channel class, a
// flag telling whether this is the WebSocket endpoint, and whether the path
// matched. When {stream} is exactly "ws" the channel class is
// FabricServiceChannelInteractive and the WebSocket flag is true; otherwise
// the channel class is the trimmed, lower-cased {stream} segment and the flag
// is false. Leading and trailing slashes are ignored. A non-matching path or
// an empty variable segment yields empty strings and false for both flags.
func parseFabricServiceChannelRemoteWorkspacePath(path string) (string, string, string, string, bool, bool) {
	// An empty layout entry marks a variable, non-empty segment.
	layout := [...]string{"api", "v1", "clusters", "", "fabric", "service-channels", "", "remote-workspaces", "", "streams", ""}

	segments := strings.Split(strings.Trim(path, "/"), "/")
	if len(segments) != len(layout) {
		return "", "", "", "", false, false
	}
	for i, want := range layout {
		if want == "" {
			if segments[i] == "" {
				return "", "", "", "", false, false
			}
			continue
		}
		if segments[i] != want {
			return "", "", "", "", false, false
		}
	}

	clusterID, channelID, workspaceID, stream := segments[3], segments[6], segments[8], segments[10]
	if stream == "ws" {
		return clusterID, channelID, workspaceID, FabricServiceChannelInteractive, true, true
	}
	return clusterID, channelID, workspaceID, strings.TrimSpace(strings.ToLower(stream)), false, true
}
// parseFabricServiceChannelVPNPacketPath matches paths of the form
//
//	/api/v1/clusters/{a}/fabric/service-channels/{b}/vpn-connections/{c}/packets
//
// and returns the variable segments {a}, {b}, {c} in path order. Leading and
// trailing slashes are ignored. The final result is false (with empty
// strings) when the shape differs or a variable segment is empty.
func parseFabricServiceChannelVPNPacketPath(path string) (string, string, string, bool) {
	seg := strings.Split(strings.Trim(path, "/"), "/")
	switch {
	case len(seg) != 10:
		// Checked first so the indexed comparisons below are in range.
		return "", "", "", false
	case seg[0] != "api",
		seg[1] != "v1",
		seg[2] != "clusters",
		seg[4] != "fabric",
		seg[5] != "service-channels",
		seg[7] != "vpn-connections",
		seg[9] != "packets":
		return "", "", "", false
	case seg[3] == "", seg[6] == "", seg[8] == "":
		return "", "", "", false
	}
	return seg[3], seg[6], seg[8], true
}
// parseVPNClientPacketWebSocketPath matches paths of the form
//
//	/api/v1/clusters/{a}/vpn-connections/{b}/tunnel/client/packets/ws
//
// and returns the variable segments {a} and {b}. Leading and trailing slashes
// are ignored. The final result is false (with empty strings) when the shape
// differs or a variable segment is empty.
func parseVPNClientPacketWebSocketPath(path string) (string, string, bool) {
	// An empty template entry marks a variable segment that must be non-empty;
	// every other entry must match the path segment exactly.
	template := [...]string{"api", "v1", "clusters", "", "vpn-connections", "", "tunnel", "client", "packets", "ws"}
	segments := strings.Split(strings.Trim(path, "/"), "/")
	if len(segments) != len(template) {
		return "", "", false
	}
	for i, want := range template {
		if want == "" {
			if segments[i] == "" {
				return "", "", false
			}
			continue
		}
		if segments[i] != want {
			return "", "", false
		}
	}
	return segments[3], segments[5], true
}
// parseVPNClientPacketPath matches paths of the form
//
//	/api/v1/clusters/{a}/vpn-connections/{b}/tunnel/client/packets
//
// and returns the variable segments {a} and {b}. Leading and trailing slashes
// are ignored. The final result is false (with empty strings) when the shape
// differs or a variable segment is empty.
func parseVPNClientPacketPath(path string) (string, string, bool) {
	seg := strings.Split(strings.Trim(path, "/"), "/")
	// The length test comes first so the indexed comparisons are in range.
	shapeOK := len(seg) == 9 &&
		seg[0] == "api" &&
		seg[1] == "v1" &&
		seg[2] == "clusters" &&
		seg[4] == "vpn-connections" &&
		seg[6] == "tunnel" &&
		seg[7] == "client" &&
		seg[8] == "packets"
	if !shapeOK {
		return "", "", false
	}
	first, second := seg[3], seg[5]
	if first != "" && second != "" {
		return first, second, true
	}
	return "", "", false
}
// vpnIngressTimeout derives a timeout from the request's "timeout_ms" query
// parameter. Missing, unparsable, zero or negative values select 25000 ms;
// values above 30000 ms are clamped to 30000 ms.
func vpnIngressTimeout(r *http.Request) time.Duration {
	const (
		fallbackMs = 25000
		ceilingMs  = 30000
	)
	// The parse error is intentionally ignored: whatever value Atoi returns is
	// pushed through the same bounds below.
	requestedMs, _ := strconv.Atoi(r.URL.Query().Get("timeout_ms"))
	switch {
	case requestedMs <= 0:
		requestedMs = fallbackMs
	case requestedMs > ceilingMs:
		requestedMs = ceilingMs
	}
	return time.Millisecond * time.Duration(requestedMs)
}
// vpnIngressStatusCode maps a VPN ingress error to the HTTP status code used
// in the response:
//
//   - runtime unavailable, route not found, peer unavailable -> 503
//   - unauthorized channel, cluster mismatch, node mismatch  -> 403
//   - anything else (including nil)                          -> 502
//
// Matching uses errors.Is so that a sentinel wrapped with additional context
// (for example via fmt.Errorf and %w) is classified the same way as the bare
// sentinel instead of falling through to the default.
func vpnIngressStatusCode(err error) int {
	switch {
	case errors.Is(err, ErrForwardRuntimeUnavailable),
		errors.Is(err, ErrRouteNotFound),
		errors.Is(err, ErrForwardPeerUnavailable):
		return http.StatusServiceUnavailable
	case errors.Is(err, ErrUnauthorizedChannel),
		errors.Is(err, ErrClusterMismatch),
		errors.Is(err, ErrNodeMismatch):
		return http.StatusForbidden
	default:
		return http.StatusBadGateway
	}
}
// encodeVPNIngressPacketBatch serializes packets into a single buffer. Empty
// packets are dropped first (see cleanVPNIngressPacketBatch); each remaining
// packet is written as a 4-byte big-endian length followed by its bytes. The
// result is a non-nil, possibly zero-length slice.
func encodeVPNIngressPacketBatch(packets [][]byte) []byte {
	const headerLen = 4
	packets = cleanVPNIngressPacketBatch(packets)

	size := 0
	for i := range packets {
		size += headerLen + len(packets[i])
	}

	encoded := make([]byte, size)
	cursor := encoded // unwritten tail of encoded
	for i := range packets {
		binary.BigEndian.PutUint32(cursor[:headerLen], uint32(len(packets[i])))
		cursor = cursor[headerLen:]
		written := copy(cursor, packets[i])
		cursor = cursor[written:]
	}
	return encoded
}
// cleanVPNIngressPacketBatch returns the non-empty packets of the input in
// their original order. An empty input yields nil; a non-empty input whose
// packets are all empty yields a non-nil slice of length zero. The packets
// themselves are not copied.
func cleanVPNIngressPacketBatch(packets [][]byte) [][]byte {
	count := len(packets)
	if count == 0 {
		return nil
	}
	kept := make([][]byte, 0, count)
	for i := 0; i < count; i++ {
		if current := packets[i]; len(current) > 0 {
			kept = append(kept, current)
		}
	}
	return kept
}
// decodeVPNIngressPacketBatch splits payload into packets. Each packet is
// expected as a 4-byte big-endian length followed by that many bytes. Every
// returned packet is a fresh copy that does not alias payload.
//
// An error wrapping ErrForwardEnvelopeInvalid is returned when a header or
// item is truncated, when an item length is zero or greater than 65535, or
// when payload contains no packets at all.
func decodeVPNIngressPacketBatch(payload []byte) ([][]byte, error) {
	const (
		headerLen  = 4
		maxItemLen = 65535
	)
	var decoded [][]byte
	remaining := payload
	for len(remaining) > 0 {
		if len(remaining) < headerLen {
			return nil, fmt.Errorf("%w: truncated vpn packet batch header", ErrForwardEnvelopeInvalid)
		}
		itemLen := int(binary.BigEndian.Uint32(remaining[:headerLen]))
		remaining = remaining[headerLen:]
		if itemLen <= 0 || itemLen > maxItemLen {
			return nil, fmt.Errorf("%w: invalid vpn packet batch item size", ErrForwardEnvelopeInvalid)
		}
		if itemLen > len(remaining) {
			return nil, fmt.Errorf("%w: truncated vpn packet batch item", ErrForwardEnvelopeInvalid)
		}
		item := make([]byte, itemLen)
		copy(item, remaining[:itemLen])
		decoded = append(decoded, item)
		remaining = remaining[itemLen:]
	}
	if len(decoded) == 0 {
		return nil, fmt.Errorf("%w: empty vpn packet batch", ErrForwardEnvelopeInvalid)
	}
	return decoded, nil
}
// backendProxy returns a reverse-proxy handler targeting
// s.BackendProxyBaseURL. Each proxied request has its URL scheme, URL host
// and Host header replaced with the target's, and is tagged with the
// X-RAP-Entry-Node and X-RAP-Entry-Cluster headers taken from s.Local.
//
// If the base URL fails to parse or lacks a scheme or host, the returned
// handler answers every request with 503 "backend proxy misconfigured".
func (s Server) backendProxy() http.Handler {
	base, parseErr := url.Parse(s.BackendProxyBaseURL)
	if parseErr != nil || base.Scheme == "" || base.Host == "" {
		misconfigured := func(w http.ResponseWriter, _ *http.Request) {
			http.Error(w, "backend proxy misconfigured", http.StatusServiceUnavailable)
		}
		return http.HandlerFunc(misconfigured)
	}

	rewrite := func(r *http.Request) {
		r.URL.Scheme, r.URL.Host = base.Scheme, base.Host
		r.Host = base.Host
		r.Header.Set("X-RAP-Entry-Node", s.Local.NodeID)
		r.Header.Set("X-RAP-Entry-Cluster", s.Local.ClusterID)
	}
	return &httputil.ReverseProxy{Director: rewrite}
}
// handleHealth answers a mesh health message. Only POST is accepted (405
// otherwise). The JSON body must decode into a HealthMessage (400), carry the
// current ProtocolVersion (400), pass ValidatePeer against s.Local (403), and
// — when it names a target node — name this node (403). On success a JSON
// HealthAck with Accepted set is written.
func (s Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	message := HealthMessage{}
	if decodeErr := json.NewDecoder(r.Body).Decode(&message); decodeErr != nil {
		http.Error(w, "invalid health message", http.StatusBadRequest)
		return
	}
	if message.ProtocolVersion != ProtocolVersion {
		http.Error(w, "unsupported mesh protocol version", http.StatusBadRequest)
		return
	}
	if peerErr := ValidatePeer(s.Local, message.From); peerErr != nil {
		http.Error(w, peerErr.Error(), http.StatusForbidden)
		return
	}
	// An empty target is accepted; a non-empty one must be this node.
	if target := message.To.NodeID; target != "" && target != s.Local.NodeID {
		http.Error(w, ErrNodeMismatch.Error(), http.StatusForbidden)
		return
	}

	ack := HealthAck{
		ProtocolVersion: ProtocolVersion,
		Accepted:        true,
		By:              s.Local,
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(ack)
}
// handleForward processes a production mesh envelope posted to this node.
//
// The request must be a POST (405 otherwise) and production forwarding must be
// enabled (501 otherwise). The JSON body is decoded into a ProductionEnvelope
// and validated, first on its own and then against s.ProductionRoutes. An
// accepted envelope is passed to the optional observer, and then either
// delivered locally (when this node is the destination) or forwarded to the
// envelope's next hop through s.ProductionForwardTransport. Every rejection,
// acceptance, delivery and forward is reported via s.logProductionForward.
func (s Server) handleForward(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// No envelope has been decoded yet, so the log entry is built by hand from
// local identity only.
if !s.ProductionForwardingEnabled {
s.logProductionForward(ProductionForwardLogEntry{
Event: "production_forward_rejected",
ClusterID: s.Local.ClusterID,
LocalNodeID: s.Local.NodeID,
Reason: ErrForwardDisabled.Error(),
StatusCode: http.StatusNotImplemented,
OccurredAt: time.Now().UTC(),
})
http.Error(w, ErrForwardDisabled.Error(), http.StatusNotImplemented)
return
}
var envelope ProductionEnvelope
if err := json.NewDecoder(r.Body).Decode(&envelope); err != nil {
s.logProductionForward(ProductionForwardLogEntry{
Event: "production_forward_rejected",
ClusterID: s.Local.ClusterID,
LocalNodeID: s.Local.NodeID,
Reason: "invalid production mesh envelope",
StatusCode: http.StatusBadRequest,
OccurredAt: time.Now().UTC(),
})
http.Error(w, "invalid production mesh envelope", http.StatusBadRequest)
return
}
// Validation failures are mapped to an HTTP status by forwardStatusCode.
if err := ValidateProductionEnvelope(s.Local, envelope, time.Now().UTC()); err != nil {
s.rejectProductionForward(w, envelope, err, forwardStatusCode(err))
return
}
if err := ValidateProductionEnvelopeRouteConfig(s.Local, envelope, s.ProductionRoutes, time.Now().UTC()); err != nil {
s.rejectProductionForward(w, envelope, err, forwardStatusCode(err))
return
}
// "accepted" is logged before observation/delivery/forwarding, so a later
// failure produces an additional "rejected" entry for the same message.
s.logProductionForward(productionForwardLogEntry("production_forward_accepted", s.Local, envelope, "", 0))
if s.ProductionEnvelopeObserver != nil {
observation := NewProductionEnvelopeObservation(envelope, time.Now().UTC())
// The observer's own error is not exposed to the client; a fixed
// ErrForwardObservationFailed message is returned with a 500.
if err := observeProductionEnvelope(r.Context(), s.ProductionEnvelopeObserver, observation); err != nil {
s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardObservationFailed.Error(), http.StatusInternalServerError))
http.Error(w, ErrForwardObservationFailed.Error(), http.StatusInternalServerError)
return
}
}
// This node is the destination: deliver locally and stop.
if envelope.DestinationNodeID == s.Local.NodeID {
if err := deliverProductionEnvelope(r.Context(), s.ProductionEnvelopeDelivery, envelope); err != nil {
s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardDeliveryFailed.Error(), http.StatusInternalServerError))
http.Error(w, ErrForwardDeliveryFailed.Error(), http.StatusInternalServerError)
return
}
s.logProductionForward(productionForwardLogEntry("production_forward_delivered", s.Local, envelope, "", http.StatusOK))
writeProductionForwardResult(w, ProductionForwardResult{
Accepted: true,
Delivered: true,
By: s.Local,
MessageID: envelope.MessageID,
RouteID: envelope.RouteID,
})
return
}
// Not the destination: the envelope has to be forwarded. A next hop equal to
// this node is rejected as a loop.
if envelope.NextHopNodeID == s.Local.NodeID {
s.rejectProductionForward(w, envelope, ErrLoopDetected, forwardStatusCode(ErrLoopDetected))
return
}
// Without an explicit route path, only a next hop that is itself the
// destination is accepted.
if len(envelope.RoutePath) == 0 && envelope.NextHopNodeID != envelope.DestinationNodeID {
s.rejectProductionForward(w, envelope, ErrForwardRuntimeUnavailable, http.StatusNotImplemented)
return
}
if s.ProductionForwardTransport == nil {
s.rejectProductionForward(w, envelope, ErrForwardRuntimeUnavailable, http.StatusNotImplemented)
return
}
// TTL is decremented below, so a TTL of 1 or less is rejected here.
if envelope.TTL <= 1 {
s.rejectProductionForward(w, envelope, ErrTTLExhausted, forwardStatusCode(ErrTTLExhausted))
return
}
// Build the envelope as the next hop will see it: hop pointers advanced, TTL
// and hop count updated, and this node appended to a copy of the visited
// list (the received envelope's slice is not modified).
forwarded := envelope
forwarded.CurrentHopNodeID = envelope.NextHopNodeID
forwarded.NextHopNodeID = nextProductionHopAfter(envelope.RoutePath, envelope.NextHopNodeID, envelope.DestinationNodeID)
forwarded.TTL = envelope.TTL - 1
forwarded.HopCount = envelope.HopCount + 1
forwarded.VisitedNodeIDs = append(append([]string{}, envelope.VisitedNodeIDs...), s.Local.NodeID)
result, err := s.ProductionForwardTransport.SendProduction(r.Context(), envelope.NextHopNodeID, forwarded)
if err != nil {
s.rejectProductionForward(w, envelope, err, forwardStatusCode(err))
return
}
s.logProductionForward(productionForwardLogEntry("production_forward_forwarded", s.Local, envelope, "", http.StatusOK))
// The transport's result is reused, but these fields are overwritten with
// this node's view; any other fields set by the transport are kept.
result.Accepted = true
result.Forwarded = true
result.By = s.Local
result.MessageID = envelope.MessageID
result.RouteID = envelope.RouteID
result.NextNodeID = envelope.NextHopNodeID
writeProductionForwardResult(w, result)
}
// rejectProductionForward logs a "production_forward_rejected" entry for
// envelope and writes err's message to the client with the given status code.
func (s Server) rejectProductionForward(w http.ResponseWriter, envelope ProductionEnvelope, err error, statusCode int) {
	reason := err.Error()
	entry := productionForwardLogEntry("production_forward_rejected", s.Local, envelope, reason, statusCode)
	s.logProductionForward(entry)
	http.Error(w, reason, statusCode)
}
// logProductionForward hands entry to s.ProductionForwardLogger, if one is
// configured. An entry without a timestamp is stamped with the current UTC
// time before being logged.
func (s Server) logProductionForward(entry ProductionForwardLogEntry) {
	logger := s.ProductionForwardLogger
	if logger == nil {
		return
	}
	if entry.OccurredAt.IsZero() {
		entry.OccurredAt = time.Now().UTC()
	}
	logger(entry)
}
// productionForwardLogEntry builds a log entry for event from the envelope's
// routing fields, the local node ID, the supplied reason and status code, and
// the current UTC time. Only the lengths of the route path and visited list
// are recorded, not their contents.
func productionForwardLogEntry(event string, local PeerIdentity, envelope ProductionEnvelope, reason string, statusCode int) ProductionForwardLogEntry {
	var entry ProductionForwardLogEntry

	// Caller-supplied outcome.
	entry.Event = event
	entry.Reason = reason
	entry.StatusCode = statusCode

	// Identity of the message and the nodes involved.
	entry.RouteID = envelope.RouteID
	entry.MessageID = envelope.MessageID
	entry.ClusterID = envelope.ClusterID
	entry.LocalNodeID = local.NodeID
	entry.SourceNodeID = envelope.SourceNodeID
	entry.DestinationNodeID = envelope.DestinationNodeID
	entry.CurrentHopNodeID = envelope.CurrentHopNodeID
	entry.NextHopNodeID = envelope.NextHopNodeID

	// Classification and routing counters.
	entry.ChannelClass = envelope.ChannelClass
	entry.MessageType = envelope.MessageType
	entry.TTL = envelope.TTL
	entry.HopCount = envelope.HopCount
	entry.RoutePathLength = len(envelope.RoutePath)
	entry.VisitedCount = len(envelope.VisitedNodeIDs)
	entry.PayloadLength = envelope.PayloadLength

	entry.OccurredAt = time.Now().UTC()
	return entry
}
// nextProductionHopAfter returns the node that follows the first occurrence
// of currentNodeID in routePath. If currentNodeID is the last element,
// currentNodeID itself is returned. If routePath is empty or does not contain
// currentNodeID, destinationNodeID is returned.
func nextProductionHopAfter(routePath []string, currentNodeID string, destinationNodeID string) string {
	last := len(routePath) - 1
	for i := 0; i <= last; i++ {
		if routePath[i] != currentNodeID {
			continue
		}
		if i < last {
			return routePath[i+1]
		}
		return currentNodeID
	}
	// Empty path or current node not on the path.
	return destinationNodeID
}
// writeProductionForwardResult writes result to w as JSON with an
// application/json content type. Encoding errors are ignored.
func writeProductionForwardResult(w http.ResponseWriter, result ProductionForwardResult) {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	encoder := json.NewEncoder(w)
	_ = encoder.Encode(result)
}
// observeProductionEnvelope invokes observer with observation and returns its
// error. A nil observer is a no-op. A panic raised by the observer is
// recovered and reported as ErrForwardObservationFailed.
func observeProductionEnvelope(ctx context.Context, observer ProductionEnvelopeObserver, observation ProductionEnvelopeObservation) (err error) {
	if observer == nil {
		return nil
	}
	// The named result lets the deferred recovery replace the return value.
	defer func() {
		if panicValue := recover(); panicValue != nil {
			err = ErrForwardObservationFailed
		}
	}()
	err = observer(ctx, observation)
	return err
}
// deliverProductionEnvelope invokes delivery with envelope and returns its
// error. A nil delivery function is a no-op that reports success. A panic
// raised by the delivery function is recovered and reported as
// ErrForwardDeliveryFailed.
func deliverProductionEnvelope(ctx context.Context, delivery ProductionEnvelopeDelivery, envelope ProductionEnvelope) (err error) {
	if delivery == nil {
		return nil
	}
	// The named result lets the deferred recovery replace the return value.
	defer func() {
		if panicValue := recover(); panicValue != nil {
			err = ErrForwardDeliveryFailed
		}
	}()
	err = delivery(ctx, envelope)
	return err
}
// handleSyntheticProbe passes a posted SyntheticEnvelope to
// s.SyntheticRuntime and writes the resulting acknowledgement as JSON.
// Non-POST requests get 405, a missing runtime gets 503, an undecodable body
// gets 400, and a runtime error is reported with the status chosen by
// syntheticStatusCode.
func (s Server) handleSyntheticProbe(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	runtime := s.SyntheticRuntime
	if runtime == nil {
		http.Error(w, ErrMeshRuntimeDisabled.Error(), http.StatusServiceUnavailable)
		return
	}

	envelope := SyntheticEnvelope{}
	if decodeErr := json.NewDecoder(r.Body).Decode(&envelope); decodeErr != nil {
		http.Error(w, "invalid synthetic mesh envelope", http.StatusBadRequest)
		return
	}

	ack, receiveErr := runtime.Receive(r.Context(), envelope)
	if receiveErr != nil {
		http.Error(w, receiveErr.Error(), syntheticStatusCode(receiveErr))
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(ack)
}
// NewHealthMessage returns a HealthMessage from one peer to another, stamped
// with the current ProtocolVersion, the current UTC time and a link status of
// "reachable".
func NewHealthMessage(from, to PeerIdentity) HealthMessage {
	var message HealthMessage
	message.ProtocolVersion = ProtocolVersion
	message.From = from
	message.To = to
	message.ObservedAt = time.Now().UTC()
	message.LinkStatus = "reachable"
	return message
}
// syntheticStatusCode maps an error from the synthetic mesh runtime to the
// HTTP status code used in the response:
//
//   - cluster/node mismatch, unauthorized channel, loop detected   -> 403
//   - mesh runtime disabled                                        -> 503
//   - expired route, exhausted TTL, invalid route path,
//     unsupported synthetic message, missing route ID              -> 400
//   - route not found, synthetic peer unavailable                  -> 404
//   - anything else (including nil)                                -> 400
//
// Matching uses errors.Is so that a sentinel wrapped with additional context
// (for example via fmt.Errorf and %w) is classified the same way as the bare
// sentinel instead of falling through to the default.
func syntheticStatusCode(err error) int {
	switch {
	case errors.Is(err, ErrClusterMismatch),
		errors.Is(err, ErrNodeMismatch),
		errors.Is(err, ErrUnauthorizedChannel),
		errors.Is(err, ErrLoopDetected):
		return http.StatusForbidden
	case errors.Is(err, ErrMeshRuntimeDisabled):
		return http.StatusServiceUnavailable
	case errors.Is(err, ErrRouteExpired),
		errors.Is(err, ErrTTLExhausted),
		errors.Is(err, ErrInvalidRoutePath),
		errors.Is(err, ErrUnsupportedSyntheticMessage),
		errors.Is(err, ErrRouteIDRequired):
		return http.StatusBadRequest
	case errors.Is(err, ErrRouteNotFound),
		errors.Is(err, ErrSyntheticPeerUnavailable):
		return http.StatusNotFound
	default:
		return http.StatusBadRequest
	}
}
// forwardStatusCode maps a production forwarding error to the HTTP status
// code used in the response:
//
//   - cluster/node mismatch, unauthorized channel, loop detected   -> 403
//   - expired route, exhausted TTL, invalid route path,
//     missing route ID                                             -> 400
//   - forward runtime unavailable                                  -> 501
//   - route not found                                              -> 404
//   - forward peer unavailable                                     -> 502
//   - anything else (including nil)                                -> 400
//
// Matching uses errors.Is so that a sentinel wrapped with additional context
// (for example by a transport adding detail via fmt.Errorf and %w) is
// classified the same way as the bare sentinel instead of falling through to
// the default.
func forwardStatusCode(err error) int {
	switch {
	case errors.Is(err, ErrClusterMismatch),
		errors.Is(err, ErrNodeMismatch),
		errors.Is(err, ErrUnauthorizedChannel),
		errors.Is(err, ErrLoopDetected):
		return http.StatusForbidden
	case errors.Is(err, ErrRouteExpired),
		errors.Is(err, ErrTTLExhausted),
		errors.Is(err, ErrInvalidRoutePath),
		errors.Is(err, ErrRouteIDRequired):
		return http.StatusBadRequest
	case errors.Is(err, ErrForwardRuntimeUnavailable):
		return http.StatusNotImplemented
	case errors.Is(err, ErrRouteNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrForwardPeerUnavailable):
		return http.StatusBadGateway
	default:
		return http.StatusBadRequest
	}
}