package webingress

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
)

// Sentinel errors reported by MeshEnvelopeSender.
var (
	// ErrMeshEnvelopeRuntimeRequired is returned by Send when neither Runtime
	// nor ResponseRuntime is set. It is also the error wrapped when a forward
	// response carries a non-blank "error" field.
	ErrMeshEnvelopeRuntimeRequired = errors.New("web ingress mesh envelope runtime required")
	// ErrMeshEnvelopeRouteRequired is returned by Send when the configured
	// route set has a blank primary route ID and no warm-standby or
	// cold-fallback entries.
	ErrMeshEnvelopeRouteRequired = errors.New("web ingress mesh envelope route set required")
	// ErrMeshEnvelopeIdentityInvalid is returned when ClusterID, SourceNodeID
	// or TargetID is blank after trimming whitespace.
	ErrMeshEnvelopeIdentityInvalid = errors.New("web ingress mesh envelope identity invalid")
)

// FabricChannelReliableRuntime is the delivery runtime Send falls back to when
// no ResponseRuntime is configured. Send hands it the marshaled envelope as a
// single-element payload list.
type FabricChannelReliableRuntime interface {
	SendReliable(ctx context.Context, spec mesh.FabricChannelSpec, routeSet mesh.FabricRouteSet, payloads [][]byte) (mesh.FabricChannelRuntimeResult, error)
}

// FabricChannelRequestResponseRuntime is the runtime Send prefers when it is
// configured; its result carries a response payload in addition to the
// channel runtime result.
type FabricChannelRequestResponseRuntime interface {
	SendRequestResponse(ctx context.Context, spec mesh.FabricChannelSpec, routeSet mesh.FabricRouteSet, payload []byte) (mesh.FabricChannelRequestResponseResult, error)
}

// MeshEnvelopeSender forwards signed service-channel envelopes over a fabric
// channel. At least one of Runtime and ResponseRuntime must be set.
type MeshEnvelopeSender struct {
	// Runtime is used only when ResponseRuntime is nil.
	Runtime FabricChannelReliableRuntime
	// ResponseRuntime, when non-nil, takes precedence over Runtime.
	ResponseRuntime FabricChannelRequestResponseRuntime
	// RouteSet must name a primary route or contain at least one warm-standby
	// or cold-fallback entry. A blank TargetKind/TargetID is filled in from
	// the channel spec before sending.
	RouteSet mesh.FabricRouteSet
	// ClusterID, SourceNodeID and TargetID are required (non-blank).
	ClusterID    string
	SourceNodeID string
	// TargetKind defaults to mesh.FabricChannelTargetPool when empty.
	TargetKind mesh.FabricChannelTargetKind
	TargetID   string
	// ChannelID is optional; when blank an ID is derived from the envelope's
	// service class and the current time.
	ChannelID string
	// Now is an optional clock; time.Now is used when it is nil. Its result
	// is always converted to UTC.
	Now func() time.Time
}

// MeshEnvelopeDeliveryResponse is the JSON body returned with a 202 status
// when an envelope was handed to the fabric but no HTTP-shaped runtime
// response is available.
type MeshEnvelopeDeliveryResponse struct {
	SchemaVersion   string `json:"schema_version"`
	Status          string `json:"status"`
	ChannelID       string `json:"channel_id"`
	RouteID         string `json:"route_id,omitempty"`
	TargetNode      string `json:"target_node,omitempty"`
	BytesSent       uint64 `json:"bytes_sent"`
	FramesSent      uint64 `json:"frames_sent"`
	AcksReceived    uint64 `json:"acks_received"`
	MigrationEvents int    `json:"migration_events"`
}

