package mesh

import (
	"sync"
	"time"
)

// SyntheticRelaySchedulerConfig configures a SyntheticRelayScheduler.
// Zero-value QueuePolicies, AllowedMessageTypes, and Now fall back to defaults.
type SyntheticRelaySchedulerConfig struct {
	Enabled             bool
	Local               PeerIdentity
	QueuePolicies       []SyntheticRelayQueuePolicy
	AllowedChannels     []string
	AllowedMessageTypes []string
	Now                 func() time.Time
	Logger              func(SyntheticLogEntry)
}

// SyntheticRelayScheduler holds one bounded FIFO queue per channel and
// dequeues in the order the queue policies were configured.
type SyntheticRelayScheduler struct {
	enabled             bool
	local               PeerIdentity
	policies            map[string]SyntheticRelayQueuePolicy
	allowedChannels     map[string]struct{}
	allowedMessageTypes map[string]struct{}
	priorityOrder       []string
	now                 func() time.Time
	logger              func(SyntheticLogEntry)

	// mu guards queues and metrics.
	mu      sync.Mutex
	queues  map[string][]SyntheticEnvelope
	metrics SyntheticRelayQueueMetrics
}

// NewSyntheticRelayScheduler builds a scheduler from cfg, applying default
// queue policies, message types, and clock when they are not provided.
func NewSyntheticRelayScheduler(cfg SyntheticRelaySchedulerConfig) *SyntheticRelayScheduler {
	policies := cfg.QueuePolicies
	if len(policies) == 0 {
		policies = []SyntheticRelayQueuePolicy{
			{Channel: SyntheticChannelFabricControl, Capacity: 64, Droppable: false},
			{Channel: SyntheticChannelRouteControl, Capacity: 64, Droppable: false},
			{Channel: SyntheticChannelTelemetry, Capacity: 16, Droppable: true},
		}
	}

	policyMap := map[string]SyntheticRelayQueuePolicy{}
	allowedChannels := map[string]struct{}{}
	priorityOrder := make([]string, 0, len(policies))
	for _, policy := range policies {
		if policy.Channel == "" {
			continue
		}
		// Clamp non-positive capacities so every queue can hold at least one envelope.
		if policy.Capacity <= 0 {
			policy.Capacity = 1
		}
		policyMap[policy.Channel] = policy
		allowedChannels[policy.Channel] = struct{}{}
		priorityOrder = append(priorityOrder, policy.Channel)
	}
	for _, channel := range cfg.AllowedChannels {
		if channel != "" {
			allowedChannels[channel] = struct{}{}
		}
	}

	messageTypes := cfg.AllowedMessageTypes
	if len(messageTypes) == 0 {
		messageTypes = []string{
			SyntheticMessageProbe,
			SyntheticMessageProbeAck,
			SyntheticMessageRouteHealth,
			SyntheticMessageRouteHealthAck,
			SyntheticMessageTelemetry,
			SyntheticMessageTestService,
			SyntheticMessageTestServiceAck,
		}
	}
	allowedMessageTypes := map[string]struct{}{}
	for _, messageType := range messageTypes {
		if messageType != "" {
			allowedMessageTypes[messageType] = struct{}{}
		}
	}

	now := cfg.Now
	if now == nil {
		now = func() time.Time { return time.Now().UTC() }
	}

	return &SyntheticRelayScheduler{
		enabled:             cfg.Enabled,
		local:               cfg.Local,
		policies:            policyMap,
		allowedChannels:     allowedChannels,
		allowedMessageTypes: allowedMessageTypes,
		priorityOrder:       priorityOrder,
		now:                 now,
		logger:              cfg.Logger,
		queues:              map[string][]SyntheticEnvelope{},
		metrics: SyntheticRelayQueueMetrics{
			QueueDepths: map[string]int{},
		},
	}
}

// Enqueue validates envelope and appends it to its channel queue. When the
// queue is at capacity, a droppable channel evicts its oldest envelope to make
// room, while a non-droppable channel rejects the new envelope with
// ErrSyntheticRelayQueueFull.
func (s *SyntheticRelayScheduler) Enqueue(envelope SyntheticEnvelope) (SyntheticRelayEnqueueResult, error) {
	if err := s.validateEnvelope(envelope); err != nil {
		s.reject(envelope, err)
		return SyntheticRelayEnqueueResult{}, err
	}

	policy := s.policies[envelope.Channel]
	result := SyntheticRelayEnqueueResult{
		Channel:          envelope.Channel,
		QueueCapacity:    policy.Capacity,
		AcceptedSequence: envelope.Sequence,
	}

	s.mu.Lock()
	queue := s.queues[envelope.Channel]
	if len(queue) >= policy.Capacity {
		if !policy.Droppable {
			s.metrics.Rejected++
			s.metrics.LastRejectReason = ErrSyntheticRelayQueueFull.Error()
			s.mu.Unlock()
			// Log outside the lock so a slow logger cannot block other callers.
			s.log(SyntheticLogEntry{
				Event:         "fabric_relay_rejected",
				RouteID:       envelope.RouteID,
				ClusterID:     envelope.ClusterID,
				LocalNodeID:   s.local.NodeID,
				Channel:       envelope.Channel,
				MessageType:   envelope.MessageType,
				Reason:        ErrSyntheticRelayQueueFull.Error(),
				QueueDepth:    len(queue),
				QueueCapacity: policy.Capacity,
				OccurredAt:    s.now(),
			})
			return SyntheticRelayEnqueueResult{}, ErrSyntheticRelayQueueFull
		}
		// Droppable channel: evict the oldest envelope to make room.
		result.Dropped = true
		result.DroppedSequence = queue[0].Sequence
		queue = queue[1:]
		s.metrics.Dropped++
	}
	queue = append(queue, envelope)
	s.queues[envelope.Channel] = queue
	result.QueueDepth = len(queue)
	s.metrics.Enqueued++
	s.metrics.QueueDepths[envelope.Channel] = len(queue)
	s.mu.Unlock()

	s.log(SyntheticLogEntry{
		Event:           "fabric_relay_enqueued",
		RouteID:         envelope.RouteID,
		ClusterID:       envelope.ClusterID,
		LocalNodeID:     s.local.NodeID,
		Channel:         envelope.Channel,
		MessageType:     envelope.MessageType,
		QueueDepth:      result.QueueDepth,
		QueueCapacity:   result.QueueCapacity,
		Dropped:         result.Dropped,
		DroppedSequence: result.DroppedSequence,
		OccurredAt:      s.now(),
	})
	return result, nil
}

