218 lines
7.1 KiB
Go
218 lines
7.1 KiB
Go
package webingress
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
|
|
)
|
|
|
|
// Sentinel errors reported while sending an envelope over the mesh fabric.
// Compare against them with errors.Is, since some are returned wrapped.
var (
	// ErrMeshEnvelopeRuntimeRequired is returned by MeshEnvelopeSender.Send when
	// neither Runtime nor ResponseRuntime is configured. It is also the error
	// wrapped around an error string reported inside a forward response.
	ErrMeshEnvelopeRuntimeRequired = errors.New("web ingress mesh envelope runtime required")

	// ErrMeshEnvelopeRouteRequired is returned by MeshEnvelopeSender.Send when the
	// route set has no primary route ID, no warm standby and no cold fallback.
	ErrMeshEnvelopeRouteRequired = errors.New("web ingress mesh envelope route set required")

	// ErrMeshEnvelopeIdentityInvalid is returned when the sender's cluster ID,
	// source node ID or target ID is blank after trimming whitespace.
	ErrMeshEnvelopeIdentityInvalid = errors.New("web ingress mesh envelope identity invalid")
)
|
|
|
|
type FabricChannelReliableRuntime interface {
|
|
SendReliable(ctx context.Context, spec mesh.FabricChannelSpec, routeSet mesh.FabricRouteSet, payloads [][]byte) (mesh.FabricChannelRuntimeResult, error)
|
|
}
|
|
|
|
type FabricChannelRequestResponseRuntime interface {
|
|
SendRequestResponse(ctx context.Context, spec mesh.FabricChannelSpec, routeSet mesh.FabricRouteSet, payload []byte) (mesh.FabricChannelRequestResponseResult, error)
|
|
}
|
|
|
|
type MeshEnvelopeSender struct {
|
|
Runtime FabricChannelReliableRuntime
|
|
ResponseRuntime FabricChannelRequestResponseRuntime
|
|
RouteSet mesh.FabricRouteSet
|
|
ClusterID string
|
|
SourceNodeID string
|
|
TargetKind mesh.FabricChannelTargetKind
|
|
TargetID string
|
|
ChannelID string
|
|
Now func() time.Time
|
|
}
|
|
|
|
// MeshEnvelopeDeliveryResponse is the JSON body returned for an envelope that
// was accepted by the fabric without an HTTP-style reply to relay. Field order
// determines the order of keys in the encoded JSON.
type MeshEnvelopeDeliveryResponse struct {
	// SchemaVersion names the schema of this document.
	SchemaVersion string `json:"schema_version"`

	// Status is the delivery status label.
	Status string `json:"status"`

	// ChannelID is the ID of the channel the envelope was sent on.
	ChannelID string `json:"channel_id"`

	// RouteID is the route reported by the runtime; omitted when empty.
	RouteID string `json:"route_id,omitempty"`

	// TargetNode is the node reported by the runtime; omitted when empty.
	TargetNode string `json:"target_node,omitempty"`

	// BytesSent is the byte count reported by the runtime.
	BytesSent uint64 `json:"bytes_sent"`

	// FramesSent is the frame count reported by the runtime.
	FramesSent uint64 `json:"frames_sent"`

	// AcksReceived is the acknowledgement count reported by the runtime.
	AcksReceived uint64 `json:"acks_received"`

	// MigrationEvents is the migration event count reported by the runtime.
	MigrationEvents int `json:"migration_events"`
}
|
|
|
|
func (s MeshEnvelopeSender) Send(ctx context.Context, envelope SignedFabricServiceChannelEnvelope) (FabricResponse, error) {
|
|
if s.Runtime == nil && s.ResponseRuntime == nil {
|
|
return FabricResponse{}, ErrMeshEnvelopeRuntimeRequired
|
|
}
|
|
if strings.TrimSpace(s.RouteSet.Primary.RouteID) == "" && len(s.RouteSet.WarmStandby) == 0 && len(s.RouteSet.ColdFallbacks) == 0 {
|
|
return FabricResponse{}, ErrMeshEnvelopeRouteRequired
|
|
}
|
|
spec, err := s.channelSpec(envelope)
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
if s.ResponseRuntime != nil {
|
|
result, err := s.ResponseRuntime.SendRequestResponse(ctx, spec, s.routeSet(spec), payload)
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
responsePayload, err := unwrapWebIngressForwardResponse(result.ResponsePayload)
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
if response, ok := decodeRuntimeHTTPResponse(responsePayload); ok {
|
|
return response, nil
|
|
}
|
|
return acceptedDeliveryResponse(spec.ChannelID, result.FabricChannelRuntimeResult)
|
|
}
|
|
result, err := s.Runtime.SendReliable(ctx, spec, s.routeSet(spec), [][]byte{payload})
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
return acceptedDeliveryResponse(spec.ChannelID, result)
|
|
}
|
|
|
|
func unwrapWebIngressForwardResponse(payload []byte) ([]byte, error) {
|
|
var response struct {
|
|
Payload json.RawMessage `json:"payload,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
if len(payload) == 0 || json.Unmarshal(payload, &response) != nil {
|
|
return payload, nil
|
|
}
|
|
if strings.TrimSpace(response.Error) != "" {
|
|
return nil, fmt.Errorf("%w: %s", ErrMeshEnvelopeRuntimeRequired, response.Error)
|
|
}
|
|
if len(response.Payload) == 0 {
|
|
return payload, nil
|
|
}
|
|
return append([]byte(nil), response.Payload...), nil
|
|
}
|
|
|
|
func acceptedDeliveryResponse(channelID string, result mesh.FabricChannelRuntimeResult) (FabricResponse, error) {
|
|
response, err := json.Marshal(MeshEnvelopeDeliveryResponse{
|
|
SchemaVersion: "rap.web_ingress.mesh_envelope_delivery_response.v1",
|
|
Status: "accepted",
|
|
ChannelID: channelID,
|
|
RouteID: result.Channel.RouteID,
|
|
TargetNode: result.Channel.TargetNode,
|
|
BytesSent: result.BytesSent,
|
|
FramesSent: result.FramesSent,
|
|
AcksReceived: result.AcksReceived,
|
|
MigrationEvents: result.MigrationEvents,
|
|
})
|
|
if err != nil {
|
|
return FabricResponse{}, err
|
|
}
|
|
return FabricResponse{
|
|
StatusCode: http.StatusAccepted,
|
|
Headers: http.Header{"Content-Type": []string{"application/json"}},
|
|
Body: response,
|
|
}, nil
|
|
}
|
|
|
|
func decodeRuntimeHTTPResponse(payload []byte) (FabricResponse, bool) {
|
|
var response struct {
|
|
SchemaVersion string `json:"schema_version"`
|
|
StatusCode int `json:"status_code"`
|
|
Headers map[string][]string `json:"headers,omitempty"`
|
|
BodyBase64 string `json:"body_b64,omitempty"`
|
|
Body string `json:"body,omitempty"`
|
|
}
|
|
if len(payload) == 0 || json.Unmarshal(payload, &response) != nil {
|
|
return FabricResponse{}, false
|
|
}
|
|
if response.SchemaVersion != FabricRuntimeResponseSchema {
|
|
return FabricResponse{}, false
|
|
}
|
|
body := []byte(response.Body)
|
|
if response.BodyBase64 != "" {
|
|
decoded, err := decodeEnvelopeBase64(response.BodyBase64)
|
|
if err != nil {
|
|
return FabricResponse{}, false
|
|
}
|
|
body = decoded
|
|
}
|
|
headers := http.Header{}
|
|
for key, values := range response.Headers {
|
|
if !safeResponseHeader(key) {
|
|
continue
|
|
}
|
|
for _, value := range values {
|
|
headers.Add(key, value)
|
|
}
|
|
}
|
|
return FabricResponse{StatusCode: response.StatusCode, Headers: headers, Body: body}, true
|
|
}
|
|
|
|
func (s MeshEnvelopeSender) channelSpec(envelope SignedFabricServiceChannelEnvelope) (mesh.FabricChannelSpec, error) {
|
|
clusterID := strings.TrimSpace(s.ClusterID)
|
|
sourceNodeID := strings.TrimSpace(s.SourceNodeID)
|
|
targetID := strings.TrimSpace(s.TargetID)
|
|
if clusterID == "" || sourceNodeID == "" || targetID == "" {
|
|
return mesh.FabricChannelSpec{}, ErrMeshEnvelopeIdentityInvalid
|
|
}
|
|
targetKind := s.TargetKind
|
|
if targetKind == "" {
|
|
targetKind = mesh.FabricChannelTargetPool
|
|
}
|
|
channelID := strings.TrimSpace(s.ChannelID)
|
|
if channelID == "" {
|
|
channelID = defaultMeshEnvelopeChannelID(envelope, s.now())
|
|
}
|
|
spec := mesh.FabricChannelSpec{
|
|
ChannelID: channelID,
|
|
ClusterID: clusterID,
|
|
SourceNodeID: sourceNodeID,
|
|
TargetKind: targetKind,
|
|
TargetID: targetID,
|
|
TrafficClass: "control",
|
|
StickyKey: envelope.Envelope.Scope + ":" + envelope.Envelope.ServiceClass,
|
|
CreatedAt: s.now(),
|
|
}
|
|
if err := mesh.ValidateFabricChannelSpec(spec); err != nil {
|
|
return mesh.FabricChannelSpec{}, err
|
|
}
|
|
return spec, nil
|
|
}
|
|
|
|
func (s MeshEnvelopeSender) routeSet(spec mesh.FabricChannelSpec) mesh.FabricRouteSet {
|
|
routeSet := s.RouteSet
|
|
if routeSet.TargetKind == "" {
|
|
routeSet.TargetKind = spec.TargetKind
|
|
}
|
|
if strings.TrimSpace(routeSet.TargetID) == "" {
|
|
routeSet.TargetID = spec.TargetID
|
|
}
|
|
return routeSet
|
|
}
|
|
|
|
func (s MeshEnvelopeSender) now() time.Time {
|
|
if s.Now != nil {
|
|
return s.Now().UTC()
|
|
}
|
|
return time.Now().UTC()
|
|
}
|
|
|
|
func defaultMeshEnvelopeChannelID(envelope SignedFabricServiceChannelEnvelope, now time.Time) string {
|
|
serviceClass := strings.ReplaceAll(strings.TrimSpace(envelope.Envelope.ServiceClass), "_", "-")
|
|
if serviceClass == "" {
|
|
serviceClass = "web-ingress"
|
|
}
|
|
return fmt.Sprintf("web-ingress-%s-%d", serviceClass, now.UnixNano())
|
|
}
|