// Send marshals envelope to JSON and delivers it over the fabric.
//
// With a ResponseRuntime configured, the response payload is unwrapped and,
// if it decodes as a runtime HTTP response, returned as-is; otherwise (and
// whenever only Runtime is configured) a 202 "accepted" delivery summary is
// returned. Configuration problems are reported via the ErrMeshEnvelope*
// sentinels; runtime and marshaling errors are returned unchanged.
func (s MeshEnvelopeSender) Send(ctx context.Context, envelope SignedFabricServiceChannelEnvelope) (FabricResponse, error) {
	var none FabricResponse

	if s.Runtime == nil && s.ResponseRuntime == nil {
		return none, ErrMeshEnvelopeRuntimeRequired
	}

	hasRoute := strings.TrimSpace(s.RouteSet.Primary.RouteID) != "" ||
		len(s.RouteSet.WarmStandby) > 0 ||
		len(s.RouteSet.ColdFallbacks) > 0
	if !hasRoute {
		return none, ErrMeshEnvelopeRouteRequired
	}

	spec, err := s.channelSpec(envelope)
	if err != nil {
		return none, err
	}
	encoded, err := json.Marshal(envelope)
	if err != nil {
		return none, err
	}
	routes := s.routeSet(spec)

	// Fire-and-forget path: only the reliable runtime is available.
	if s.ResponseRuntime == nil {
		delivery, err := s.Runtime.SendReliable(ctx, spec, routes, [][]byte{encoded})
		if err != nil {
			return none, err
		}
		return acceptedDeliveryResponse(spec.ChannelID, delivery)
	}

	exchange, err := s.ResponseRuntime.SendRequestResponse(ctx, spec, routes, encoded)
	if err != nil {
		return none, err
	}
	inner, err := unwrapWebIngressForwardResponse(exchange.ResponsePayload)
	if err != nil {
		return none, err
	}
	if httpResponse, ok := decodeRuntimeHTTPResponse(inner); ok {
		return httpResponse, nil
	}
	// The peer answered, but not with an HTTP-shaped runtime response.
	return acceptedDeliveryResponse(spec.ChannelID, exchange.FabricChannelRuntimeResult)
}

// unwrapWebIngressForwardResponse strips an optional {"payload": ..., "error": ...}
// wrapper from payload.
//
// Input that is empty, does not unmarshal into the wrapper shape, or has an
// empty "payload" member is returned unchanged. A non-blank "error" member
// yields an error; otherwise a copy of the inner payload is returned.
func unwrapWebIngressForwardResponse(payload []byte) ([]byte, error) {
	if len(payload) == 0 {
		return payload, nil
	}

	var wrapper struct {
		Payload json.RawMessage `json:"payload,omitempty"`
		Error   string          `json:"error,omitempty"`
	}
	if err := json.Unmarshal(payload, &wrapper); err != nil {
		// Not a wrapper; treat the bytes as the response itself.
		return payload, nil
	}

	switch {
	case strings.TrimSpace(wrapper.Error) != "":
		// NOTE(review): a remote-reported error is wrapped with the
		// "runtime required" sentinel; confirm this is the intended sentinel
		// before relying on errors.Is to distinguish the two situations.
		return nil, fmt.Errorf("%w: %s", ErrMeshEnvelopeRuntimeRequired, wrapper.Error)
	case len(wrapper.Payload) == 0:
		return payload, nil
	}
	// Copy so the result does not alias the decoder's buffer.
	return append([]byte(nil), wrapper.Payload...), nil
}

// acceptedDeliveryResponse builds a 202 JSON response summarizing the
// delivery statistics in result for the given channel.
func acceptedDeliveryResponse(channelID string, result mesh.FabricChannelRuntimeResult) (FabricResponse, error) {
	summary := MeshEnvelopeDeliveryResponse{
		SchemaVersion:   "rap.web_ingress.mesh_envelope_delivery_response.v1",
		Status:          "accepted",
		ChannelID:       channelID,
		RouteID:         result.Channel.RouteID,
		TargetNode:      result.Channel.TargetNode,
		BytesSent:       result.BytesSent,
		FramesSent:      result.FramesSent,
		AcksReceived:    result.AcksReceived,
		MigrationEvents: result.MigrationEvents,
	}
	body, err := json.Marshal(summary)
	if err != nil {
		return FabricResponse{}, err
	}

	headers := make(http.Header, 1)
	headers.Set("Content-Type", "application/json")
	return FabricResponse{
		StatusCode: http.StatusAccepted,
		Headers:    headers,
		Body:       body,
	}, nil
}