// Dequeue returns the oldest envelope from the first non-empty queue in
// priority order, or ErrSyntheticRelayQueueEmpty when every queue is empty.
func (s *SyntheticRelayScheduler) Dequeue() (SyntheticEnvelope, error) {
	if !s.enabled {
		return SyntheticEnvelope{}, ErrMeshRuntimeDisabled
	}

	s.mu.Lock()
	for _, channel := range s.priorityOrder {
		queue := s.queues[channel]
		if len(queue) == 0 {
			continue
		}
		envelope := queue[0]
		queue = queue[1:]
		s.queues[channel] = queue
		s.metrics.Dequeued++
		s.metrics.QueueDepths[channel] = len(queue)
		s.mu.Unlock()

		s.log(SyntheticLogEntry{
			Event:         "fabric_relay_dequeued",
			RouteID:       envelope.RouteID,
			ClusterID:     envelope.ClusterID,
			LocalNodeID:   s.local.NodeID,
			Channel:       envelope.Channel,
			MessageType:   envelope.MessageType,
			QueueDepth:    len(queue),
			QueueCapacity: s.policies[channel].Capacity,
			OccurredAt:    s.now(),
		})
		return envelope, nil
	}
	s.mu.Unlock()
	return SyntheticEnvelope{}, ErrSyntheticRelayQueueEmpty
}

// SnapshotQueueMetrics returns a copy of the current counters and queue
// depths that the caller may read without holding the scheduler lock.
func (s *SyntheticRelayScheduler) SnapshotQueueMetrics() SyntheticRelayQueueMetrics {
	s.mu.Lock()
	defer s.mu.Unlock()

	depths := map[string]int{}
	for channel, depth := range s.metrics.QueueDepths {
		depths[channel] = depth
	}
	// Live queue lengths take precedence over the recorded depths.
	for channel, queue := range s.queues {
		depths[channel] = len(queue)
	}
	return SyntheticRelayQueueMetrics{
		Enqueued:         s.metrics.Enqueued,
		Dequeued:         s.metrics.Dequeued,
		Dropped:          s.metrics.Dropped,
		Rejected:         s.metrics.Rejected,
		LastRejectReason: s.metrics.LastRejectReason,
		QueueDepths:      depths,
	}
}

// validateEnvelope checks that envelope is addressed to the local node within
// the local cluster, is still routable, and uses an allowed channel and
// message type. It is safe to call on a nil scheduler.
func (s *SyntheticRelayScheduler) validateEnvelope(envelope SyntheticEnvelope) error {
	if s == nil || !s.enabled {
		return ErrMeshRuntimeDisabled
	}
	if envelope.ProtocolVersion != ProtocolVersion {
		return ErrUnsupportedSyntheticMessage
	}
	if envelope.RouteID == "" {
		return ErrRouteIDRequired
	}
	if envelope.ClusterID == "" || envelope.ClusterID != s.local.ClusterID {
		return ErrClusterMismatch
	}
	if envelope.From.ClusterID != s.local.ClusterID || envelope.From.NodeID == "" {
		return ErrNodeMismatch
	}
	if envelope.To.ClusterID != s.local.ClusterID || envelope.To.NodeID != s.local.NodeID {
		return ErrNodeMismatch
	}
	if envelope.TTL <= 0 {
		return ErrTTLExhausted
	}
	if envelope.HopCount <= 0 {
		return ErrInvalidRoutePath
	}
	if contains(envelope.Visited, s.local.NodeID) {
		return ErrLoopDetected
	}
	if _, ok := s.allowedChannels[envelope.Channel]; !ok {
		return ErrUnauthorizedChannel
	}
	// An allowed channel still needs a queue policy before it can be enqueued.
	if _, ok := s.policies[envelope.Channel]; !ok {
		return ErrUnauthorizedChannel
	}
	if _, ok := s.allowedMessageTypes[envelope.MessageType]; !ok {
		return ErrUnsupportedSyntheticMessage
	}
	return nil
}

// reject records a validation failure in the metrics and logs it.
// It is a no-op on a nil scheduler.
func (s *SyntheticRelayScheduler) reject(envelope SyntheticEnvelope, err error) {
	if s == nil {
		return
	}
	reason := ""
	if err != nil {
		reason = err.Error()
	}

	s.mu.Lock()
	s.metrics.Rejected++
	s.metrics.LastRejectReason = reason
	s.mu.Unlock()

	s.log(SyntheticLogEntry{
		Event:       "fabric_relay_rejected",
		RouteID:     envelope.RouteID,
		ClusterID:   envelope.ClusterID,
		LocalNodeID: s.local.NodeID,
		Channel:     envelope.Channel,
		MessageType: envelope.MessageType,
		Reason:      reason,
		OccurredAt:  s.now(),
	})
}

// log forwards entry to the configured logger, if any.
func (s *SyntheticRelayScheduler) log(entry SyntheticLogEntry) {
	if s.logger != nil {
		s.logger(entry)
	}
}