package mesh

import (
	"context"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)

// ProductionEnvelopeObserver is a callback that receives an observation of a
// production envelope. Server.ForwardProduction invokes it after validation
// and aborts forwarding when it returns an error or panics.
type ProductionEnvelopeObserver func(context.Context, ProductionEnvelopeObservation) error

// ProductionEnvelopeDelivery is a callback that receives a production
// envelope whose destination is the local node.
type ProductionEnvelopeDelivery func(context.Context, ProductionEnvelope) error

// ProductionForwardLogger is a sink for production forwarding log entries.
type ProductionForwardLogger func(ProductionForwardLogEntry)

// FabricServiceChannelAccessLogger is a sink for service-channel access log
// entries.
type FabricServiceChannelAccessLogger func(FabricServiceChannelAccessLogEntry)

// FabricSessionEventLogger is a sink for fabric session event log entries.
type FabricSessionEventLogger func(FabricSessionEventLogEntry)

// FabricSessionEventLogEntry is the JSON-serializable record handed to a
// FabricSessionEventLogger. Event and ObservedAt are always emitted; every
// other field is omitted from the JSON output when it holds its zero value.
type FabricSessionEventLogEntry struct {
	Event        string                       `json:"event"`
	ClusterID    string                       `json:"cluster_id,omitempty"`
	NodeID       string                       `json:"node_id,omitempty"`
	PeerID       string                       `json:"peer_id,omitempty"`
	AcceptedBy   string                       `json:"accepted_by,omitempty"`
	SessionID    string                       `json:"session_id,omitempty"`
	SessionEvent fabricproto.SessionEventType `json:"session_event,omitempty"`
	StreamID     uint64                       `json:"stream_id,omitempty"`
	Sequence     uint64                       `json:"sequence,omitempty"`
	TrafficClass fabricproto.TrafficClass     `json:"traffic_class,omitempty"`
	RemoteAddr   string                       `json:"remote_addr,omitempty"`
	Reason       string                       `json:"reason,omitempty"`
	ObservedAt   time.Time                    `json:"observed_at"`
}

// FabricServiceChannelAccessLogEntry is the JSON-serializable record handed
// to a FabricServiceChannelAccessLogger. DataPlaneValid and OccurredAt are
// always emitted; every other field is omitted from the JSON output when it
// holds its zero value.
type FabricServiceChannelAccessLogEntry struct {
	SchemaVersion          string    `json:"schema_version,omitempty"`
	Event                  string    `json:"event,omitempty"`
	ClusterID              string    `json:"cluster_id,omitempty"`
	ChannelID              string    `json:"channel_id,omitempty"`
	ResourceID             string    `json:"resource_id,omitempty"`
	ServiceClass           string    `json:"service_class,omitempty"`
	ChannelClass           string    `json:"channel_class,omitempty"`
	PreferredRouteID       string    `json:"preferred_route_id,omitempty"`
	AcceptedBy             string    `json:"accepted_by,omitempty"`
	Method                 string    `json:"method,omitempty"`
	Path                   string    `json:"path,omitempty"`
	StatusCode             int       `json:"status_code,omitempty"`
	DataPlaneMode          string    `json:"data_plane_mode,omitempty"`
	WorkingDataTransport   string    `json:"working_data_transport,omitempty"`
	SteadyStateTransport   string    `json:"steady_state_transport,omitempty"`
	DegradedRoutePolicy    string    `json:"degraded_route_policy,omitempty"`
	LogicalFlowMode        string    `json:"logical_flow_mode,omitempty"`
	ViolationStatus        string    `json:"violation_status,omitempty"`
	ViolationReason        string    `json:"violation_reason,omitempty"`
	DegradedRouteRequested bool      `json:"degraded_route_requested,omitempty"`
	DataPlaneValid         bool      `json:"data_plane_valid"`
	OccurredAt             time.Time `json:"occurred_at"`
}

// RemoteWorkspaceFrameBatchFrame describes one frame inside a
// RemoteWorkspaceFrameBatchDelivery. Only the droppable flag is carried.
type RemoteWorkspaceFrameBatchFrame struct {
	Droppable bool `json:"droppable"`
}

