Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скороть 10 МБит
2026-05-22 21:46:49 +03:00

253 lines
11 KiB
Go

package mesh
import (
"context"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
type ProductionEnvelopeObserver func(context.Context, ProductionEnvelopeObservation) error
type ProductionEnvelopeDelivery func(context.Context, ProductionEnvelope) error
type ProductionForwardLogger func(ProductionForwardLogEntry)
type FabricServiceChannelAccessLogger func(FabricServiceChannelAccessLogEntry)
type FabricSessionEventLogger func(FabricSessionEventLogEntry)
type FabricSessionEventLogEntry struct {
Event string `json:"event"`
ClusterID string `json:"cluster_id,omitempty"`
NodeID string `json:"node_id,omitempty"`
PeerID string `json:"peer_id,omitempty"`
AcceptedBy string `json:"accepted_by,omitempty"`
SessionID string `json:"session_id,omitempty"`
SessionEvent fabricproto.SessionEventType `json:"session_event,omitempty"`
StreamID uint64 `json:"stream_id,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
TrafficClass fabricproto.TrafficClass `json:"traffic_class,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
Reason string `json:"reason,omitempty"`
ObservedAt time.Time `json:"observed_at"`
}
type FabricServiceChannelAccessLogEntry struct {
SchemaVersion string `json:"schema_version,omitempty"`
Event string `json:"event,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ServiceClass string `json:"service_class,omitempty"`
ChannelClass string `json:"channel_class,omitempty"`
PreferredRouteID string `json:"preferred_route_id,omitempty"`
AcceptedBy string `json:"accepted_by,omitempty"`
Method string `json:"method,omitempty"`
Path string `json:"path,omitempty"`
StatusCode int `json:"status_code,omitempty"`
DataPlaneMode string `json:"data_plane_mode,omitempty"`
WorkingDataTransport string `json:"working_data_transport,omitempty"`
SteadyStateTransport string `json:"steady_state_transport,omitempty"`
DegradedRoutePolicy string `json:"degraded_route_policy,omitempty"`
LogicalFlowMode string `json:"logical_flow_mode,omitempty"`
ViolationStatus string `json:"violation_status,omitempty"`
ViolationReason string `json:"violation_reason,omitempty"`
DegradedRouteRequested bool `json:"degraded_route_requested,omitempty"`
DataPlaneValid bool `json:"data_plane_valid"`
OccurredAt time.Time `json:"occurred_at"`
}
type RemoteWorkspaceFrameBatchFrame struct {
Droppable bool `json:"droppable"`
}
type RemoteWorkspaceFrameBatchDelivery struct {
ClusterID string `json:"cluster_id,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ServiceClass string `json:"service_class,omitempty"`
ChannelClass string `json:"channel_class,omitempty"`
PreferredRouteID string `json:"preferred_route_id,omitempty"`
AdapterContractID string `json:"adapter_contract_id,omitempty"`
AdapterSessionID string `json:"adapter_session_id,omitempty"`
Frames []RemoteWorkspaceFrameBatchFrame `json:"frames,omitempty"`
}
type RemoteWorkspaceFrameBatchDeliveryReceipt struct {
SchemaVersion string `json:"schema_version,omitempty"`
Sink string `json:"sink,omitempty"`
Accepted bool `json:"accepted"`
ProbeOnly bool `json:"probe_only"`
ClusterID string `json:"cluster_id,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ResourceID string `json:"resource_id,omitempty"`
ServiceClass string `json:"service_class,omitempty"`
ChannelClass string `json:"channel_class,omitempty"`
AdapterContractID string `json:"adapter_contract_id,omitempty"`
AdapterSessionID string `json:"adapter_session_id,omitempty"`
AdapterRuntimeID string `json:"adapter_runtime_id,omitempty"`
SessionState string `json:"session_state,omitempty"`
SessionCreatedAt string `json:"session_created_at,omitempty"`
SessionBoundAt string `json:"session_bound_at,omitempty"`
SessionLastActive string `json:"session_last_active,omitempty"`
SessionLifecycle string `json:"session_lifecycle,omitempty"`
SessionDeliveries int64 `json:"session_deliveries,omitempty"`
SessionPressure int64 `json:"session_pressure,omitempty"`
MailboxDepth int `json:"mailbox_depth,omitempty"`
MailboxEnqueued int64 `json:"mailbox_enqueued,omitempty"`
FrameCount int `json:"frame_count,omitempty"`
QueueCapacity int `json:"queue_capacity,omitempty"`
QueueDepth int `json:"queue_depth,omitempty"`
AcceptedFrames int `json:"accepted_frames,omitempty"`
DroppedFrames int `json:"dropped_frames,omitempty"`
AckedFrames int `json:"acked_frames,omitempty"`
Backpressure bool `json:"backpressure"`
DropPolicy string `json:"drop_policy,omitempty"`
DeliverySequence uint64 `json:"delivery_sequence,omitempty"`
DeliveredAt string `json:"delivered_at,omitempty"`
}
type Server struct {
Local PeerIdentity
ProductionForwardingEnabled bool
ProductionEnvelopeObserver ProductionEnvelopeObserver
ProductionEnvelopeDelivery ProductionEnvelopeDelivery
ProductionForwardTransport ProductionForwardTransport
ProductionForwardLogger ProductionForwardLogger
ProductionRoutes []SyntheticRoute
}
func (s Server) ForwardProduction(ctx context.Context, envelope ProductionEnvelope) (ProductionForwardResult, error) {
if !s.ProductionForwardingEnabled {
return ProductionForwardResult{}, ErrForwardDisabled
}
if err := ValidateProductionEnvelope(s.Local, envelope, time.Now().UTC()); err != nil {
return ProductionForwardResult{}, err
}
if err := ValidateProductionEnvelopeRouteConfig(s.Local, envelope, s.ProductionRoutes, time.Now().UTC()); err != nil {
return ProductionForwardResult{}, err
}
s.logProductionForward(productionForwardLogEntry("production_forward_accepted", s.Local, envelope, "", 0))
if s.ProductionEnvelopeObserver != nil {
observation := NewProductionEnvelopeObservation(envelope, time.Now().UTC())
if err := observeProductionEnvelope(ctx, s.ProductionEnvelopeObserver, observation); err != nil {
s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardObservationFailed.Error(), 500))
return ProductionForwardResult{}, ErrForwardObservationFailed
}
}
if envelope.DestinationNodeID == s.Local.NodeID {
if err := deliverProductionEnvelope(ctx, s.ProductionEnvelopeDelivery, envelope); err != nil {
s.logProductionForward(productionForwardLogEntry("production_forward_rejected", s.Local, envelope, ErrForwardDeliveryFailed.Error(), 500))
return ProductionForwardResult{}, ErrForwardDeliveryFailed
}
s.logProductionForward(productionForwardLogEntry("production_forward_delivered", s.Local, envelope, "", 200))
return ProductionForwardResult{
Accepted: true,
Delivered: true,
By: s.Local,
MessageID: envelope.MessageID,
RouteID: envelope.RouteID,
}, nil
}
if envelope.NextHopNodeID == s.Local.NodeID {
return ProductionForwardResult{}, ErrLoopDetected
}
if len(envelope.RoutePath) == 0 && envelope.NextHopNodeID != envelope.DestinationNodeID {
return ProductionForwardResult{}, ErrForwardRuntimeUnavailable
}
if s.ProductionForwardTransport == nil {
return ProductionForwardResult{}, ErrForwardRuntimeUnavailable
}
if envelope.TTL <= 1 {
return ProductionForwardResult{}, ErrTTLExhausted
}
forwarded := envelope
forwarded.CurrentHopNodeID = envelope.NextHopNodeID
forwarded.NextHopNodeID = nextProductionHopAfter(envelope.RoutePath, envelope.NextHopNodeID, envelope.DestinationNodeID)
forwarded.TTL = envelope.TTL - 1
forwarded.HopCount = envelope.HopCount + 1
forwarded.VisitedNodeIDs = append(append([]string{}, envelope.VisitedNodeIDs...), s.Local.NodeID)
result, err := s.ProductionForwardTransport.SendProduction(ctx, envelope.NextHopNodeID, forwarded)
if err != nil {
return ProductionForwardResult{}, err
}
s.logProductionForward(productionForwardLogEntry("production_forward_forwarded", s.Local, envelope, "", 200))
result.Accepted = true
result.Forwarded = true
result.By = s.Local
result.MessageID = envelope.MessageID
result.RouteID = envelope.RouteID
result.NextNodeID = envelope.NextHopNodeID
return result, nil
}
func (s Server) logProductionForward(entry ProductionForwardLogEntry) {
if s.ProductionForwardLogger == nil {
return
}
if entry.OccurredAt.IsZero() {
entry.OccurredAt = time.Now().UTC()
}
s.ProductionForwardLogger(entry)
}
func productionForwardLogEntry(event string, local PeerIdentity, envelope ProductionEnvelope, reason string, statusCode int) ProductionForwardLogEntry {
return ProductionForwardLogEntry{
Event: event,
RouteID: envelope.RouteID,
MessageID: envelope.MessageID,
ClusterID: envelope.ClusterID,
LocalNodeID: local.NodeID,
SourceNodeID: envelope.SourceNodeID,
DestinationNodeID: envelope.DestinationNodeID,
CurrentHopNodeID: envelope.CurrentHopNodeID,
NextHopNodeID: envelope.NextHopNodeID,
ChannelClass: envelope.ChannelClass,
MessageType: envelope.MessageType,
Reason: reason,
StatusCode: statusCode,
TTL: envelope.TTL,
HopCount: envelope.HopCount,
RoutePathLength: len(envelope.RoutePath),
VisitedCount: len(envelope.VisitedNodeIDs),
PayloadLength: envelope.PayloadLength,
OccurredAt: time.Now().UTC(),
}
}
func nextProductionHopAfter(routePath []string, currentNodeID string, destinationNodeID string) string {
if len(routePath) == 0 {
return destinationNodeID
}
for index, nodeID := range routePath {
if nodeID == currentNodeID {
if index >= len(routePath)-1 {
return currentNodeID
}
return routePath[index+1]
}
}
return destinationNodeID
}
func observeProductionEnvelope(ctx context.Context, observer ProductionEnvelopeObserver, observation ProductionEnvelopeObservation) (err error) {
if observer == nil {
return nil
}
defer func() {
if recover() != nil {
err = ErrForwardObservationFailed
}
}()
return observer(ctx, observation)
}
func deliverProductionEnvelope(ctx context.Context, delivery ProductionEnvelopeDelivery, envelope ProductionEnvelope) (err error) {
if delivery == nil {
return nil
}
defer func() {
if recover() != nil {
err = ErrForwardDeliveryFailed
}
}()
return delivery(ctx, envelope)
}