// decodeRuntimeHTTPResponse attempts to interpret payload as a runtime HTTP
// response. It reports false when payload is empty, is not valid JSON for the
// expected shape, carries a schema version other than
// FabricRuntimeResponseSchema, or has an undecodable base64 body.
//
// A non-empty "body_b64" takes precedence over "body". Headers rejected by
// safeResponseHeader are dropped.
func decodeRuntimeHTTPResponse(payload []byte) (FabricResponse, bool) {
	if len(payload) == 0 {
		return FabricResponse{}, false
	}

	var wire struct {
		SchemaVersion string              `json:"schema_version"`
		StatusCode    int                 `json:"status_code"`
		Headers       map[string][]string `json:"headers,omitempty"`
		BodyBase64    string              `json:"body_b64,omitempty"`
		Body          string              `json:"body,omitempty"`
	}
	if err := json.Unmarshal(payload, &wire); err != nil {
		return FabricResponse{}, false
	}
	if wire.SchemaVersion != FabricRuntimeResponseSchema {
		return FabricResponse{}, false
	}

	var body []byte
	if wire.BodyBase64 == "" {
		body = []byte(wire.Body)
	} else {
		decoded, err := decodeEnvelopeBase64(wire.BodyBase64)
		if err != nil {
			return FabricResponse{}, false
		}
		body = decoded
	}

	headers := make(http.Header)
	for name, values := range wire.Headers {
		if safeResponseHeader(name) {
			for _, value := range values {
				headers.Add(name, value)
			}
		}
	}

	return FabricResponse{
		StatusCode: wire.StatusCode,
		Headers:    headers,
		Body:       body,
	}, true
}

// channelSpec derives and validates the fabric channel spec for envelope from
// the sender's configuration. It returns ErrMeshEnvelopeIdentityInvalid when a
// required identity field is blank, or the error from
// mesh.ValidateFabricChannelSpec.
func (s MeshEnvelopeSender) channelSpec(envelope SignedFabricServiceChannelEnvelope) (mesh.FabricChannelSpec, error) {
	cluster := strings.TrimSpace(s.ClusterID)
	source := strings.TrimSpace(s.SourceNodeID)
	target := strings.TrimSpace(s.TargetID)
	for _, required := range [...]string{cluster, source, target} {
		if required == "" {
			return mesh.FabricChannelSpec{}, ErrMeshEnvelopeIdentityInvalid
		}
	}

	kind := s.TargetKind
	if kind == "" {
		kind = mesh.FabricChannelTargetPool
	}

	id := strings.TrimSpace(s.ChannelID)
	if id == "" {
		id = defaultMeshEnvelopeChannelID(envelope, s.now())
	}

	spec := mesh.FabricChannelSpec{
		ChannelID:    id,
		ClusterID:    cluster,
		SourceNodeID: source,
		TargetKind:   kind,
		TargetID:     target,
		TrafficClass: "control",
		StickyKey:    envelope.Envelope.Scope + ":" + envelope.Envelope.ServiceClass,
		CreatedAt:    s.now(),
	}
	if err := mesh.ValidateFabricChannelSpec(spec); err != nil {
		return mesh.FabricChannelSpec{}, err
	}
	return spec, nil
}

// routeSet returns a copy of the configured route set whose empty TargetKind
// and blank TargetID are filled in from spec.
func (s MeshEnvelopeSender) routeSet(spec mesh.FabricChannelSpec) mesh.FabricRouteSet {
	resolved := s.RouteSet
	if resolved.TargetKind == "" {
		resolved.TargetKind = spec.TargetKind
	}
	if strings.TrimSpace(resolved.TargetID) == "" {
		resolved.TargetID = spec.TargetID
	}
	return resolved
}

// now returns the current time in UTC, using the injected clock when set.
func (s MeshEnvelopeSender) now() time.Time {
	clock := s.Now
	if clock == nil {
		clock = time.Now
	}
	return clock().UTC()
}

// defaultMeshEnvelopeChannelID builds a channel ID of the form
// "web-ingress-<service-class>-<unix-nanos>". Underscores in the trimmed
// service class are replaced by hyphens; a blank class becomes "web-ingress".
func defaultMeshEnvelopeChannelID(envelope SignedFabricServiceChannelEnvelope, now time.Time) string {
	class := strings.TrimSpace(envelope.Envelope.ServiceClass)
	class = strings.ReplaceAll(class, "_", "-")
	if class == "" {
		class = "web-ingress"
	}
	return fmt.Sprintf("web-ingress-%s-%d", class, now.UnixNano())
}