// RemoteWorkspaceFrameBatchDelivery is the JSON shape of a batch of remote
// workspace frames together with the channel and adapter identifiers it
// belongs to.
type RemoteWorkspaceFrameBatchDelivery struct {
	ClusterID         string                           `json:"cluster_id,omitempty"`
	ChannelID         string                           `json:"channel_id,omitempty"`
	ResourceID        string                           `json:"resource_id,omitempty"`
	ServiceClass      string                           `json:"service_class,omitempty"`
	ChannelClass      string                           `json:"channel_class,omitempty"`
	PreferredRouteID  string                           `json:"preferred_route_id,omitempty"`
	AdapterContractID string                           `json:"adapter_contract_id,omitempty"`
	AdapterSessionID  string                           `json:"adapter_session_id,omitempty"`
	Frames            []RemoteWorkspaceFrameBatchFrame `json:"frames,omitempty"`
}

// RemoteWorkspaceFrameBatchDeliveryReceipt is the JSON shape of the outcome
// of a frame batch delivery: identifiers, session details, and queue and
// frame counters. Accepted, ProbeOnly and Backpressure are always emitted;
// every other field is omitted from the JSON output when it holds its zero
// value.
//
// NOTE(review): presumably produced by the sink that handled a
// RemoteWorkspaceFrameBatchDelivery — confirm against the producer.
type RemoteWorkspaceFrameBatchDeliveryReceipt struct {
	SchemaVersion     string `json:"schema_version,omitempty"`
	Sink              string `json:"sink,omitempty"`
	Accepted          bool   `json:"accepted"`
	ProbeOnly         bool   `json:"probe_only"`
	ClusterID         string `json:"cluster_id,omitempty"`
	ChannelID         string `json:"channel_id,omitempty"`
	ResourceID        string `json:"resource_id,omitempty"`
	ServiceClass      string `json:"service_class,omitempty"`
	ChannelClass      string `json:"channel_class,omitempty"`
	AdapterContractID string `json:"adapter_contract_id,omitempty"`
	AdapterSessionID  string `json:"adapter_session_id,omitempty"`
	AdapterRuntimeID  string `json:"adapter_runtime_id,omitempty"`
	SessionState      string `json:"session_state,omitempty"`
	SessionCreatedAt  string `json:"session_created_at,omitempty"`
	SessionBoundAt    string `json:"session_bound_at,omitempty"`
	SessionLastActive string `json:"session_last_active,omitempty"`
	SessionLifecycle  string `json:"session_lifecycle,omitempty"`
	SessionDeliveries int64  `json:"session_deliveries,omitempty"`
	SessionPressure   int64  `json:"session_pressure,omitempty"`
	MailboxDepth      int    `json:"mailbox_depth,omitempty"`
	MailboxEnqueued   int64  `json:"mailbox_enqueued,omitempty"`
	FrameCount        int    `json:"frame_count,omitempty"`
	QueueCapacity     int    `json:"queue_capacity,omitempty"`
	QueueDepth        int    `json:"queue_depth,omitempty"`
	AcceptedFrames    int    `json:"accepted_frames,omitempty"`
	DroppedFrames     int    `json:"dropped_frames,omitempty"`
	AckedFrames       int    `json:"acked_frames,omitempty"`
	Backpressure      bool   `json:"backpressure"`
	DropPolicy        string `json:"drop_policy,omitempty"`
	DeliverySequence  uint64 `json:"delivery_sequence,omitempty"`
	DeliveredAt       string `json:"delivered_at,omitempty"`
}

// Server holds the local identity and the pluggable hooks used by
// ForwardProduction.
type Server struct {
	// Local identifies this node; its NodeID decides whether an envelope is
	// delivered locally or forwarded.
	Local PeerIdentity

	// ProductionForwardingEnabled gates ForwardProduction; when false every
	// call fails with ErrForwardDisabled.
	ProductionForwardingEnabled bool

	// ProductionEnvelopeObserver, when non-nil, is called with an observation
	// of each envelope that passed validation.
	ProductionEnvelopeObserver ProductionEnvelopeObserver

	// ProductionEnvelopeDelivery receives envelopes destined for the local
	// node. A nil value is treated as a successful delivery.
	ProductionEnvelopeDelivery ProductionEnvelopeDelivery

	// ProductionForwardTransport sends envelopes to the next hop. Forwarding
	// fails with ErrForwardRuntimeUnavailable when it is nil.
	ProductionForwardTransport ProductionForwardTransport

	// ProductionForwardLogger, when non-nil, receives forwarding log entries.
	ProductionForwardLogger ProductionForwardLogger

	// ProductionRoutes is the route configuration each envelope is validated
	// against.
	ProductionRoutes []SyntheticRoute
}

