281 lines
7.8 KiB
Go
281 lines
7.8 KiB
Go
package mesh
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type SyntheticRelaySchedulerConfig struct {
|
|
Enabled bool
|
|
Local PeerIdentity
|
|
QueuePolicies []SyntheticRelayQueuePolicy
|
|
AllowedChannels []string
|
|
AllowedMessageTypes []string
|
|
Now func() time.Time
|
|
Logger func(SyntheticLogEntry)
|
|
}
|
|
|
|
type SyntheticRelayScheduler struct {
|
|
enabled bool
|
|
local PeerIdentity
|
|
policies map[string]SyntheticRelayQueuePolicy
|
|
allowedChannels map[string]struct{}
|
|
allowedMessageTypes map[string]struct{}
|
|
priorityOrder []string
|
|
now func() time.Time
|
|
logger func(SyntheticLogEntry)
|
|
|
|
mu sync.Mutex
|
|
queues map[string][]SyntheticEnvelope
|
|
metrics SyntheticRelayQueueMetrics
|
|
}
|
|
|
|
func NewSyntheticRelayScheduler(cfg SyntheticRelaySchedulerConfig) *SyntheticRelayScheduler {
|
|
policies := cfg.QueuePolicies
|
|
if len(policies) == 0 {
|
|
policies = []SyntheticRelayQueuePolicy{
|
|
{Channel: SyntheticChannelFabricControl, Capacity: 64, Droppable: false},
|
|
{Channel: SyntheticChannelRouteControl, Capacity: 64, Droppable: false},
|
|
{Channel: SyntheticChannelTelemetry, Capacity: 16, Droppable: true},
|
|
}
|
|
}
|
|
policyMap := map[string]SyntheticRelayQueuePolicy{}
|
|
allowedChannels := map[string]struct{}{}
|
|
priorityOrder := make([]string, 0, len(policies))
|
|
for _, policy := range policies {
|
|
if policy.Channel == "" {
|
|
continue
|
|
}
|
|
if policy.Capacity <= 0 {
|
|
policy.Capacity = 1
|
|
}
|
|
policyMap[policy.Channel] = policy
|
|
allowedChannels[policy.Channel] = struct{}{}
|
|
priorityOrder = append(priorityOrder, policy.Channel)
|
|
}
|
|
for _, channel := range cfg.AllowedChannels {
|
|
if channel != "" {
|
|
allowedChannels[channel] = struct{}{}
|
|
}
|
|
}
|
|
messageTypes := cfg.AllowedMessageTypes
|
|
if len(messageTypes) == 0 {
|
|
messageTypes = []string{
|
|
SyntheticMessageProbe,
|
|
SyntheticMessageProbeAck,
|
|
SyntheticMessageRouteHealth,
|
|
SyntheticMessageRouteHealthAck,
|
|
SyntheticMessageTelemetry,
|
|
SyntheticMessageTestService,
|
|
SyntheticMessageTestServiceAck,
|
|
}
|
|
}
|
|
allowedMessageTypes := map[string]struct{}{}
|
|
for _, messageType := range messageTypes {
|
|
if messageType != "" {
|
|
allowedMessageTypes[messageType] = struct{}{}
|
|
}
|
|
}
|
|
now := cfg.Now
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &SyntheticRelayScheduler{
|
|
enabled: cfg.Enabled,
|
|
local: cfg.Local,
|
|
policies: policyMap,
|
|
allowedChannels: allowedChannels,
|
|
allowedMessageTypes: allowedMessageTypes,
|
|
priorityOrder: priorityOrder,
|
|
now: now,
|
|
logger: cfg.Logger,
|
|
queues: map[string][]SyntheticEnvelope{},
|
|
metrics: SyntheticRelayQueueMetrics{
|
|
QueueDepths: map[string]int{},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) Enqueue(envelope SyntheticEnvelope) (SyntheticRelayEnqueueResult, error) {
|
|
if err := s.validateEnvelope(envelope); err != nil {
|
|
s.reject(envelope, err)
|
|
return SyntheticRelayEnqueueResult{}, err
|
|
}
|
|
policy := s.policies[envelope.Channel]
|
|
result := SyntheticRelayEnqueueResult{
|
|
Channel: envelope.Channel,
|
|
QueueCapacity: policy.Capacity,
|
|
AcceptedSequence: envelope.Sequence,
|
|
}
|
|
s.mu.Lock()
|
|
queue := s.queues[envelope.Channel]
|
|
if len(queue) >= policy.Capacity {
|
|
if !policy.Droppable {
|
|
s.metrics.Rejected++
|
|
s.metrics.LastRejectReason = ErrSyntheticRelayQueueFull.Error()
|
|
s.mu.Unlock()
|
|
s.log(SyntheticLogEntry{
|
|
Event: "fabric_relay_rejected",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: s.local.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
Reason: ErrSyntheticRelayQueueFull.Error(),
|
|
QueueDepth: len(queue),
|
|
QueueCapacity: policy.Capacity,
|
|
OccurredAt: s.now(),
|
|
})
|
|
return SyntheticRelayEnqueueResult{}, ErrSyntheticRelayQueueFull
|
|
}
|
|
result.Dropped = true
|
|
result.DroppedSequence = queue[0].Sequence
|
|
queue = queue[1:]
|
|
s.metrics.Dropped++
|
|
}
|
|
queue = append(queue, envelope)
|
|
s.queues[envelope.Channel] = queue
|
|
result.QueueDepth = len(queue)
|
|
s.metrics.Enqueued++
|
|
s.metrics.QueueDepths[envelope.Channel] = len(queue)
|
|
s.mu.Unlock()
|
|
s.log(SyntheticLogEntry{
|
|
Event: "fabric_relay_enqueued",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: s.local.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
QueueDepth: result.QueueDepth,
|
|
QueueCapacity: result.QueueCapacity,
|
|
Dropped: result.Dropped,
|
|
DroppedSequence: result.DroppedSequence,
|
|
OccurredAt: s.now(),
|
|
})
|
|
return result, nil
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) Dequeue() (SyntheticEnvelope, error) {
|
|
if !s.enabled {
|
|
return SyntheticEnvelope{}, ErrMeshRuntimeDisabled
|
|
}
|
|
s.mu.Lock()
|
|
for _, channel := range s.priorityOrder {
|
|
queue := s.queues[channel]
|
|
if len(queue) == 0 {
|
|
continue
|
|
}
|
|
envelope := queue[0]
|
|
queue = queue[1:]
|
|
s.queues[channel] = queue
|
|
s.metrics.Dequeued++
|
|
s.metrics.QueueDepths[channel] = len(queue)
|
|
s.mu.Unlock()
|
|
s.log(SyntheticLogEntry{
|
|
Event: "fabric_relay_dequeued",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: s.local.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
QueueDepth: len(queue),
|
|
QueueCapacity: s.policies[channel].Capacity,
|
|
OccurredAt: s.now(),
|
|
})
|
|
return envelope, nil
|
|
}
|
|
s.mu.Unlock()
|
|
return SyntheticEnvelope{}, ErrSyntheticRelayQueueEmpty
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) SnapshotQueueMetrics() SyntheticRelayQueueMetrics {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
depths := map[string]int{}
|
|
for channel, depth := range s.metrics.QueueDepths {
|
|
depths[channel] = depth
|
|
}
|
|
for channel, queue := range s.queues {
|
|
depths[channel] = len(queue)
|
|
}
|
|
return SyntheticRelayQueueMetrics{
|
|
Enqueued: s.metrics.Enqueued,
|
|
Dequeued: s.metrics.Dequeued,
|
|
Dropped: s.metrics.Dropped,
|
|
Rejected: s.metrics.Rejected,
|
|
LastRejectReason: s.metrics.LastRejectReason,
|
|
QueueDepths: depths,
|
|
}
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) validateEnvelope(envelope SyntheticEnvelope) error {
|
|
if s == nil || !s.enabled {
|
|
return ErrMeshRuntimeDisabled
|
|
}
|
|
if envelope.ProtocolVersion != ProtocolVersion {
|
|
return ErrUnsupportedSyntheticMessage
|
|
}
|
|
if envelope.RouteID == "" {
|
|
return ErrRouteIDRequired
|
|
}
|
|
if envelope.ClusterID == "" || envelope.ClusterID != s.local.ClusterID {
|
|
return ErrClusterMismatch
|
|
}
|
|
if envelope.From.ClusterID != s.local.ClusterID || envelope.From.NodeID == "" {
|
|
return ErrNodeMismatch
|
|
}
|
|
if envelope.To.ClusterID != s.local.ClusterID || envelope.To.NodeID != s.local.NodeID {
|
|
return ErrNodeMismatch
|
|
}
|
|
if envelope.TTL <= 0 {
|
|
return ErrTTLExhausted
|
|
}
|
|
if envelope.HopCount <= 0 {
|
|
return ErrInvalidRoutePath
|
|
}
|
|
if contains(envelope.Visited, s.local.NodeID) {
|
|
return ErrLoopDetected
|
|
}
|
|
if _, ok := s.allowedChannels[envelope.Channel]; !ok {
|
|
return ErrUnauthorizedChannel
|
|
}
|
|
if _, ok := s.policies[envelope.Channel]; !ok {
|
|
return ErrUnauthorizedChannel
|
|
}
|
|
if _, ok := s.allowedMessageTypes[envelope.MessageType]; !ok {
|
|
return ErrUnsupportedSyntheticMessage
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) reject(envelope SyntheticEnvelope, err error) {
|
|
reason := ""
|
|
if err != nil {
|
|
reason = err.Error()
|
|
}
|
|
if s != nil {
|
|
s.mu.Lock()
|
|
s.metrics.Rejected++
|
|
s.metrics.LastRejectReason = reason
|
|
s.mu.Unlock()
|
|
}
|
|
if s != nil {
|
|
s.log(SyntheticLogEntry{
|
|
Event: "fabric_relay_rejected",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: s.local.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
Reason: reason,
|
|
OccurredAt: s.now(),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *SyntheticRelayScheduler) log(entry SyntheticLogEntry) {
|
|
if s.logger != nil {
|
|
s.logger(entry)
|
|
}
|
|
}
|