Files
2026-04-28 22:29:50 +03:00

281 lines
7.8 KiB
Go

package mesh
import (
"sync"
"time"
)
// SyntheticRelaySchedulerConfig carries the settings consumed by
// NewSyntheticRelayScheduler.
type SyntheticRelaySchedulerConfig struct {
// Enabled gates the scheduler: when false, envelope validation and Dequeue
// return ErrMeshRuntimeDisabled.
Enabled bool
// Local is the identity envelopes are validated against (cluster ID and
// destination node ID); its NodeID is also stamped on log entries.
Local PeerIdentity
// QueuePolicies lists the per-channel queue policies. Slice order becomes the
// dequeue priority order. When empty, the constructor installs built-in
// policies for the fabric-control, route-control and telemetry channels.
// Entries with an empty Channel are skipped; a Capacity <= 0 is raised to 1.
QueuePolicies []SyntheticRelayQueuePolicy
// AllowedChannels adds channels to the allowed set on top of those named by
// QueuePolicies. Validation also requires a policy for the channel, so a
// channel listed only here is still rejected.
AllowedChannels []string
// AllowedMessageTypes lists the accepted message types. When empty, the
// constructor falls back to its built-in list; empty strings are ignored.
AllowedMessageTypes []string
// Now supplies the timestamps written to log entries. When nil, the
// constructor uses time.Now().UTC().
Now func() time.Time
// Logger receives one entry per enqueue, dequeue and rejection. Nil disables
// logging.
Logger func(SyntheticLogEntry)
}
// SyntheticRelayScheduler keeps one first-in-first-out queue of
// SyntheticEnvelope values per channel and hands envelopes out in the channel
// priority order fixed at construction.
type SyntheticRelayScheduler struct {
// enabled mirrors the config flag; when false, validation and Dequeue return
// ErrMeshRuntimeDisabled.
enabled bool
// local is the identity envelopes are validated against.
local PeerIdentity
// policies maps a channel name to its queue policy.
policies map[string]SyntheticRelayQueuePolicy
// allowedChannels is the set of channel names accepted by validation.
allowedChannels map[string]struct{}
// allowedMessageTypes is the set of message types accepted by validation.
allowedMessageTypes map[string]struct{}
// priorityOrder lists channels in the order Dequeue scans them.
priorityOrder []string
// now supplies timestamps for log entries.
now func() time.Time
// logger is the optional log sink; nil means no logging.
logger func(SyntheticLogEntry)
// mu guards queues and metrics.
mu sync.Mutex
// queues holds the pending envelopes per channel, oldest first.
queues map[string][]SyntheticEnvelope
// metrics accumulates the counters, last reject reason and per-channel depths
// returned by SnapshotQueueMetrics.
metrics SyntheticRelayQueueMetrics
}
// NewSyntheticRelayScheduler builds a relay scheduler from cfg.
//
// Defaults applied when cfg leaves a field empty:
//   - QueuePolicies: fabric-control and route-control queues (capacity 64,
//     not droppable) followed by a telemetry queue (capacity 16, droppable).
//   - AllowedMessageTypes: probe, route-health and test-service messages with
//     their acks, plus telemetry.
//   - Now: time.Now in UTC.
//
// Policies with an empty Channel are skipped and a non-positive Capacity is
// raised to 1. The order of the policies defines the dequeue priority. If a
// channel appears more than once, it keeps the priority slot of its first
// occurrence and the settings of its last occurrence.
func NewSyntheticRelayScheduler(cfg SyntheticRelaySchedulerConfig) *SyntheticRelayScheduler {
	policies := cfg.QueuePolicies
	if len(policies) == 0 {
		policies = []SyntheticRelayQueuePolicy{
			{Channel: SyntheticChannelFabricControl, Capacity: 64, Droppable: false},
			{Channel: SyntheticChannelRouteControl, Capacity: 64, Droppable: false},
			{Channel: SyntheticChannelTelemetry, Capacity: 16, Droppable: true},
		}
	}

	policyMap := make(map[string]SyntheticRelayQueuePolicy, len(policies))
	allowedChannels := make(map[string]struct{}, len(policies)+len(cfg.AllowedChannels))
	priorityOrder := make([]string, 0, len(policies))
	for _, policy := range policies {
		if policy.Channel == "" {
			continue
		}
		if policy.Capacity <= 0 {
			policy.Capacity = 1
		}
		// Record each channel once in the priority list so Dequeue never
		// scans the same queue twice for a duplicated policy entry.
		if _, seen := policyMap[policy.Channel]; !seen {
			priorityOrder = append(priorityOrder, policy.Channel)
		}
		policyMap[policy.Channel] = policy
		allowedChannels[policy.Channel] = struct{}{}
	}
	for _, channel := range cfg.AllowedChannels {
		if channel != "" {
			allowedChannels[channel] = struct{}{}
		}
	}

	messageTypes := cfg.AllowedMessageTypes
	if len(messageTypes) == 0 {
		messageTypes = []string{
			SyntheticMessageProbe,
			SyntheticMessageProbeAck,
			SyntheticMessageRouteHealth,
			SyntheticMessageRouteHealthAck,
			SyntheticMessageTelemetry,
			SyntheticMessageTestService,
			SyntheticMessageTestServiceAck,
		}
	}
	allowedMessageTypes := make(map[string]struct{}, len(messageTypes))
	for _, messageType := range messageTypes {
		if messageType != "" {
			allowedMessageTypes[messageType] = struct{}{}
		}
	}

	now := cfg.Now
	if now == nil {
		now = func() time.Time { return time.Now().UTC() }
	}

	return &SyntheticRelayScheduler{
		enabled:             cfg.Enabled,
		local:               cfg.Local,
		policies:            policyMap,
		allowedChannels:     allowedChannels,
		allowedMessageTypes: allowedMessageTypes,
		priorityOrder:       priorityOrder,
		now:                 now,
		logger:              cfg.Logger,
		queues:              map[string][]SyntheticEnvelope{},
		metrics: SyntheticRelayQueueMetrics{
			QueueDepths: map[string]int{},
		},
	}
}
// Enqueue validates envelope and appends it to the queue of its channel.
//
// A validation failure is counted, logged and returned unchanged. When the
// channel queue is already at capacity the outcome depends on its policy:
//   - not droppable: the envelope is refused with ErrSyntheticRelayQueueFull
//     and the rejection is counted and logged;
//   - droppable: the oldest queued envelope is evicted to make room, and the
//     result reports Dropped together with the evicted DroppedSequence.
//
// On success the result carries the channel, its capacity, the accepted
// sequence number and the queue depth after insertion. The logger is always
// invoked after the mutex has been released.
func (s *SyntheticRelayScheduler) Enqueue(envelope SyntheticEnvelope) (SyntheticRelayEnqueueResult, error) {
	if err := s.validateEnvelope(envelope); err != nil {
		s.reject(envelope, err)
		return SyntheticRelayEnqueueResult{}, err
	}

	policy := s.policies[envelope.Channel]
	result := SyntheticRelayEnqueueResult{
		Channel:          envelope.Channel,
		QueueCapacity:    policy.Capacity,
		AcceptedSequence: envelope.Sequence,
	}

	s.mu.Lock()
	queue := s.queues[envelope.Channel]
	if len(queue) >= policy.Capacity {
		if !policy.Droppable {
			s.metrics.Rejected++
			s.metrics.LastRejectReason = ErrSyntheticRelayQueueFull.Error()
			s.mu.Unlock()
			s.log(SyntheticLogEntry{
				Event:         "fabric_relay_rejected",
				RouteID:       envelope.RouteID,
				ClusterID:     envelope.ClusterID,
				LocalNodeID:   s.local.NodeID,
				Channel:       envelope.Channel,
				MessageType:   envelope.MessageType,
				Reason:        ErrSyntheticRelayQueueFull.Error(),
				QueueDepth:    len(queue),
				QueueCapacity: policy.Capacity,
				OccurredAt:    s.now(),
			})
			return SyntheticRelayEnqueueResult{}, ErrSyntheticRelayQueueFull
		}
		result.Dropped = true
		result.DroppedSequence = queue[0].Sequence
		// Clear the evicted slot before reslicing so the backing array does
		// not keep the dropped envelope reachable.
		queue[0] = SyntheticEnvelope{}
		queue = queue[1:]
		s.metrics.Dropped++
	}
	queue = append(queue, envelope)
	s.queues[envelope.Channel] = queue
	result.QueueDepth = len(queue)
	s.metrics.Enqueued++
	s.metrics.QueueDepths[envelope.Channel] = len(queue)
	s.mu.Unlock()

	s.log(SyntheticLogEntry{
		Event:           "fabric_relay_enqueued",
		RouteID:         envelope.RouteID,
		ClusterID:       envelope.ClusterID,
		LocalNodeID:     s.local.NodeID,
		Channel:         envelope.Channel,
		MessageType:     envelope.MessageType,
		QueueDepth:      result.QueueDepth,
		QueueCapacity:   result.QueueCapacity,
		Dropped:         result.Dropped,
		DroppedSequence: result.DroppedSequence,
		OccurredAt:      s.now(),
	})
	return result, nil
}
// Dequeue removes and returns the oldest envelope from the first non-empty
// queue, scanning channels in priority order.
//
// It returns ErrMeshRuntimeDisabled when the receiver is nil or the scheduler
// is disabled, and ErrSyntheticRelayQueueEmpty when every queue is empty. The
// logger is invoked after the mutex has been released.
func (s *SyntheticRelayScheduler) Dequeue() (SyntheticEnvelope, error) {
	// Tolerate a nil receiver the same way envelope validation does.
	if s == nil || !s.enabled {
		return SyntheticEnvelope{}, ErrMeshRuntimeDisabled
	}

	s.mu.Lock()
	for _, channel := range s.priorityOrder {
		queue := s.queues[channel]
		if len(queue) == 0 {
			continue
		}
		envelope := queue[0]
		// Clear the vacated slot so the backing array does not keep the
		// delivered envelope reachable while the queue sits idle.
		queue[0] = SyntheticEnvelope{}
		queue = queue[1:]
		s.queues[channel] = queue
		s.metrics.Dequeued++
		s.metrics.QueueDepths[channel] = len(queue)
		s.mu.Unlock()

		s.log(SyntheticLogEntry{
			Event:         "fabric_relay_dequeued",
			RouteID:       envelope.RouteID,
			ClusterID:     envelope.ClusterID,
			LocalNodeID:   s.local.NodeID,
			Channel:       envelope.Channel,
			MessageType:   envelope.MessageType,
			QueueDepth:    len(queue),
			QueueCapacity: s.policies[channel].Capacity,
			OccurredAt:    s.now(),
		})
		return envelope, nil
	}
	s.mu.Unlock()
	return SyntheticEnvelope{}, ErrSyntheticRelayQueueEmpty
}
// SnapshotQueueMetrics returns a copy of the scheduler counters taken under
// the mutex. The returned QueueDepths map is freshly allocated: it starts from
// the recorded depths and is then overwritten with the live length of every
// queue, so callers may modify it freely.
func (s *SyntheticRelayScheduler) SnapshotQueueMetrics() SyntheticRelayQueueMetrics {
	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := SyntheticRelayQueueMetrics{
		Enqueued:         s.metrics.Enqueued,
		Dequeued:         s.metrics.Dequeued,
		Dropped:          s.metrics.Dropped,
		Rejected:         s.metrics.Rejected,
		LastRejectReason: s.metrics.LastRejectReason,
		QueueDepths:      make(map[string]int, len(s.metrics.QueueDepths)),
	}
	for name, recorded := range s.metrics.QueueDepths {
		snapshot.QueueDepths[name] = recorded
	}
	for name, pending := range s.queues {
		snapshot.QueueDepths[name] = len(pending)
	}
	return snapshot
}
// validateEnvelope reports the first reason envelope may not be queued, or nil
// when it is acceptable. Checks run in this order:
//
//  1. nil or disabled scheduler      -> ErrMeshRuntimeDisabled
//  2. protocol version mismatch      -> ErrUnsupportedSyntheticMessage
//  3. empty route ID                 -> ErrRouteIDRequired
//  4. empty or foreign cluster ID    -> ErrClusterMismatch
//  5. sender outside the cluster or
//     without a node ID              -> ErrNodeMismatch
//  6. destination is not this node   -> ErrNodeMismatch
//  7. TTL <= 0                       -> ErrTTLExhausted
//  8. HopCount <= 0                  -> ErrInvalidRoutePath
//  9. local node already in Visited  -> ErrLoopDetected
//  10. channel not allowed or without
//     a queue policy                 -> ErrUnauthorizedChannel
//  11. message type not allowed      -> ErrUnsupportedSyntheticMessage
func (s *SyntheticRelayScheduler) validateEnvelope(envelope SyntheticEnvelope) error {
	if s == nil || !s.enabled {
		return ErrMeshRuntimeDisabled
	}

	// Cases are evaluated top to bottom; the first match decides the error.
	switch {
	case envelope.ProtocolVersion != ProtocolVersion:
		return ErrUnsupportedSyntheticMessage
	case envelope.RouteID == "":
		return ErrRouteIDRequired
	case envelope.ClusterID == "" || envelope.ClusterID != s.local.ClusterID:
		return ErrClusterMismatch
	case envelope.From.ClusterID != s.local.ClusterID || envelope.From.NodeID == "":
		return ErrNodeMismatch
	case envelope.To.ClusterID != s.local.ClusterID || envelope.To.NodeID != s.local.NodeID:
		return ErrNodeMismatch
	case envelope.TTL <= 0:
		return ErrTTLExhausted
	case envelope.HopCount <= 0:
		return ErrInvalidRoutePath
	case contains(envelope.Visited, s.local.NodeID):
		return ErrLoopDetected
	}

	_, channelAllowed := s.allowedChannels[envelope.Channel]
	_, hasPolicy := s.policies[envelope.Channel]
	if !channelAllowed || !hasPolicy {
		return ErrUnauthorizedChannel
	}

	if _, typeAllowed := s.allowedMessageTypes[envelope.MessageType]; !typeAllowed {
		return ErrUnsupportedSyntheticMessage
	}
	return nil
}
// reject records a refused envelope: it bumps the Rejected counter, stores the
// error text (empty for a nil err) as the last reject reason, and emits a
// "fabric_relay_rejected" log entry. A nil receiver makes it a no-op. The
// logger runs after the mutex has been released.
func (s *SyntheticRelayScheduler) reject(envelope SyntheticEnvelope, err error) {
	var reason string
	if err != nil {
		reason = err.Error()
	}
	if s == nil {
		return
	}

	s.mu.Lock()
	s.metrics.Rejected++
	s.metrics.LastRejectReason = reason
	s.mu.Unlock()

	s.log(SyntheticLogEntry{
		Event:       "fabric_relay_rejected",
		RouteID:     envelope.RouteID,
		ClusterID:   envelope.ClusterID,
		LocalNodeID: s.local.NodeID,
		Channel:     envelope.Channel,
		MessageType: envelope.MessageType,
		Reason:      reason,
		OccurredAt:  s.now(),
	})
}
// log forwards entry to the configured logger; it does nothing when no logger
// was supplied.
func (s *SyntheticRelayScheduler) log(entry SyntheticLogEntry) {
	emit := s.logger
	if emit == nil {
		return
	}
	emit(entry)
}