// ForwardProduction validates envelope and then either delivers it locally
// or forwards it to its next hop.
//
// The steps run in this order:
//  1. Fail with ErrForwardDisabled if production forwarding is disabled.
//  2. Validate the envelope, then validate it against ProductionRoutes;
//     validation errors are returned unchanged.
//  3. Log "production_forward_accepted" and, if an observer is configured,
//     notify it. An observer error or panic is logged as a rejection and
//     reported as ErrForwardObservationFailed.
//  4. If the destination is the local node, hand the envelope to the
//     delivery hook. A delivery error or panic is logged as a rejection and
//     reported as ErrForwardDeliveryFailed.
//  5. Otherwise check for a loop, a usable route, a transport and remaining
//     TTL, then send a copy with updated hop bookkeeping to the next hop.
//     Transport errors are returned unchanged.
//
// On a non-nil error the returned result is the zero value.
func (s Server) ForwardProduction(ctx context.Context, envelope ProductionEnvelope) (ProductionForwardResult, error) {
	var none ProductionForwardResult

	// reject records a rejection log entry for cause and returns it as the
	// call's error.
	reject := func(cause error) (ProductionForwardResult, error) {
		s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, cause.Error(), 500))
		return none, cause
	}

	if !s.ProductionForwardingEnabled {
		return none, ErrForwardDisabled
	}
	if err := ValidateProductionEnvelope(s.Local, envelope, time.Now().UTC()); err != nil {
		return none, err
	}
	if err := ValidateProductionEnvelopeRouteConfig(s.Local, envelope, s.ProductionRoutes, time.Now().UTC()); err != nil {
		return none, err
	}

	s.logProductionForward(productionForwardLogEntry("production_forward_accepted", s.Local, envelope, "", 0))

	if s.ProductionEnvelopeObserver != nil {
		obs := NewProductionEnvelopeObservation(envelope, time.Now().UTC())
		if err := observeProductionEnvelope(ctx, s.ProductionEnvelopeObserver, obs); err != nil {
			// The observer's own error is deliberately replaced by the sentinel.
			return reject(ErrForwardObservationFailed)
		}
	}

	// Local delivery: this node is the final destination.
	if envelope.DestinationNodeID == s.Local.NodeID {
		if err := deliverProductionEnvelope(ctx, s.ProductionEnvelopeDelivery, envelope); err != nil {
			return reject(ErrForwardDeliveryFailed)
		}
		s.logProductionForward(productionForwardLogEntry("production_forward_delivered", s.Local, envelope, "", 200))
		return ProductionForwardResult{
			Accepted:  true,
			Delivered: true,
			By:        s.Local,
			MessageID: envelope.MessageID,
			RouteID:   envelope.RouteID,
		}, nil
	}

	// Preconditions for forwarding, checked in a fixed order.
	switch {
	case envelope.NextHopNodeID == s.Local.NodeID:
		return none, ErrLoopDetected
	case len(envelope.RoutePath) == 0 && envelope.NextHopNodeID != envelope.DestinationNodeID:
		return none, ErrForwardRuntimeUnavailable
	case s.ProductionForwardTransport == nil:
		return none, ErrForwardRuntimeUnavailable
	case envelope.TTL <= 1:
		return none, ErrTTLExhausted
	}

	// Copy the visited list so the caller's envelope is never mutated.
	visited := make([]string, 0, len(envelope.VisitedNodeIDs)+1)
	visited = append(visited, envelope.VisitedNodeIDs...)
	visited = append(visited, s.Local.NodeID)

	outbound := envelope
	outbound.CurrentHopNodeID = envelope.NextHopNodeID
	outbound.NextHopNodeID = nextProductionHopAfter(envelope.RoutePath, envelope.NextHopNodeID, envelope.DestinationNodeID)
	outbound.TTL = envelope.TTL - 1
	outbound.HopCount = envelope.HopCount + 1
	outbound.VisitedNodeIDs = visited

	result, err := s.ProductionForwardTransport.SendProduction(ctx, envelope.NextHopNodeID, outbound)
	if err != nil {
		return none, err
	}

	// The log entry describes the envelope as received, not the outbound copy.
	s.logProductionForward(productionForwardLogEntry("production_forward_forwarded", s.Local, envelope, "", 200))

	// Overwrite the transport's result with this hop's view of the forward.
	result.Accepted = true
	result.Forwarded = true
	result.By = s.Local
	result.MessageID = envelope.MessageID
	result.RouteID = envelope.RouteID
	result.NextNodeID = envelope.NextHopNodeID
	return result, nil
}

// logProductionForward passes entry to the configured logger, stamping
// OccurredAt with the current UTC time if it is unset. It does nothing when
// no logger is configured.
func (s Server) logProductionForward(entry ProductionForwardLogEntry) {
	sink := s.ProductionForwardLogger
	if sink == nil {
		return
	}
	if entry.OccurredAt.IsZero() {
		entry.OccurredAt = time.Now().UTC()
	}
	sink(entry)
}

// productionForwardLogEntry builds a log entry for event from the local
// identity and the envelope's routing metadata, stamped with the current UTC
// time. Route path and visited list are recorded as lengths only.
func productionForwardLogEntry(event string, local PeerIdentity, envelope ProductionEnvelope, reason string, statusCode int) ProductionForwardLogEntry {
	return ProductionForwardLogEntry{
		Event:     event,
		RouteID:   envelope.RouteID,
		MessageID: envelope.MessageID,
		ClusterID: envelope.ClusterID,

		LocalNodeID:       local.NodeID,
		SourceNodeID:      envelope.SourceNodeID,
		DestinationNodeID: envelope.DestinationNodeID,
		CurrentHopNodeID:  envelope.CurrentHopNodeID,
		NextHopNodeID:     envelope.NextHopNodeID,

		ChannelClass: envelope.ChannelClass,
		MessageType:  envelope.MessageType,
		Reason:       reason,
		StatusCode:   statusCode,

		TTL:             envelope.TTL,
		HopCount:        envelope.HopCount,
		RoutePathLength: len(envelope.RoutePath),
		VisitedCount:    len(envelope.VisitedNodeIDs),
		PayloadLength:   envelope.PayloadLength,

		OccurredAt: time.Now().UTC(),
	}
}

// nextProductionHopAfter returns the entry that follows the first occurrence
// of currentNodeID in routePath. When currentNodeID is the last entry it is
// returned itself; when routePath is empty or does not contain currentNodeID
// the result is destinationNodeID.
func nextProductionHopAfter(routePath []string, currentNodeID string, destinationNodeID string) string {
	last := len(routePath) - 1
	for i := range routePath {
		if routePath[i] != currentNodeID {
			continue
		}
		if i < last {
			return routePath[i+1]
		}
		return currentNodeID
	}
	return destinationNodeID
}

// observeProductionEnvelope calls observer with observation and returns its
// error. A nil observer is a no-op, and a panic inside the observer is
// recovered and reported as ErrForwardObservationFailed.
func observeProductionEnvelope(ctx context.Context, observer ProductionEnvelopeObserver, observation ProductionEnvelopeObservation) (err error) {
	if observer == nil {
		return nil
	}
	defer func() {
		if r := recover(); r != nil {
			err = ErrForwardObservationFailed
		}
	}()
	err = observer(ctx, observation)
	return err
}

// deliverProductionEnvelope calls delivery with envelope and returns its
// error. A nil delivery hook is a no-op, and a panic inside the hook is
// recovered and reported as ErrForwardDeliveryFailed.
func deliverProductionEnvelope(ctx context.Context, delivery ProductionEnvelopeDelivery, envelope ProductionEnvelope) (err error) {
	if delivery == nil {
		return nil
	}
	defer func() {
		if r := recover(); r != nil {
			err = ErrForwardDeliveryFailed
		}
	}()
	err = delivery(ctx, envelope)
	return err
}