package vpnruntime import ( "context" "encoding/binary" "errors" "fmt" "hash/fnv" "sort" "strings" "sync" "sync/atomic" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" ) const ( FabricDirectionClientToGateway = "client_to_gateway" FabricDirectionGatewayToClient = "gateway_to_client" defaultFabricFlowShardCount = 8 defaultFabricFlowQueueCapacity = 1024 defaultFabricFlowParallelSendWindow = 8 defaultFabricFlowQualityWindowCapacity = 32 defaultFabricFlowFailureThreshold = 2 defaultFabricFlowSlowSendThreshold = 2 * time.Second defaultFabricRouteQualitySwitchThreshold = 30 ) type FabricPacketTransport struct { ForwardTransport mesh.ProductionForwardTransport Inbox *FabricPacketInbox ClusterID string VPNConnectionID string RouteID string LocalNodeID string RemoteNodeID string NextHopNodeID string RoutePath []string SendDirection string ReceiveDirection string } type FabricClientPacketIngress struct { ForwardTransport mesh.ProductionForwardTransport Inbox *FabricPacketInbox Routes func() []mesh.SyntheticRoute LocalGateway func(vpnConnectionID string) bool AllowLegacyLocalGatewayFallback bool FlowScheduler *FabricFlowScheduler MaxParallelFlowSends int RecoveryPolicyFingerprint string AdaptivePolicyFingerprint string PreventLastRouteWithdrawal bool ClusterID string LocalNodeID string RouteManager FabricServiceChannelRouteManager RouteManagerTransition FabricServiceChannelRouteManagerTransition RouteQualityPreferences map[string]FabricServiceChannelRouteQualityPreference mu sync.Mutex lastSelectedRouteID string lastSelectedNextHop string lastError string sendBatches uint64 sendPackets uint64 sendRouteAttempts uint64 sendRouteFailures uint64 sendFallbackLocal uint64 sendFlowBatches uint64 sendFlowPackets uint64 sendFlowDropped uint64 sendFlowParallel uint64 receiveBatches uint64 receivePackets uint64 receiveEmpty uint64 } type FabricServiceChannelRouteManager struct { SchemaVersion string `json:"schema_version"` Generation string `json:"generation,omitempty"` RebuildRequestCount int `json:"rebuild_request_count"` RebuildAppliedCount int `json:"rebuild_applied_count"` WithdrawnRouteCount int `json:"withdrawn_route_count"` PendingFallbackCount int `json:"pending_degraded_fallback_count"` LastAppliedAt string `json:"last_applied_at,omitempty"` Decisions []FabricServiceChannelRouteManagerDecision `json:"decisions,omitempty"` withdrawnRoutes map[string]FabricServiceChannelRouteManagerDecision replacements map[string]string } type FabricServiceChannelRouteManagerTransition struct { SchemaVersion string `json:"schema_version"` PreviousGeneration string `json:"previous_generation,omitempty"` Generation string `json:"generation,omitempty"` Status string `json:"status,omitempty"` ObservedAt string `json:"observed_at,omitempty"` DecisionCount int `json:"decision_count"` WithdrawnRouteCount int `json:"withdrawn_route_count"` RestoredRouteCount int `json:"restored_route_count"` ClearedSelectedRouteID string `json:"cleared_selected_route_id,omitempty"` PendingFallbackCount int `json:"pending_degraded_fallback_count"` RebuildAppliedCount int `json:"rebuild_applied_count"` } type FabricServiceChannelRouteManagerDecision struct { RouteID string `json:"route_id"` ReplacementRouteID string `json:"replacement_route_id,omitempty"` RebuildRequestID string `json:"rebuild_request_id,omitempty"` RebuildStatus string `json:"rebuild_status,omitempty"` RebuildReason string `json:"rebuild_reason,omitempty"` RebuildAttempt int `json:"rebuild_attempt,omitempty"` DecisionSource string `json:"decision_source,omitempty"` Generation string `json:"generation,omitempty"` EffectiveHops []string `json:"effective_hops,omitempty"` } type FabricServiceChannelRouteQualityPreference struct { RouteID string `json:"route_id"` FeedbackStatus string `json:"feedback_status,omitempty"` ScoreAdjustment int `json:"score_adjustment"` RawScoreAdjustment int `json:"raw_score_adjustment,omitempty"` Reasons []string `json:"reasons,omitempty"` LastSendDurationMs int64 `json:"last_send_duration_ms,omitempty"` ObservedAt string `json:"observed_at,omitempty"` ExpiresAt string `json:"expires_at,omitempty"` } type FabricFlowScheduler struct { mu sync.Mutex shardCount int queueCapacity int adaptivePolicy FabricServiceChannelAdaptivePolicy queues map[string]*fabricFlowQueue enqueued uint64 dequeued uint64 dropped uint64 highWatermark int inFlight int maxInFlight int } type FabricServiceChannelAdaptivePolicy struct { SchemaVersion string Fingerprint string MaxParallelWindow int BulkPressureChannelThreshold int QueuePressureHighWatermark int QueuePressureMaxInFlight int ClassWindows map[string]int } const ( FabricTrafficClassControl = "control" FabricTrafficClassInteractive = "interactive" FabricTrafficClassReliable = "reliable" FabricTrafficClassBulk = "bulk" FabricTrafficClassDroppable = "droppable" ) type fabricFlowQueue struct { TrafficClass string Depth int Enqueued uint64 Dequeued uint64 Dropped uint64 HighWatermark int Served uint64 InFlight int MaxInFlight int SendAttempts uint64 SendSuccesses uint64 SendFailures uint64 LastServedAt time.Time LastRouteID string RoutePolicyVersion string RouteGeneration string RecoveryPolicyFingerprint string LastNextHop string LastFailedRouteID string LastFailedRoutePolicyVersion string LastFailedRouteGeneration string LastRouteFailureAt time.Time LastRecoveredFromRouteID string LastRecoveredNextHop string LastRouteSwitchAt time.Time LastRouteRecoveryMillis int64 RouteSwitchCount uint64 LastError string ConsecutiveFailures uint64 StallCount uint64 LastSendDurationMillis int64 RouteRebuildRecommended bool DegradedFallbackRecommended bool QualityPreferenceRouteID string QualityPreferenceScore int QualityPreferenceRawScore int QualityPreferenceReasons []string LatencyLe10Millis uint64 LatencyLe100Millis uint64 LatencyLe1000Millis uint64 LatencyGt1000Millis uint64 QualityWindow []fabricFlowQualitySample } type fabricFlowQualitySample struct { At time.Time Success bool Failure bool Dropped bool Slow bool DurationMillis int64 } type fabricFlowQualityWindowStats struct { SampleCount int SuccessCount int FailureCount int SlowCount int DropCount int AvgLatencyMs int64 LastUpdatedAt time.Time } type FabricScheduledPacketBatch struct { ChannelID string FlowID string Shard int TrafficClass string Packets [][]byte QueueDepth int Dropped uint64 Classifier string ServiceMode string } type FabricFlowSchedulerSnapshot struct { SchemaVersion string `json:"schema_version"` Enabled bool `json:"enabled"` ServiceNeutral bool `json:"service_neutral"` Classifier string `json:"classifier"` ServiceMode string `json:"service_mode"` ShardCount int `json:"shard_count"` QueueCapacity int `json:"queue_capacity"` ChannelCount int `json:"channel_count"` Enqueued uint64 `json:"enqueued"` Dequeued uint64 `json:"dequeued"` Dropped uint64 `json:"dropped"` HighWatermark int `json:"high_watermark"` BackpressureActive bool `json:"backpressure_active"` InFlight int `json:"in_flight"` MaxInFlight int `json:"max_in_flight"` AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"` AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"` RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"` AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"` BulkPressureActive bool `json:"bulk_pressure_active,omitempty"` BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"` InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"` RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"` RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` SlowChannelCount int `json:"slow_channel_count"` FailingChannelCount int `json:"failing_channel_count"` QualityWindowSampleCount int `json:"quality_window_sample_count"` QualityWindowFailureCount int `json:"quality_window_failure_count"` QualityWindowSlowCount int `json:"quality_window_slow_count"` QualityWindowDropCount int `json:"quality_window_drop_count"` QueueDepths map[string]int `json:"queue_depths"` TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"` ChannelStats map[string]FabricFlowStat `json:"channel_stats"` } type FabricFlowStat struct { TrafficClass string `json:"traffic_class,omitempty"` Depth int `json:"depth"` Enqueued uint64 `json:"enqueued"` Dequeued uint64 `json:"dequeued"` Dropped uint64 `json:"dropped"` HighWatermark int `json:"high_watermark"` Served uint64 `json:"served"` InFlight int `json:"in_flight"` MaxInFlight int `json:"max_in_flight"` SendAttempts uint64 `json:"send_attempts"` SendSuccesses uint64 `json:"send_successes"` SendFailures uint64 `json:"send_failures"` LastServedAt string `json:"last_served_at,omitempty"` LastRouteID string `json:"last_route_id,omitempty"` LastNextHop string `json:"last_next_hop,omitempty"` RoutePolicyVersion string `json:"route_policy_version,omitempty"` RouteGeneration string `json:"route_generation,omitempty"` RecoveryPolicyFingerprint string `json:"recovery_policy_fingerprint,omitempty"` LastFailedRouteID string `json:"last_failed_route_id,omitempty"` LastFailedRoutePolicyVersion string `json:"last_failed_route_policy_version,omitempty"` LastFailedRouteGeneration string `json:"last_failed_route_generation,omitempty"` LastRouteFailureAt string `json:"last_route_failure_at,omitempty"` LastRecoveredFromRouteID string `json:"last_recovered_from_route_id,omitempty"` LastRecoveredNextHop string `json:"last_recovered_next_hop,omitempty"` LastRouteSwitchAt string `json:"last_route_switch_at,omitempty"` LastRouteRecoveryMillis int64 `json:"last_route_recovery_ms,omitempty"` RouteSwitchCount uint64 `json:"route_switch_count,omitempty"` LastError string `json:"last_error,omitempty"` ConsecutiveFailures uint64 `json:"consecutive_failures"` StallCount uint64 `json:"stall_count"` LastSendDurationMillis int64 `json:"last_send_duration_ms,omitempty"` RouteRebuildRecommended bool `json:"route_rebuild_recommended"` DegradedFallbackRecommended bool `json:"degraded_fallback_recommended"` QualityPreferenceRouteID string `json:"quality_preference_route_id,omitempty"` QualityPreferenceScore int `json:"quality_preference_score,omitempty"` QualityPreferenceRawScore int `json:"quality_preference_raw_score,omitempty"` QualityPreferenceReasons []string `json:"quality_preference_reasons,omitempty"` LatencyLe10Millis uint64 `json:"latency_le_10ms"` LatencyLe100Millis uint64 `json:"latency_le_100ms"` LatencyLe1000Millis uint64 `json:"latency_le_1000ms"` LatencyGt1000Millis uint64 `json:"latency_gt_1000ms"` QualityWindowSampleCount int `json:"quality_window_sample_count"` QualityWindowSuccessCount int `json:"quality_window_success_count"` QualityWindowFailureCount int `json:"quality_window_failure_count"` QualityWindowSlowCount int `json:"quality_window_slow_count"` QualityWindowDropCount int `json:"quality_window_drop_count"` QualityWindowAvgLatencyMs int64 `json:"quality_window_avg_latency_ms,omitempty"` QualityWindowLastUpdatedAt string `json:"quality_window_last_updated_at,omitempty"` } func NewFabricFlowScheduler(shardCount int, queueCapacity int) *FabricFlowScheduler { if shardCount <= 0 { shardCount = defaultFabricFlowShardCount } if queueCapacity <= 0 { queueCapacity = defaultFabricFlowQueueCapacity } return &FabricFlowScheduler{ shardCount: shardCount, queueCapacity: queueCapacity, adaptivePolicy: defaultFabricServiceChannelAdaptivePolicy(), queues: map[string]*fabricFlowQueue{}, } } func defaultFabricServiceChannelAdaptivePolicy() FabricServiceChannelAdaptivePolicy { return normalizeFabricServiceChannelAdaptivePolicy(FabricServiceChannelAdaptivePolicy{ SchemaVersion: "rap.fabric_service_channel_adaptive_policy.v1", MaxParallelWindow: defaultFabricFlowParallelSendWindow, BulkPressureChannelThreshold: 16, QueuePressureHighWatermark: 16, QueuePressureMaxInFlight: defaultFabricFlowParallelSendWindow * 4, ClassWindows: map[string]int{ FabricTrafficClassControl: defaultFabricFlowParallelSendWindow, FabricTrafficClassInteractive: defaultFabricFlowParallelSendWindow, FabricTrafficClassReliable: 6, FabricTrafficClassBulk: 4, FabricTrafficClassDroppable: 1, }, }) } func normalizeFabricServiceChannelAdaptivePolicy(policy FabricServiceChannelAdaptivePolicy) FabricServiceChannelAdaptivePolicy { if policy.SchemaVersion == "" { policy.SchemaVersion = "rap.fabric_service_channel_adaptive_policy.v1" } if policy.MaxParallelWindow <= 0 { policy.MaxParallelWindow = defaultFabricFlowParallelSendWindow } if policy.BulkPressureChannelThreshold <= 0 { policy.BulkPressureChannelThreshold = 16 } if policy.QueuePressureHighWatermark <= 0 { policy.QueuePressureHighWatermark = 16 } if policy.QueuePressureMaxInFlight <= 0 { policy.QueuePressureMaxInFlight = defaultFabricFlowParallelSendWindow * 4 } if policy.ClassWindows == nil { policy.ClassWindows = map[string]int{} } defaults := map[string]int{ FabricTrafficClassControl: policy.MaxParallelWindow, FabricTrafficClassInteractive: policy.MaxParallelWindow, FabricTrafficClassReliable: minPositive(policy.MaxParallelWindow, 6), FabricTrafficClassBulk: minPositive(policy.MaxParallelWindow, 4), FabricTrafficClassDroppable: 1, } next := map[string]int{} for className, fallback := range defaults { value := policy.ClassWindows[className] if value <= 0 { value = fallback } next[className] = clampFabricWindow(value, 1, policy.MaxParallelWindow) } policy.ClassWindows = next return policy } func (s *FabricFlowScheduler) ConfigureAdaptivePolicy(policy FabricServiceChannelAdaptivePolicy) { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() s.adaptivePolicy = normalizeFabricServiceChannelAdaptivePolicy(policy) } func (s *FabricFlowScheduler) ScheduleClientPackets(packets [][]byte) []FabricScheduledPacketBatch { scheduled, _ := s.scheduleClientPackets("", "", packets) return scheduled } func (s *FabricFlowScheduler) ScheduleClientPacketsForConnection(vpnConnectionID string, packets [][]byte) []FabricScheduledPacketBatch { scheduled, _ := s.scheduleClientPackets(vpnConnectionID, "", packets) return scheduled } func (s *FabricFlowScheduler) ScheduleClientPacketsForConnectionClass(vpnConnectionID string, trafficClass string, packets [][]byte) []FabricScheduledPacketBatch { scheduled, _ := s.scheduleClientPackets(vpnConnectionID, trafficClass, packets) return scheduled } func (s *FabricFlowScheduler) scheduleClientPackets(vpnConnectionID string, trafficClass string, packets [][]byte) ([]FabricScheduledPacketBatch, uint64) { packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil, 0 } if s == nil { s = NewFabricFlowScheduler(0, 0) } trafficClass = normalizeFabricTrafficClass(trafficClass) grouped := map[string]*FabricScheduledPacketBatch{} var droppedCount uint64 for _, packet := range packets { flowID, shard := classifyPacketFlow(packet, s.shardCountValue()) channelID := fabricFlowChannelIDForClass(vpnConnectionID, trafficClass, shard) queueDepth, dropped := s.enqueue(channelID, trafficClass) if dropped { droppedCount++ continue } batch := grouped[channelID] if batch == nil { batch = &FabricScheduledPacketBatch{ ChannelID: channelID, FlowID: flowID, Shard: shard, TrafficClass: trafficClass, Classifier: "ip_5tuple_or_packet_hash", ServiceMode: "application_protocol_agnostic", } grouped[channelID] = batch } batch.Packets = append(batch.Packets, append([]byte(nil), packet...)) batch.QueueDepth = queueDepth } out := make([]FabricScheduledPacketBatch, 0, len(grouped)) for _, batch := range grouped { out = append(out, *batch) } s.sortScheduledBatches(out) return out, droppedCount } func fabricFlowChannelID(vpnConnectionID string, shard int) string { return fabricFlowChannelIDForClass(vpnConnectionID, "", shard) } func fabricFlowChannelIDForClass(vpnConnectionID string, trafficClass string, shard int) string { base := fmt.Sprintf("flow-%02d", shard) vpnConnectionID = strings.TrimSpace(vpnConnectionID) if vpnConnectionID == "" { return base } trafficClass = normalizeFabricTrafficClass(trafficClass) if trafficClass != "" && trafficClass != FabricTrafficClassBulk { return "vpn:" + vpnConnectionID + ":" + trafficClass + ":" + base } return "vpn:" + vpnConnectionID + ":" + base } func (s *FabricFlowScheduler) Complete(batch FabricScheduledPacketBatch) { if s == nil || len(batch.Packets) == 0 { return } s.dequeue(batch.ChannelID, len(batch.Packets)) } func (s *FabricFlowScheduler) BeginSend(channelID string) { if s == nil || channelID == "" { return } s.mu.Lock() defer s.mu.Unlock() queue := s.ensureQueueLocked(channelID) queue.InFlight++ queue.SendAttempts++ if queue.InFlight > queue.MaxInFlight { queue.MaxInFlight = queue.InFlight } s.inFlight++ if s.inFlight > s.maxInFlight { s.maxInFlight = s.inFlight } } func (s *FabricFlowScheduler) EndSend(channelID string) { if s == nil || channelID == "" { return } s.mu.Lock() defer s.mu.Unlock() queue := s.queues[channelID] if queue != nil && queue.InFlight > 0 { queue.InFlight-- } if s.inFlight > 0 { s.inFlight-- } } func (s *FabricFlowScheduler) RecommendedParallelSendWindow(maxWindow int) int { return s.RecommendedParallelSendWindowForTrafficClass("", maxWindow) } func (s *FabricFlowScheduler) RecommendedParallelSendWindowForTrafficClass(trafficClass string, maxWindow int) int { if maxWindow <= 1 { return 1 } if s == nil { return maxWindow } trafficClass = normalizeFabricTrafficClass(trafficClass) s.mu.Lock() defer s.mu.Unlock() if maxWindow > s.adaptivePolicy.MaxParallelWindow && s.adaptivePolicy.MaxParallelWindow > 0 { maxWindow = s.adaptivePolicy.MaxParallelWindow } global := s.parallelPressureLocked("") classPressure := s.parallelPressureLocked(trafficClass) if fabricTrafficClassPriority(trafficClass) <= fabricTrafficClassPriority(FabricTrafficClassInteractive) { if classPressure.hasDrops { return boundedParallelWindow(maxWindow - 1) } if classPressure.failing > 0 || classPressure.slow > 0 { return boundedParallelWindow(maxWindow - 1) } return maxWindow } if trafficClass == FabricTrafficClassReliable { if classPressure.hasDrops || classPressure.failing > 0 { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if global.hasDrops || global.failing+global.slow > 0 || global.highPressure { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow-1)) } return maxWindow } if classPressure.hasDrops { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } if global.hasDrops { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } if global.highPressure && global.interactiveOrControlQueues > 0 { if trafficClass == FabricTrafficClassBulk || trafficClass == FabricTrafficClassDroppable { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if global.highPressure { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if classPressure.failing >= maxWindow || classPressure.slow >= maxWindow { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } if classPressure.failing+classPressure.slow > 0 { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow-1)) } return maxWindow } type fabricParallelPressure struct { hasDrops bool failing int slow int highPressure bool interactiveOrControlQueues int bulkQueues int } func (s *FabricFlowScheduler) parallelPressureLocked(trafficClass string) fabricParallelPressure { out := fabricParallelPressure{} if s == nil { return out } trafficClass = strings.TrimSpace(trafficClass) failing := 0 slow := 0 for _, queue := range s.queues { if queue == nil { continue } queueClass := normalizeFabricTrafficClass(queue.TrafficClass) if queueClass == FabricTrafficClassControl || queueClass == FabricTrafficClassInteractive { out.interactiveOrControlQueues++ } if queueClass == FabricTrafficClassBulk { out.bulkQueues++ } if trafficClass != "" && queueClass != trafficClass { continue } stats := queue.qualityWindowStats() if stats.DropCount > 0 { out.hasDrops = true } if stats.FailureCount > stats.SuccessCount || (stats.FailureCount > 0 && queue.DegradedFallbackRecommended) { failing++ } if stats.SlowCount > 0 { slow++ } policy := s.adaptivePolicy if policy.QueuePressureHighWatermark <= 0 { policy = defaultFabricServiceChannelAdaptivePolicy() } if queue.HighWatermark >= policy.QueuePressureHighWatermark || queue.MaxInFlight >= policy.QueuePressureMaxInFlight { out.highPressure = true } } policy := s.adaptivePolicy if policy.QueuePressureHighWatermark <= 0 { policy = defaultFabricServiceChannelAdaptivePolicy() } if s.highWatermark >= policy.QueuePressureHighWatermark || s.maxInFlight >= policy.QueuePressureMaxInFlight { out.highPressure = true } if out.bulkQueues >= policy.BulkPressureChannelThreshold && out.interactiveOrControlQueues > 0 { out.highPressure = true } out.failing = failing out.slow = slow return out } func boundedParallelWindow(value int) int { if value < 1 { return 1 } return value } func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot { snapshot := FabricFlowSchedulerSnapshot{ SchemaVersion: "rap.fabric_flow_scheduler.v1", Enabled: s != nil, ServiceNeutral: true, Classifier: "ip_5tuple_or_packet_hash", ServiceMode: "application_protocol_agnostic", QueueDepths: map[string]int{}, TrafficClassCounts: map[string]int{}, RecommendedParallelWindows: map[string]int{}, ChannelStats: map[string]FabricFlowStat{}, } if s == nil { snapshot.ShardCount = defaultFabricFlowShardCount snapshot.QueueCapacity = defaultFabricFlowQueueCapacity return snapshot } s.mu.Lock() defer s.mu.Unlock() snapshot.ShardCount = s.shardCount snapshot.QueueCapacity = s.queueCapacity snapshot.AdaptivePolicyFingerprint = s.adaptivePolicy.Fingerprint snapshot.ChannelCount = len(s.queues) snapshot.Enqueued = s.enqueued snapshot.Dequeued = s.dequeued snapshot.Dropped = s.dropped snapshot.HighWatermark = s.highWatermark snapshot.InFlight = s.inFlight snapshot.MaxInFlight = s.maxInFlight for channelID, queue := range s.queues { qualityStats := queue.qualityWindowStats() snapshot.QueueDepths[channelID] = queue.Depth trafficClass := normalizeFabricTrafficClass(queue.TrafficClass) snapshot.TrafficClassCounts[trafficClass]++ stat := FabricFlowStat{ Depth: queue.Depth, TrafficClass: trafficClass, Enqueued: queue.Enqueued, Dequeued: queue.Dequeued, Dropped: queue.Dropped, HighWatermark: queue.HighWatermark, Served: queue.Served, InFlight: queue.InFlight, MaxInFlight: queue.MaxInFlight, SendAttempts: queue.SendAttempts, SendSuccesses: queue.SendSuccesses, SendFailures: queue.SendFailures, LastRouteID: queue.LastRouteID, RoutePolicyVersion: queue.RoutePolicyVersion, RouteGeneration: queue.RouteGeneration, RecoveryPolicyFingerprint: queue.RecoveryPolicyFingerprint, LastNextHop: queue.LastNextHop, LastFailedRouteID: queue.LastFailedRouteID, LastFailedRoutePolicyVersion: queue.LastFailedRoutePolicyVersion, LastFailedRouteGeneration: queue.LastFailedRouteGeneration, LastRecoveredFromRouteID: queue.LastRecoveredFromRouteID, LastRecoveredNextHop: queue.LastRecoveredNextHop, LastRouteRecoveryMillis: queue.LastRouteRecoveryMillis, RouteSwitchCount: queue.RouteSwitchCount, LastError: queue.LastError, ConsecutiveFailures: queue.ConsecutiveFailures, StallCount: queue.StallCount, LastSendDurationMillis: queue.LastSendDurationMillis, RouteRebuildRecommended: queue.RouteRebuildRecommended, DegradedFallbackRecommended: queue.DegradedFallbackRecommended, QualityPreferenceRouteID: queue.QualityPreferenceRouteID, QualityPreferenceScore: queue.QualityPreferenceScore, QualityPreferenceRawScore: queue.QualityPreferenceRawScore, QualityPreferenceReasons: append([]string{}, queue.QualityPreferenceReasons...), LatencyLe10Millis: queue.LatencyLe10Millis, LatencyLe100Millis: queue.LatencyLe100Millis, LatencyLe1000Millis: queue.LatencyLe1000Millis, LatencyGt1000Millis: queue.LatencyGt1000Millis, QualityWindowSampleCount: qualityStats.SampleCount, QualityWindowSuccessCount: qualityStats.SuccessCount, QualityWindowFailureCount: qualityStats.FailureCount, QualityWindowSlowCount: qualityStats.SlowCount, QualityWindowDropCount: qualityStats.DropCount, QualityWindowAvgLatencyMs: qualityStats.AvgLatencyMs, } if !queue.LastServedAt.IsZero() { stat.LastServedAt = queue.LastServedAt.UTC().Format(time.RFC3339Nano) } if !qualityStats.LastUpdatedAt.IsZero() { stat.QualityWindowLastUpdatedAt = qualityStats.LastUpdatedAt.UTC().Format(time.RFC3339Nano) } if !queue.LastRouteFailureAt.IsZero() { stat.LastRouteFailureAt = queue.LastRouteFailureAt.UTC().Format(time.RFC3339Nano) } if !queue.LastRouteSwitchAt.IsZero() { stat.LastRouteSwitchAt = queue.LastRouteSwitchAt.UTC().Format(time.RFC3339Nano) } snapshot.ChannelStats[channelID] = FabricFlowStat{ Depth: stat.Depth, TrafficClass: stat.TrafficClass, Enqueued: stat.Enqueued, Dequeued: stat.Dequeued, Dropped: stat.Dropped, HighWatermark: stat.HighWatermark, Served: stat.Served, InFlight: stat.InFlight, MaxInFlight: stat.MaxInFlight, SendAttempts: stat.SendAttempts, SendSuccesses: stat.SendSuccesses, SendFailures: stat.SendFailures, LastServedAt: stat.LastServedAt, LastRouteID: stat.LastRouteID, LastNextHop: stat.LastNextHop, RoutePolicyVersion: stat.RoutePolicyVersion, RouteGeneration: stat.RouteGeneration, RecoveryPolicyFingerprint: stat.RecoveryPolicyFingerprint, LastFailedRouteID: stat.LastFailedRouteID, LastFailedRoutePolicyVersion: stat.LastFailedRoutePolicyVersion, LastFailedRouteGeneration: stat.LastFailedRouteGeneration, LastRouteFailureAt: stat.LastRouteFailureAt, LastRecoveredFromRouteID: stat.LastRecoveredFromRouteID, LastRecoveredNextHop: stat.LastRecoveredNextHop, LastRouteSwitchAt: stat.LastRouteSwitchAt, LastRouteRecoveryMillis: stat.LastRouteRecoveryMillis, RouteSwitchCount: stat.RouteSwitchCount, LastError: stat.LastError, ConsecutiveFailures: stat.ConsecutiveFailures, StallCount: stat.StallCount, LastSendDurationMillis: stat.LastSendDurationMillis, RouteRebuildRecommended: stat.RouteRebuildRecommended, DegradedFallbackRecommended: stat.DegradedFallbackRecommended, QualityPreferenceRouteID: stat.QualityPreferenceRouteID, QualityPreferenceScore: stat.QualityPreferenceScore, QualityPreferenceRawScore: stat.QualityPreferenceRawScore, QualityPreferenceReasons: append([]string{}, stat.QualityPreferenceReasons...), LatencyLe10Millis: stat.LatencyLe10Millis, LatencyLe100Millis: stat.LatencyLe100Millis, LatencyLe1000Millis: stat.LatencyLe1000Millis, LatencyGt1000Millis: stat.LatencyGt1000Millis, QualityWindowSampleCount: stat.QualityWindowSampleCount, QualityWindowSuccessCount: stat.QualityWindowSuccessCount, QualityWindowFailureCount: stat.QualityWindowFailureCount, QualityWindowSlowCount: stat.QualityWindowSlowCount, QualityWindowDropCount: stat.QualityWindowDropCount, QualityWindowAvgLatencyMs: stat.QualityWindowAvgLatencyMs, QualityWindowLastUpdatedAt: stat.QualityWindowLastUpdatedAt, } snapshot.QualityWindowSampleCount += qualityStats.SampleCount snapshot.QualityWindowFailureCount += qualityStats.FailureCount snapshot.QualityWindowSlowCount += qualityStats.SlowCount snapshot.QualityWindowDropCount += qualityStats.DropCount snapshot.RouteSwitchCount += queue.RouteSwitchCount if queue.LastRecoveredFromRouteID != "" { snapshot.RouteRecoveredChannelCount++ } if queue.Depth >= s.queueCapacity || qualityStats.DropCount > 0 { snapshot.BackpressureActive = true } if (queue.RouteRebuildRecommended || queue.DegradedFallbackRecommended) && qualityStats.FailureCount > 0 { snapshot.BackpressureActive = true } if qualityStats.SlowCount > 0 { snapshot.SlowChannelCount++ } if qualityStats.FailureCount > qualityStats.SuccessCount || (qualityStats.FailureCount > 0 && queue.DegradedFallbackRecommended) { snapshot.FailingChannelCount++ } } if snapshot.QualityWindowDropCount > 0 { snapshot.BackpressureActive = true } snapshot.BulkPressureChannelCount = snapshot.TrafficClassCounts[FabricTrafficClassBulk] snapshot.InteractiveOrControlCount = snapshot.TrafficClassCounts[FabricTrafficClassControl] + snapshot.TrafficClassCounts[FabricTrafficClassInteractive] bulkPressureThreshold := s.adaptivePolicy.BulkPressureChannelThreshold if bulkPressureThreshold <= 0 { bulkPressureThreshold = defaultFabricServiceChannelAdaptivePolicy().BulkPressureChannelThreshold } if snapshot.BulkPressureChannelCount >= bulkPressureThreshold && snapshot.InteractiveOrControlCount > 0 { snapshot.BulkPressureActive = true snapshot.BackpressureActive = true } for _, trafficClass := range []string{FabricTrafficClassControl, FabricTrafficClassInteractive, FabricTrafficClassReliable, FabricTrafficClassBulk, FabricTrafficClassDroppable} { snapshot.RecommendedParallelWindows[trafficClass] = s.recommendedParallelSendWindowForTrafficClassLocked(trafficClass, s.adaptivePolicy.MaxParallelWindow) } if len(snapshot.RecommendedParallelWindows) > 0 { bulkWindow := snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] interactiveWindow := snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] if bulkWindow > 0 && interactiveWindow > 0 && bulkWindow < interactiveWindow { snapshot.AdaptiveBackpressureActive = true snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive" } } return snapshot } func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int { if maxWindow <= 1 { return 1 } trafficClass = normalizeFabricTrafficClass(trafficClass) if s == nil { return maxWindow } if maxWindow > s.adaptivePolicy.MaxParallelWindow && s.adaptivePolicy.MaxParallelWindow > 0 { maxWindow = s.adaptivePolicy.MaxParallelWindow } // The public method cannot be called here because Snapshot already holds the // scheduler mutex. Keep this wrapper intentionally small by mirroring the // public policy on already-locked state. globalPressure := s.parallelPressureLocked("") classPressure := s.parallelPressureLocked(trafficClass) if fabricTrafficClassPriority(trafficClass) <= fabricTrafficClassPriority(FabricTrafficClassInteractive) { if classPressure.hasDrops || classPressure.failing > 0 || classPressure.slow > 0 { return boundedParallelWindow(maxWindow - 1) } return maxWindow } if trafficClass == FabricTrafficClassReliable { if classPressure.hasDrops || classPressure.failing > 0 { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if globalPressure.hasDrops || globalPressure.failing+globalPressure.slow > 0 || globalPressure.highPressure { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow-1)) } return maxWindow } if classPressure.hasDrops || globalPressure.hasDrops { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } if globalPressure.highPressure && globalPressure.interactiveOrControlQueues > 0 { if trafficClass == FabricTrafficClassBulk || trafficClass == FabricTrafficClassDroppable { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if globalPressure.highPressure { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) } if classPressure.failing >= maxWindow || classPressure.slow >= maxWindow { return classWindowLimit(s.adaptivePolicy, trafficClass, 1) } if classPressure.failing+classPressure.slow > 0 { return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow-1)) } return maxWindow } func classWindowLimit(policy FabricServiceChannelAdaptivePolicy, trafficClass string, maxWindow int) int { if policy.ClassWindows != nil { if value := policy.ClassWindows[normalizeFabricTrafficClass(trafficClass)]; value > 0 && value < maxWindow { return value } } return maxWindow } func clampFabricWindow(value, minValue, maxValue int) int { if value < minValue { return minValue } if value > maxValue { return maxValue } return value } func minPositive(a, b int) int { if a <= 0 { return b } if b <= 0 || a < b { return a } return b } func (s *FabricFlowScheduler) Dropped() uint64 { if s == nil { return 0 } s.mu.Lock() defer s.mu.Unlock() return s.dropped } func (s *FabricFlowScheduler) shardCountValue() int { if s == nil || s.shardCount <= 0 { return defaultFabricFlowShardCount } return s.shardCount } func (s *FabricFlowScheduler) enqueue(channelID string, trafficClass string) (int, bool) { if s == nil { return 0, false } s.mu.Lock() defer s.mu.Unlock() queue := s.ensureQueueLocked(channelID) if queue.TrafficClass == "" { queue.TrafficClass = normalizeFabricTrafficClass(trafficClass) } if queue.Depth >= s.queueCapacity { queue.Dropped++ s.dropped++ queue.recordQualitySample(fabricFlowQualitySample{ At: time.Now().UTC(), Dropped: true, }) return queue.Depth, true } queue.Depth++ queue.Enqueued++ s.enqueued++ if queue.Depth > queue.HighWatermark { queue.HighWatermark = queue.Depth } if queue.Depth > s.highWatermark { s.highWatermark = queue.Depth } return queue.Depth, false } func (s *FabricFlowScheduler) dequeue(channelID string, count int) { if s == nil || count <= 0 { return } s.mu.Lock() defer s.mu.Unlock() queue := s.queues[channelID] if queue == nil { return } if count > queue.Depth { count = queue.Depth } queue.Depth -= count queue.Dequeued += uint64(count) queue.Served++ queue.LastServedAt = time.Now().UTC() s.dequeued += uint64(count) } func (s *FabricFlowScheduler) sortScheduledBatches(batches []FabricScheduledPacketBatch) { if len(batches) < 2 { return } s.mu.Lock() defer s.mu.Unlock() sort.Slice(batches, func(a, b int) bool { leftPriority := fabricTrafficClassPriority(batches[a].TrafficClass) rightPriority := fabricTrafficClassPriority(batches[b].TrafficClass) if leftPriority != rightPriority { return leftPriority < rightPriority } left := s.queues[batches[a].ChannelID] right := s.queues[batches[b].ChannelID] leftStalled := left != nil && (left.RouteRebuildRecommended || left.DegradedFallbackRecommended) rightStalled := right != nil && (right.RouteRebuildRecommended || right.DegradedFallbackRecommended) if leftStalled != rightStalled { return !leftStalled } leftServed := uint64(0) rightServed := uint64(0) if left != nil { leftServed = left.Served } if right != nil { rightServed = right.Served } if leftServed != rightServed { return leftServed < rightServed } leftServedAt := time.Time{} rightServedAt := time.Time{} if left != nil { leftServedAt = left.LastServedAt } if right != nil { rightServedAt = right.LastServedAt } if !leftServedAt.Equal(rightServedAt) { if leftServedAt.IsZero() { return true } if rightServedAt.IsZero() { return false } return leftServedAt.Before(rightServedAt) } return batches[a].ChannelID < batches[b].ChannelID }) } func normalizeFabricTrafficClass(value string) string { switch strings.TrimSpace(strings.ToLower(value)) { case FabricTrafficClassControl: return FabricTrafficClassControl case FabricTrafficClassInteractive: return FabricTrafficClassInteractive case FabricTrafficClassReliable: return FabricTrafficClassReliable case FabricTrafficClassDroppable: return FabricTrafficClassDroppable case FabricTrafficClassBulk: return FabricTrafficClassBulk default: return FabricTrafficClassBulk } } func fabricTrafficClassPriority(value string) int { switch normalizeFabricTrafficClass(value) { case FabricTrafficClassControl: return 0 case FabricTrafficClassInteractive: return 1 case FabricTrafficClassReliable: return 2 case FabricTrafficClassBulk: return 3 case FabricTrafficClassDroppable: return 4 default: return 3 } } func (s *FabricFlowScheduler) RoutePreference(channelID string) (preferredRouteID string, avoidRouteID string) { if s == nil || channelID == "" { return "", "" } s.mu.Lock() defer s.mu.Unlock() queue := s.queues[channelID] if queue == nil { return "", "" } if queue.ConsecutiveFailures == 0 { preferredRouteID = queue.LastRouteID } return preferredRouteID, queue.LastFailedRouteID } func (s *FabricFlowScheduler) RecordRouteSuccess(channelID string, routeID string, nextHop string, duration time.Duration, preferences ...FabricServiceChannelRouteQualityPreference) { s.RecordRouteSuccessWithProvenance(channelID, routeID, nextHop, duration, fabricFlowRouteProvenance{}, preferences...) } func (s *FabricFlowScheduler) RecordRouteSuccessWithProvenance(channelID string, routeID string, nextHop string, duration time.Duration, provenance fabricFlowRouteProvenance, preferences ...FabricServiceChannelRouteQualityPreference) { if s == nil || channelID == "" { return } s.mu.Lock() defer s.mu.Unlock() queue := s.ensureQueueLocked(channelID) failedRouteID := strings.TrimSpace(queue.LastFailedRouteID) failedNextHop := strings.TrimSpace(queue.LastNextHop) if failedRouteID != "" && strings.TrimSpace(routeID) != "" && failedRouteID != strings.TrimSpace(routeID) { switchedAt := time.Now().UTC() queue.LastRecoveredFromRouteID = failedRouteID queue.LastRecoveredNextHop = failedNextHop queue.LastRouteSwitchAt = switchedAt queue.LastRouteRecoveryMillis = 0 if !queue.LastRouteFailureAt.IsZero() { queue.LastRouteRecoveryMillis = switchedAt.Sub(queue.LastRouteFailureAt).Milliseconds() if queue.LastRouteRecoveryMillis < 0 { queue.LastRouteRecoveryMillis = 0 } } queue.RouteSwitchCount++ } queue.LastRouteID = routeID queue.RoutePolicyVersion = strings.TrimSpace(provenance.PolicyVersion) queue.RouteGeneration = strings.TrimSpace(provenance.Generation) queue.RecoveryPolicyFingerprint = strings.TrimSpace(provenance.RecoveryPolicyFingerprint) queue.LastNextHop = nextHop queue.LastFailedRouteID = "" queue.LastFailedRoutePolicyVersion = "" queue.LastFailedRouteGeneration = "" queue.LastError = "" queue.ConsecutiveFailures = 0 queue.LastSendDurationMillis = fabricSendDurationMillis(duration) queue.SendSuccesses++ queue.recordLatency(duration) queue.recordQualitySample(fabricFlowQualitySample{ At: time.Now().UTC(), Success: true, Slow: duration > defaultFabricFlowSlowSendThreshold, DurationMillis: queue.LastSendDurationMillis, }) queue.recordQualityPreference(preferences...) if duration > defaultFabricFlowSlowSendThreshold { queue.StallCount++ queue.RouteRebuildRecommended = true } else { queue.RouteRebuildRecommended = false queue.DegradedFallbackRecommended = false } } func (s *FabricFlowScheduler) RecordRouteFailure(channelID string, routeID string, nextHop string, err error, duration time.Duration) { s.RecordRouteFailureWithProvenance(channelID, routeID, nextHop, err, duration, fabricFlowRouteProvenance{}) } func (s *FabricFlowScheduler) RecordRouteFailureWithProvenance(channelID string, routeID string, nextHop string, err error, duration time.Duration, provenance fabricFlowRouteProvenance) { if s == nil || channelID == "" { return } s.mu.Lock() defer s.mu.Unlock() queue := s.ensureQueueLocked(channelID) queue.LastFailedRouteID = routeID queue.LastFailedRoutePolicyVersion = strings.TrimSpace(provenance.PolicyVersion) queue.LastFailedRouteGeneration = strings.TrimSpace(provenance.Generation) queue.LastRouteFailureAt = time.Now().UTC() if fp := strings.TrimSpace(provenance.RecoveryPolicyFingerprint); fp != "" { queue.RecoveryPolicyFingerprint = fp } queue.LastNextHop = nextHop queue.ConsecutiveFailures++ queue.StallCount++ queue.LastSendDurationMillis = fabricSendDurationMillis(duration) queue.SendFailures++ queue.recordLatency(duration) queue.recordQualitySample(fabricFlowQualitySample{ At: time.Now().UTC(), Failure: true, Slow: duration > defaultFabricFlowSlowSendThreshold, DurationMillis: queue.LastSendDurationMillis, }) if err != nil { queue.LastError = err.Error() } queue.RouteRebuildRecommended = true if queue.ConsecutiveFailures >= defaultFabricFlowFailureThreshold { queue.DegradedFallbackRecommended = true } queue.clearQualityPreference() } func (s *FabricFlowScheduler) RecordLocalFallback(channelID string) { if s == nil || channelID == "" { return } s.mu.Lock() defer s.mu.Unlock() queue := s.ensureQueueLocked(channelID) queue.LastRouteID = "local_gateway" queue.RoutePolicyVersion = "" queue.RouteGeneration = "" queue.RecoveryPolicyFingerprint = "" queue.LastNextHop = "local_gateway" queue.LastFailedRouteID = "" queue.LastFailedRoutePolicyVersion = "" queue.LastFailedRouteGeneration = "" queue.LastError = "" queue.ConsecutiveFailures = 0 queue.RouteRebuildRecommended = false queue.DegradedFallbackRecommended = false queue.clearQualityPreference() } func (s *FabricFlowScheduler) ClearQualityPreferencesNotIn(validRouteIDs map[string]struct{}) { if s == nil { return } s.mu.Lock() defer s.mu.Unlock() for _, queue := range s.queues { if queue == nil || queue.QualityPreferenceRouteID == "" { continue } if _, ok := validRouteIDs[queue.QualityPreferenceRouteID]; !ok { queue.clearQualityPreference() } } } func (s *FabricFlowScheduler) ClearQualityPreferencesForRoutes(routeIDs map[string]struct{}) { if s == nil || len(routeIDs) == 0 { return } s.mu.Lock() defer s.mu.Unlock() for _, queue := range s.queues { if queue == nil || queue.QualityPreferenceRouteID == "" { continue } if _, ok := routeIDs[queue.QualityPreferenceRouteID]; ok { queue.clearQualityPreference() } } } func (q *fabricFlowQueue) recordLatency(duration time.Duration) { if q == nil { return } millis := fabricSendDurationMillis(duration) switch { case millis <= 10: q.LatencyLe10Millis++ case millis <= 100: q.LatencyLe100Millis++ case millis <= 1000: q.LatencyLe1000Millis++ default: q.LatencyGt1000Millis++ } } func (q *fabricFlowQueue) recordQualitySample(sample fabricFlowQualitySample) { if q == nil { return } if sample.At.IsZero() { sample.At = time.Now().UTC() } q.QualityWindow = append(q.QualityWindow, sample) if len(q.QualityWindow) > defaultFabricFlowQualityWindowCapacity { keepFrom := len(q.QualityWindow) - defaultFabricFlowQualityWindowCapacity copy(q.QualityWindow, q.QualityWindow[keepFrom:]) q.QualityWindow = q.QualityWindow[:defaultFabricFlowQualityWindowCapacity] } } func (q *fabricFlowQueue) qualityWindowStats() fabricFlowQualityWindowStats { stats := fabricFlowQualityWindowStats{} if q == nil || len(q.QualityWindow) == 0 { return stats } var latencyTotal int64 var latencySamples int64 for _, sample := range q.QualityWindow { stats.SampleCount++ if sample.Success { stats.SuccessCount++ } if sample.Failure { stats.FailureCount++ } if sample.Slow { stats.SlowCount++ } if sample.Dropped { stats.DropCount++ } if sample.DurationMillis > 0 { latencyTotal += sample.DurationMillis latencySamples++ } if sample.At.After(stats.LastUpdatedAt) { stats.LastUpdatedAt = sample.At } } if latencySamples > 0 { stats.AvgLatencyMs = latencyTotal / latencySamples } return stats } func (q *fabricFlowQueue) recordQualityPreference(preferences ...FabricServiceChannelRouteQualityPreference) { if q == nil { return } if len(preferences) == 0 || strings.TrimSpace(preferences[0].RouteID) == "" || preferences[0].ScoreAdjustment <= 0 { q.clearQualityPreference() return } preference := preferences[0] q.QualityPreferenceRouteID = preference.RouteID q.QualityPreferenceScore = preference.ScoreAdjustment q.QualityPreferenceRawScore = preference.RawScoreAdjustment if q.QualityPreferenceRawScore <= 0 { q.QualityPreferenceRawScore = preference.ScoreAdjustment } q.QualityPreferenceReasons = dedupeStrings(preference.Reasons) } func (q *fabricFlowQueue) clearQualityPreference() { if q == nil { return } q.QualityPreferenceRouteID = "" q.QualityPreferenceScore = 0 q.QualityPreferenceRawScore = 0 q.QualityPreferenceReasons = nil } func fabricSendDurationMillis(duration time.Duration) int64 { if duration <= 0 { return 0 } millis := duration.Milliseconds() if millis == 0 { return 1 } return millis } func (s *FabricFlowScheduler) ensureQueueLocked(channelID string) *fabricFlowQueue { if s.queues == nil { s.queues = map[string]*fabricFlowQueue{} } queue := s.queues[channelID] if queue == nil { queue = &fabricFlowQueue{} s.queues[channelID] = queue } return queue } type LocalPacketTransport struct { Inbox *FabricPacketInbox VPNConnectionID string } type AdaptivePacketTransport struct { Primary PacketTransport Fallback PacketTransport PrimaryTimeout time.Duration lastReceive atomic.Int32 } const ( adaptiveTransportPrimary int32 = iota adaptiveTransportFallback ) func (t *LocalPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error { packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil } if t == nil || t.Inbox == nil { return mesh.ErrForwardRuntimeUnavailable } return t.Inbox.DeliverLocalPacketBatch(t.VPNConnectionID, FabricDirectionGatewayToClient, packets) } func (t *LocalPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { if t == nil || t.Inbox == nil { return nil, mesh.ErrForwardRuntimeUnavailable } return t.Inbox.Receive(ctx, t.VPNConnectionID, FabricDirectionClientToGateway, timeout) } func (t *AdaptivePacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil } if t == nil || (t.Primary == nil && t.Fallback == nil) { return mesh.ErrForwardRuntimeUnavailable } preferred, alternate := t.preferredSendOrder() if preferred != nil { if err := preferred.SendGatewayPacketBatch(ctx, packets); err == nil { return nil } else if alternate == nil { return err } } if alternate != nil { return alternate.SendGatewayPacketBatch(ctx, packets) } return mesh.ErrForwardRuntimeUnavailable } func (t *AdaptivePacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { if t == nil || (t.Primary == nil && t.Fallback == nil) { return nil, mesh.ErrForwardRuntimeUnavailable } if t.Primary == nil { t.lastReceive.Store(adaptiveTransportFallback) return t.Fallback.ReceiveGatewayPacketBatch(ctx, timeout) } if t.Fallback == nil { t.lastReceive.Store(adaptiveTransportPrimary) return t.Primary.ReceiveGatewayPacketBatch(ctx, timeout) } primaryTimeout := t.PrimaryTimeout if primaryTimeout <= 0 { primaryTimeout = 50 * time.Millisecond } if timeout > 0 && primaryTimeout > timeout { primaryTimeout = timeout } packets, err := t.Primary.ReceiveGatewayPacketBatch(ctx, primaryTimeout) if err == nil && len(packets) > 0 { t.lastReceive.Store(adaptiveTransportPrimary) return packets, nil } if err != nil && timeout <= 0 { return nil, err } fallbackTimeout := timeout if timeout > primaryTimeout { fallbackTimeout = timeout - primaryTimeout } packets, fallbackErr := t.Fallback.ReceiveGatewayPacketBatch(ctx, fallbackTimeout) if fallbackErr == nil && len(packets) > 0 { t.lastReceive.Store(adaptiveTransportFallback) } if fallbackErr != nil { return nil, fallbackErr } return packets, nil } func (t *AdaptivePacketTransport) preferredSendOrder() (PacketTransport, PacketTransport) { if t == nil { return nil, nil } if t.lastReceive.Load() == adaptiveTransportFallback { return t.Fallback, t.Primary } return t.Primary, t.Fallback } func (t *FabricPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil } if t == nil || t.ForwardTransport == nil { return mesh.ErrForwardRuntimeUnavailable } if t.ClusterID == "" || t.VPNConnectionID == "" || t.RouteID == "" || t.LocalNodeID == "" || t.RemoteNodeID == "" { return errors.New("fabric packet transport route identity is incomplete") } nextHop := t.NextHopNodeID if nextHop == "" { nextHop = t.RemoteNodeID } envelopeCurrentHop := nextHop envelopeNextHop := nextHopAfter(t.RoutePath, envelopeCurrentHop, t.RemoteNodeID) direction := t.SendDirection if direction == "" { direction = FabricDirectionGatewayToClient } envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{ RouteID: t.RouteID, ClusterID: t.ClusterID, SourceNodeID: t.LocalNodeID, DestinationNodeID: t.RemoteNodeID, CurrentHopNodeID: envelopeCurrentHop, NextHopNodeID: envelopeNextHop, RoutePath: t.RoutePath, VPNConnectionID: t.VPNConnectionID, Direction: direction, Packets: packets, }) if err != nil { return err } _, err = t.ForwardTransport.SendProduction(ctx, nextHop, envelope) return err } func (i *FabricClientPacketIngress) SendClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, packets [][]byte) error { return i.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, "", packets) } func (i *FabricClientPacketIngress) SendClientPacketBatchWithTrafficClass(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte) error { packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil } if i == nil { return mesh.ErrForwardRuntimeUnavailable } i.recordSendBatch(len(packets)) scheduler := i.flowScheduler() scheduled, droppedCount := scheduler.scheduleClientPackets(vpnConnectionID, trafficClass, packets) if droppedCount > 0 { i.recordFlowDropped(droppedCount) } if len(scheduled) == 0 { i.recordError(mesh.ErrSyntheticRelayQueueFull) return mesh.ErrSyntheticRelayQueueFull } maxParallel := scheduler.RecommendedParallelSendWindowForTrafficClass(trafficClass, i.maxParallelFlowSends()) if maxParallel > 1 && len(scheduled) > 1 { return i.sendScheduledClientPacketBatchesParallel(ctx, clusterID, vpnConnectionID, scheduled, maxParallel) } for _, batch := range scheduled { i.recordFlowBatch(len(batch.Packets)) if err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, batch); err != nil { return err } } return nil } func (i *FabricClientPacketIngress) sendScheduledClientPacketBatchesParallel(ctx context.Context, clusterID string, vpnConnectionID string, scheduled []FabricScheduledPacketBatch, maxParallel int) error { if maxParallel <= 1 || len(scheduled) <= 1 { for _, batch := range scheduled { i.recordFlowBatch(len(batch.Packets)) if err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, batch); err != nil { return err } } return nil } if maxParallel > len(scheduled) { maxParallel = len(scheduled) } i.recordFlowParallel() sem := make(chan struct{}, maxParallel) var wg sync.WaitGroup var errMu sync.Mutex var firstErr error for _, scheduledBatch := range scheduled { batch := scheduledBatch i.recordFlowBatch(len(batch.Packets)) sem <- struct{}{} wg.Add(1) go func() { defer wg.Done() defer func() { <-sem }() if err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, batch); err != nil { errMu.Lock() if firstErr == nil { firstErr = err } errMu.Unlock() } }() } wg.Wait() return firstErr } func (i *FabricClientPacketIngress) PreferClientRoute(routeID string) { routeID = strings.TrimSpace(routeID) if i == nil || routeID == "" { return } i.mu.Lock() defer i.mu.Unlock() i.lastSelectedRouteID = routeID i.lastSelectedNextHop = "" } func (i *FabricClientPacketIngress) sendScheduledClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, batch FabricScheduledPacketBatch) error { scheduler := i.flowScheduler() scheduler.BeginSend(batch.ChannelID) defer scheduler.EndSend(batch.ChannelID) defer scheduler.Complete(batch) packets := cleanPacketBatch(batch.Packets) if len(packets) == 0 { return nil } candidates := i.routeCandidatesForChannel(clusterID, batch.ChannelID) if len(candidates) == 0 && i.localGatewayReady(vpnConnectionID) { if err := i.inbox().DeliverLocalPacketBatch(vpnConnectionID, FabricDirectionClientToGateway, packets); err != nil { i.recordError(err) return err } scheduler.RecordLocalFallback(batch.ChannelID) i.recordLocalFallback() return nil } if len(candidates) == 0 { i.recordError(mesh.ErrRouteNotFound) return mesh.ErrRouteNotFound } transport := i.forwardTransport() if transport == nil { i.recordError(mesh.ErrForwardRuntimeUnavailable) return mesh.ErrForwardRuntimeUnavailable } var lastErr error for _, candidate := range candidates { startedAt := time.Now() i.recordRouteAttempt() envelopeCurrentHop := candidate.NextHop envelopeNextHop := nextHopAfter(candidate.Route.Hops, envelopeCurrentHop, candidate.Route.DestinationNodeID) envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{ RouteID: candidate.Route.RouteID, ClusterID: candidate.Route.ClusterID, SourceNodeID: candidate.Route.SourceNodeID, DestinationNodeID: candidate.Route.DestinationNodeID, CurrentHopNodeID: envelopeCurrentHop, NextHopNodeID: envelopeNextHop, RoutePath: candidate.Route.Hops, TTL: candidate.Route.MaxTTL, ExpiresAt: candidate.Route.ExpiresAt, VPNConnectionID: vpnConnectionID, Direction: FabricDirectionClientToGateway, Packets: packets, }) if err != nil { lastErr = err scheduler.RecordRouteFailureWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, err, time.Since(startedAt), i.routeProvenanceFor(candidate.Route)) i.recordRouteFailure(err) continue } if _, err = transport.SendProduction(ctx, candidate.NextHop, envelope); err != nil { lastErr = err scheduler.RecordRouteFailureWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, err, time.Since(startedAt), i.routeProvenanceFor(candidate.Route)) i.recordRouteFailure(err) continue } preference, _ := i.routeQualityPreference(candidate.Route.RouteID) scheduler.RecordRouteSuccessWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, time.Since(startedAt), i.routeProvenanceFor(candidate.Route), preference) i.recordRouteSuccess(candidate.Route.RouteID, candidate.NextHop) return nil } if i.localGatewayReady(vpnConnectionID) { if err := i.inbox().DeliverLocalPacketBatch(vpnConnectionID, FabricDirectionClientToGateway, packets); err != nil { i.recordError(err) return err } scheduler.RecordLocalFallback(batch.ChannelID) i.recordLocalFallback() return nil } if lastErr == nil { lastErr = mesh.ErrRouteNotFound } i.recordError(lastErr) return lastErr } func (i *FabricClientPacketIngress) ReceiveClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error) { inbox := i.inbox() if i == nil || inbox == nil { return nil, mesh.ErrForwardRuntimeUnavailable } if _, _, ok := i.selectRoute(clusterID); !ok { if !i.localGatewayReady(vpnConnectionID) { i.recordReceiveEmpty() return nil, mesh.ErrRouteNotFound } } packets, err := inbox.Receive(ctx, vpnConnectionID, FabricDirectionGatewayToClient, timeout) if err != nil { i.recordError(err) return nil, err } if len(packets) == 0 { i.recordReceiveEmpty() return nil, nil } i.recordReceiveBatch(len(packets)) return packets, nil } func (i *FabricClientPacketIngress) localGatewayReady(vpnConnectionID string) bool { if i == nil || !i.AllowLegacyLocalGatewayFallback || i.inbox() == nil || vpnConnectionID == "" { return false } localGateway := i.localGateway() return localGateway != nil && localGateway(vpnConnectionID) } func (i *FabricClientPacketIngress) selectRoute(clusterID string) (mesh.SyntheticRoute, string, bool) { candidates := i.routeCandidates(clusterID) if len(candidates) == 0 { return mesh.SyntheticRoute{}, "", false } return candidates[0].Route, candidates[0].NextHop, true } type fabricClientRouteCandidate struct { Route mesh.SyntheticRoute NextHop string } func (i *FabricClientPacketIngress) routeCandidates(clusterID string) []fabricClientRouteCandidate { return i.routeCandidatesWithPreference(clusterID, i.lastRouteID(), "") } func (i *FabricClientPacketIngress) routeCandidatesForChannel(clusterID string, channelID string) []fabricClientRouteCandidate { preferredRouteID, avoidRouteID := i.flowScheduler().RoutePreference(channelID) if preferredRouteID == "" && avoidRouteID == "" { preferredRouteID = i.lastRouteID() } return i.routeCandidatesWithPreference(clusterID, preferredRouteID, avoidRouteID) } func (i *FabricClientPacketIngress) routeCandidatesWithPreference(clusterID string, preferredRouteID string, avoidRouteID string) []fabricClientRouteCandidate { routesFunc := i.routesFunc() if i == nil || routesFunc == nil { return nil } localClusterID := i.clusterID() localNodeID := i.localNodeID() if clusterID == "" { clusterID = localClusterID } now := time.Now().UTC() var preferred []fabricClientRouteCandidate var alternates []fabricClientRouteCandidate var deferred []fabricClientRouteCandidate var withdrawn []fabricClientRouteCandidate manager := i.routeManager() if preferredRouteID != "" && manager.isWithdrawn(preferredRouteID) { if replacementRouteID := manager.replacementRouteID(preferredRouteID); replacementRouteID != "" { preferredRouteID = replacementRouteID } else { if avoidRouteID == "" { avoidRouteID = preferredRouteID } preferredRouteID = "" } } for _, route := range routesFunc() { if route.ClusterID != clusterID || route.SourceNodeID != localNodeID || !containsString(route.AllowedChannels, mesh.ProductionChannelVPNPacket) { continue } if !route.ExpiresAt.IsZero() && !route.ExpiresAt.After(now) { continue } nextHop := nextHopAfter(route.Hops, localNodeID, route.DestinationNodeID) if nextHop == "" || nextHop == localNodeID { continue } candidate := fabricClientRouteCandidate{Route: route, NextHop: nextHop} if manager.isWithdrawn(route.RouteID) { withdrawn = append(withdrawn, candidate) continue } if preferredRouteID != "" && route.RouteID == preferredRouteID { preferred = append(preferred, candidate) } else if avoidRouteID != "" && route.RouteID == avoidRouteID { deferred = append(deferred, candidate) } else { alternates = append(alternates, candidate) } } if len(preferred) > 0 { destinationNodeID := strings.TrimSpace(preferred[0].Route.DestinationNodeID) alternates = filterRouteCandidatesByDestination(alternates, destinationNodeID) deferred = filterRouteCandidatesByDestination(deferred, destinationNodeID) } out := append(preferred, alternates...) out = i.applyRouteQualityPreferences(out, preferredRouteID) out = append(out, deferred...) if len(out) == 0 && i.preventLastRouteWithdrawal() { return withdrawn } return out } func filterRouteCandidatesByDestination(candidates []fabricClientRouteCandidate, destinationNodeID string) []fabricClientRouteCandidate { destinationNodeID = strings.TrimSpace(destinationNodeID) if destinationNodeID == "" || len(candidates) == 0 { return candidates } out := candidates[:0] for _, candidate := range candidates { if strings.TrimSpace(candidate.Route.DestinationNodeID) == destinationNodeID { out = append(out, candidate) } } return out } func (i *FabricClientPacketIngress) applyRouteQualityPreferences(candidates []fabricClientRouteCandidate, preferredRouteID string) []fabricClientRouteCandidate { if len(candidates) < 2 { return candidates } preferences := i.routeQualityPreferences() if len(preferences) == 0 { return candidates } preferredScore := 0 if preferredRouteID != "" { if preference, ok := preferences[preferredRouteID]; ok { preferredScore = preference.ScoreAdjustment } } bestIndex := -1 bestScore := 0 for index, candidate := range candidates { preference, ok := preferences[candidate.Route.RouteID] if !ok || preference.ScoreAdjustment <= 0 { continue } if bestIndex == -1 || preference.ScoreAdjustment > bestScore { bestIndex = index bestScore = preference.ScoreAdjustment } } if bestIndex <= 0 || bestScore < preferredScore+defaultFabricRouteQualitySwitchThreshold { return candidates } out := make([]fabricClientRouteCandidate, 0, len(candidates)) out = append(out, candidates[bestIndex]) out = append(out, candidates[:bestIndex]...) out = append(out, candidates[bestIndex+1:]...) return out } func (i *FabricClientPacketIngress) preventLastRouteWithdrawal() bool { if i == nil { return false } i.mu.Lock() defer i.mu.Unlock() return i.PreventLastRouteWithdrawal } func (t *FabricPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { if t == nil || t.Inbox == nil { return nil, mesh.ErrForwardRuntimeUnavailable } direction := t.ReceiveDirection if direction == "" { direction = FabricDirectionClientToGateway } return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, timeout) } type FabricPacketInbox struct { capacity int mu sync.Mutex queues map[string]*fabricPacketInboxQueue dropped uint64 } type fabricPacketInboxQueue struct { normal chan mesh.VPNPacketBatchPayload priority chan mesh.VPNPacketBatchPayload } func NewFabricPacketInbox(capacity int) *FabricPacketInbox { if capacity <= 0 { capacity = 4096 } return &FabricPacketInbox{ capacity: capacity, queues: map[string]*fabricPacketInboxQueue{}, } } func (i *FabricPacketInbox) DeliverProductionEnvelope(_ context.Context, envelope mesh.ProductionEnvelope) error { if i == nil { return mesh.ErrForwardRuntimeUnavailable } if envelope.ChannelClass != mesh.ProductionChannelVPNPacket || envelope.MessageType != mesh.ProductionMessageVPNPacketBatch { return nil } payload, err := mesh.DecodeProductionVPNPacketBatch(envelope) if err != nil { return err } payload.Packets = cleanPacketBatch(payload.Packets) if len(payload.Packets) == 0 { return nil } return i.enqueue(payload) } func (i *FabricPacketInbox) DeliverLocalPacketBatch(vpnConnectionID, direction string, packets [][]byte) error { if i == nil { return mesh.ErrForwardRuntimeUnavailable } if vpnConnectionID == "" || direction == "" { return mesh.ErrForwardEnvelopeInvalid } packets = cleanPacketBatch(packets) if len(packets) == 0 { return nil } return i.enqueue(mesh.VPNPacketBatchPayload{ SchemaVersion: "rap.vpn_packet_batch.v1", VPNConnectionID: vpnConnectionID, Direction: direction, Packets: packets, SentAt: time.Now().UTC(), }) } func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direction string, timeout time.Duration) ([][]byte, error) { if i == nil { return nil, mesh.ErrForwardRuntimeUnavailable } if vpnConnectionID == "" || direction == "" { return nil, mesh.ErrForwardEnvelopeInvalid } if timeout <= 0 { timeout = 25 * time.Second } timer := time.NewTimer(timeout) defer timer.Stop() queue := i.queue(vpnConnectionID, direction) for { select { case payload := <-queue.priority: packets := cleanPacketBatch(payload.Packets) if len(packets) == 0 { continue } return packets, nil default: } if len(queue.normal) > 0 { priorityTimer := time.NewTimer(2 * time.Millisecond) select { case <-ctx.Done(): priorityTimer.Stop() return nil, ctx.Err() case <-timer.C: priorityTimer.Stop() return nil, nil case payload := <-queue.priority: priorityTimer.Stop() packets := cleanPacketBatch(payload.Packets) if len(packets) == 0 { continue } return packets, nil case <-priorityTimer.C: } } select { case <-ctx.Done(): return nil, ctx.Err() case <-timer.C: return nil, nil case payload := <-queue.priority: packets := cleanPacketBatch(payload.Packets) if len(packets) == 0 { continue } return packets, nil case payload := <-queue.normal: packets := cleanPacketBatch(payload.Packets) if len(packets) == 0 { continue } return packets, nil } } } func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error { queue := i.queue(payload.VPNConnectionID, payload.Direction) target := queue.normal if payload.Direction == FabricDirectionGatewayToClient && batchHasTCPControlPacket(payload.Packets) { target = queue.priority } select { case target <- payload: default: i.mu.Lock() i.dropped++ i.mu.Unlock() } return nil } func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) *fabricPacketInboxQueue { key := vpnConnectionID + "\x00" + direction i.mu.Lock() defer i.mu.Unlock() if i.queues == nil { i.queues = map[string]*fabricPacketInboxQueue{} } queue, ok := i.queues[key] if !ok { priorityCapacity := maxInt(1, i.capacity/4) queue = &fabricPacketInboxQueue{ normal: make(chan mesh.VPNPacketBatchPayload, i.capacity), priority: make(chan mesh.VPNPacketBatchPayload, priorityCapacity), } i.queues[key] = queue } return queue } func batchHasTCPControlPacket(packets [][]byte) bool { for _, packet := range packets { if isTCPControlPacket(packet) { return true } } return false } func maxInt(a, b int) int { if a > b { return a } return b } func (i *FabricPacketInbox) Dropped() uint64 { if i == nil { return 0 } i.mu.Lock() defer i.mu.Unlock() return i.dropped } type FabricPacketInboxSnapshot struct { SchemaVersion string `json:"schema_version"` Capacity int `json:"capacity"` Dropped uint64 `json:"dropped"` QueueDepths map[string]int `json:"queue_depths"` QueueCount int `json:"queue_count"` } func (i *FabricPacketInbox) Snapshot() FabricPacketInboxSnapshot { snapshot := FabricPacketInboxSnapshot{ SchemaVersion: "rap.fabric_packet_inbox.v1", QueueDepths: map[string]int{}, } if i == nil { return snapshot } i.mu.Lock() defer i.mu.Unlock() snapshot.Capacity = i.capacity snapshot.Dropped = i.dropped snapshot.QueueCount = len(i.queues) for key, queue := range i.queues { snapshot.QueueDepths[strings.ReplaceAll(key, "\x00", ":")] = len(queue.normal) + len(queue.priority) } return snapshot } type FabricClientPacketIngressSnapshot struct { SchemaVersion string `json:"schema_version"` ClusterID string `json:"cluster_id,omitempty"` LocalNodeID string `json:"local_node_id,omitempty"` RouteCandidateCount int `json:"route_candidate_count"` LastSelectedRouteID string `json:"last_selected_route_id,omitempty"` LastSelectedNextHop string `json:"last_selected_next_hop,omitempty"` LastError string `json:"last_error,omitempty"` SendBatches uint64 `json:"send_batches"` SendPackets uint64 `json:"send_packets"` SendRouteAttempts uint64 `json:"send_route_attempts"` SendRouteFailures uint64 `json:"send_route_failures"` SendFallbackLocal uint64 `json:"send_fallback_local"` SendFlowBatches uint64 `json:"send_flow_batches"` SendFlowPackets uint64 `json:"send_flow_packets"` SendFlowDropped uint64 `json:"send_flow_dropped"` SendFlowParallel uint64 `json:"send_flow_parallel_batches"` MaxParallelFlowSends int `json:"max_parallel_flow_sends"` RecommendedParallelFlowSends int `json:"recommended_parallel_flow_sends"` ReceiveBatches uint64 `json:"receive_batches"` ReceivePackets uint64 `json:"receive_packets"` ReceiveEmpty uint64 `json:"receive_empty"` Inbox FabricPacketInboxSnapshot `json:"inbox"` FlowScheduler FabricFlowSchedulerSnapshot `json:"flow_scheduler"` RouteManager FabricServiceChannelRouteManager `json:"route_manager"` RouteManagerTransition FabricServiceChannelRouteManagerTransition `json:"route_manager_transition"` RouteQualityPreferenceCount int `json:"route_quality_preference_count"` RouteQualityPreferences []FabricServiceChannelRouteQualityPreference `json:"route_quality_preferences,omitempty"` } func (i *FabricClientPacketIngress) Snapshot(clusterID string) FabricClientPacketIngressSnapshot { snapshot := FabricClientPacketIngressSnapshot{ SchemaVersion: "rap.fabric_service_channel_route_manager.v1", } if i == nil { return snapshot } i.mu.Lock() snapshot.ClusterID = firstNonEmpty(clusterID, i.ClusterID) snapshot.LocalNodeID = i.LocalNodeID snapshot.LastSelectedRouteID = i.lastSelectedRouteID snapshot.LastSelectedNextHop = i.lastSelectedNextHop snapshot.LastError = i.lastError snapshot.SendBatches = i.sendBatches snapshot.SendPackets = i.sendPackets snapshot.SendRouteAttempts = i.sendRouteAttempts snapshot.SendRouteFailures = i.sendRouteFailures snapshot.SendFallbackLocal = i.sendFallbackLocal snapshot.SendFlowBatches = i.sendFlowBatches snapshot.SendFlowPackets = i.sendFlowPackets snapshot.SendFlowDropped = i.sendFlowDropped snapshot.SendFlowParallel = i.sendFlowParallel snapshot.MaxParallelFlowSends = i.maxParallelFlowSendsLocked() snapshot.ReceiveBatches = i.receiveBatches snapshot.ReceivePackets = i.receivePackets snapshot.ReceiveEmpty = i.receiveEmpty snapshot.RouteManager = i.RouteManager.snapshot() snapshot.RouteManagerTransition = i.RouteManagerTransition.snapshot() snapshot.RouteQualityPreferenceCount = len(i.RouteQualityPreferences) snapshot.RouteQualityPreferences = routeQualityPreferenceSlice(i.RouteQualityPreferences) recoveryPolicyFingerprint := strings.TrimSpace(i.RecoveryPolicyFingerprint) i.mu.Unlock() snapshot.RouteCandidateCount = len(i.routeCandidates(snapshot.ClusterID)) snapshot.Inbox = i.inbox().Snapshot() snapshot.FlowScheduler = i.flowScheduler().Snapshot() annotateFabricFlowSchedulerProvenance(&snapshot.FlowScheduler, i.routeProvenance(snapshot.ClusterID), recoveryPolicyFingerprint) snapshot.RecommendedParallelFlowSends = i.flowScheduler().RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, snapshot.MaxParallelFlowSends) return snapshot } func (i *FabricClientPacketIngress) UpdateRuntime(forwardTransport mesh.ProductionForwardTransport, inbox *FabricPacketInbox, clusterID string, localNodeID string, localGateway func(string) bool, routes func() []mesh.SyntheticRoute, recoveryPolicyFingerprint string, adaptivePolicies ...FabricServiceChannelAdaptivePolicy) { if i == nil { return } i.mu.Lock() defer i.mu.Unlock() i.ForwardTransport = forwardTransport i.Inbox = inbox i.ClusterID = clusterID i.LocalNodeID = localNodeID i.LocalGateway = localGateway i.Routes = routes i.RecoveryPolicyFingerprint = strings.TrimSpace(recoveryPolicyFingerprint) adaptivePolicy := defaultFabricServiceChannelAdaptivePolicy() if len(adaptivePolicies) > 0 { adaptivePolicy = adaptivePolicies[0] } i.AdaptivePolicyFingerprint = strings.TrimSpace(adaptivePolicy.Fingerprint) if i.FlowScheduler == nil { i.FlowScheduler = NewFabricFlowScheduler(0, 0) } i.FlowScheduler.ConfigureAdaptivePolicy(adaptivePolicy) if i.MaxParallelFlowSends <= 0 { i.MaxParallelFlowSends = defaultFabricFlowParallelSendWindow } } type fabricRouteProvenance struct { PolicyVersion string Generation string } type fabricFlowRouteProvenance struct { PolicyVersion string Generation string RecoveryPolicyFingerprint string } func (i *FabricClientPacketIngress) routeProvenanceFor(route mesh.SyntheticRoute) fabricFlowRouteProvenance { policyVersion := strings.TrimSpace(route.PolicyVersion) if policyVersion == "" { policyVersion = strings.TrimSpace(route.RouteVersion) } generation := policyVersion return fabricFlowRouteProvenance{ PolicyVersion: policyVersion, Generation: generation, RecoveryPolicyFingerprint: strings.TrimSpace(i.RecoveryPolicyFingerprint), } } func (i *FabricClientPacketIngress) routeProvenance(clusterID string) map[string]fabricRouteProvenance { out := map[string]fabricRouteProvenance{} routesFunc := i.routesFunc() if i == nil || routesFunc == nil { return out } localNodeID := i.localNodeID() for _, route := range routesFunc() { if strings.TrimSpace(route.RouteID) == "" { continue } if clusterID != "" && route.ClusterID != clusterID { continue } if localNodeID != "" && route.SourceNodeID != localNodeID { continue } policyVersion := strings.TrimSpace(route.PolicyVersion) if policyVersion == "" { policyVersion = strings.TrimSpace(route.RouteVersion) } out[route.RouteID] = fabricRouteProvenance{ PolicyVersion: policyVersion, Generation: policyVersion, } } return out } func annotateFabricFlowSchedulerProvenance(snapshot *FabricFlowSchedulerSnapshot, routes map[string]fabricRouteProvenance, recoveryPolicyFingerprint string) { if snapshot == nil || len(snapshot.ChannelStats) == 0 { return } for channelID, stat := range snapshot.ChannelStats { if recoveryPolicyFingerprint != "" { stat.RecoveryPolicyFingerprint = recoveryPolicyFingerprint } if route, ok := routes[stat.LastRouteID]; ok { stat.RoutePolicyVersion = route.PolicyVersion stat.RouteGeneration = route.Generation } if route, ok := routes[stat.LastFailedRouteID]; ok { stat.LastFailedRoutePolicyVersion = route.PolicyVersion stat.LastFailedRouteGeneration = route.Generation if stat.RoutePolicyVersion == "" { stat.RoutePolicyVersion = route.PolicyVersion } if stat.RouteGeneration == "" { stat.RouteGeneration = route.Generation } } snapshot.ChannelStats[channelID] = stat } } func (i *FabricClientPacketIngress) UpdateRouteManager(decisions []FabricServiceChannelRouteManagerDecision, generation string, observedAt time.Time) { if i == nil { return } manager := NewFabricServiceChannelRouteManager(decisions, generation, observedAt) i.mu.Lock() transition := newFabricServiceChannelRouteManagerTransition(i.RouteManager, manager, observedAt) i.RouteManager = manager if i.lastSelectedRouteID != "" && manager.isWithdrawn(i.lastSelectedRouteID) { clearedRouteID := i.lastSelectedRouteID transition.ClearedSelectedRouteID = clearedRouteID i.lastSelectedRouteID = manager.replacementRouteID(clearedRouteID) i.lastSelectedNextHop = "" } i.RouteManagerTransition = transition withdrawnRoutes := manager.withdrawnRouteIDs() scheduler := i.FlowScheduler i.mu.Unlock() if scheduler != nil { scheduler.ClearQualityPreferencesForRoutes(withdrawnRoutes) } } func (i *FabricClientPacketIngress) UpdateRouteQualityPreferences(preferences []FabricServiceChannelRouteQualityPreference, observedAt time.Time) { if i == nil { return } now := observedAt.UTC() if now.IsZero() { now = time.Now().UTC() } next := map[string]FabricServiceChannelRouteQualityPreference{} for _, preference := range preferences { preference.RouteID = strings.TrimSpace(preference.RouteID) preference.FeedbackStatus = strings.TrimSpace(preference.FeedbackStatus) if preference.RouteID == "" || preference.ScoreAdjustment <= 0 { continue } if preference.FeedbackStatus != "" && preference.FeedbackStatus != "healthy" { continue } if preference.ExpiresAt != "" { expiresAt, err := time.Parse(time.RFC3339Nano, preference.ExpiresAt) if err == nil && !expiresAt.After(now) { continue } } if preference.RawScoreAdjustment <= 0 { preference.RawScoreAdjustment = preference.ScoreAdjustment } preference.Reasons = dedupeStrings(preference.Reasons) next[preference.RouteID] = preference } i.mu.Lock() i.RouteQualityPreferences = next scheduler := i.FlowScheduler i.mu.Unlock() validRouteIDs := make(map[string]struct{}, len(next)) for routeID := range next { validRouteIDs[routeID] = struct{}{} } if scheduler != nil { scheduler.ClearQualityPreferencesNotIn(validRouteIDs) } } func NewFabricServiceChannelRouteManager(decisions []FabricServiceChannelRouteManagerDecision, generation string, observedAt time.Time) FabricServiceChannelRouteManager { manager := FabricServiceChannelRouteManager{ SchemaVersion: "rap.fabric_service_channel_route_manager_rebuild.v1", Generation: strings.TrimSpace(generation), Decisions: []FabricServiceChannelRouteManagerDecision{}, withdrawnRoutes: map[string]FabricServiceChannelRouteManagerDecision{}, replacements: map[string]string{}, } if !observedAt.IsZero() { manager.LastAppliedAt = observedAt.UTC().Format(time.RFC3339Nano) } for _, decision := range decisions { decision.RouteID = strings.TrimSpace(decision.RouteID) decision.ReplacementRouteID = strings.TrimSpace(decision.ReplacementRouteID) decision.RebuildStatus = strings.TrimSpace(decision.RebuildStatus) decision.RebuildRequestID = strings.TrimSpace(decision.RebuildRequestID) if decision.RouteID == "" || decision.RebuildStatus == "" { continue } decision.EffectiveHops = append([]string{}, decision.EffectiveHops...) manager.Decisions = append(manager.Decisions, decision) manager.RebuildRequestCount++ switch decision.RebuildStatus { case "applied": manager.RebuildAppliedCount++ manager.WithdrawnRouteCount++ manager.withdrawnRoutes[decision.RouteID] = decision if decision.ReplacementRouteID != "" { manager.replacements[decision.RouteID] = decision.ReplacementRouteID } case "pending_degraded_fallback": manager.PendingFallbackCount++ manager.WithdrawnRouteCount++ manager.withdrawnRoutes[decision.RouteID] = decision } } return manager } func (m FabricServiceChannelRouteManager) snapshot() FabricServiceChannelRouteManager { out := m out.Decisions = append([]FabricServiceChannelRouteManagerDecision{}, m.Decisions...) out.withdrawnRoutes = nil out.replacements = nil if out.SchemaVersion == "" { out.SchemaVersion = "rap.fabric_service_channel_route_manager_rebuild.v1" } return out } func newFabricServiceChannelRouteManagerTransition(previous FabricServiceChannelRouteManager, next FabricServiceChannelRouteManager, observedAt time.Time) FabricServiceChannelRouteManagerTransition { transition := FabricServiceChannelRouteManagerTransition{ SchemaVersion: "rap.fabric_service_channel_route_manager_transition.v1", PreviousGeneration: strings.TrimSpace(previous.Generation), Generation: strings.TrimSpace(next.Generation), DecisionCount: len(next.Decisions), WithdrawnRouteCount: next.WithdrawnRouteCount, PendingFallbackCount: next.PendingFallbackCount, RebuildAppliedCount: next.RebuildAppliedCount, } if !observedAt.IsZero() { transition.ObservedAt = observedAt.UTC().Format(time.RFC3339Nano) } previousWithdrawn := previous.withdrawnRouteIDs() nextWithdrawn := next.withdrawnRouteIDs() for routeID := range previousWithdrawn { if _, ok := nextWithdrawn[routeID]; !ok { transition.RestoredRouteCount++ } } switch { case transition.RestoredRouteCount > 0 && transition.WithdrawnRouteCount == 0: transition.Status = "restored_by_new_config" case transition.RebuildAppliedCount > 0: transition.Status = "applied_rebuild" case transition.PendingFallbackCount > 0: transition.Status = "pending_degraded_fallback" case transition.DecisionCount > 0: transition.Status = "decisions_observed" default: transition.Status = "empty" } return transition } func (t FabricServiceChannelRouteManagerTransition) snapshot() FabricServiceChannelRouteManagerTransition { if t.SchemaVersion == "" { t.SchemaVersion = "rap.fabric_service_channel_route_manager_transition.v1" } return t } func (m FabricServiceChannelRouteManager) withdrawnRouteIDs() map[string]struct{} { out := map[string]struct{}{} if m.withdrawnRoutes != nil { for routeID := range m.withdrawnRoutes { routeID = strings.TrimSpace(routeID) if routeID != "" { out[routeID] = struct{}{} } } return out } for _, decision := range m.Decisions { if decision.RouteID == "" { continue } if decision.RebuildStatus == "applied" || decision.RebuildStatus == "pending_degraded_fallback" { out[strings.TrimSpace(decision.RouteID)] = struct{}{} } } return out } func (m FabricServiceChannelRouteManager) isWithdrawn(routeID string) bool { routeID = strings.TrimSpace(routeID) if routeID == "" { return false } if m.withdrawnRoutes != nil { _, ok := m.withdrawnRoutes[routeID] return ok } for _, decision := range m.Decisions { if decision.RouteID == routeID && (decision.RebuildStatus == "applied" || decision.RebuildStatus == "pending_degraded_fallback") { return true } } return false } func (m FabricServiceChannelRouteManager) replacementRouteID(routeID string) string { routeID = strings.TrimSpace(routeID) if routeID == "" { return "" } if m.replacements != nil { return strings.TrimSpace(m.replacements[routeID]) } for _, decision := range m.Decisions { if strings.TrimSpace(decision.RouteID) == routeID && decision.RebuildStatus == "applied" { return strings.TrimSpace(decision.ReplacementRouteID) } } return "" } func (i *FabricClientPacketIngress) forwardTransport() mesh.ProductionForwardTransport { if i == nil { return nil } i.mu.Lock() defer i.mu.Unlock() return i.ForwardTransport } func (i *FabricClientPacketIngress) inbox() *FabricPacketInbox { if i == nil { return nil } i.mu.Lock() defer i.mu.Unlock() return i.Inbox } func (i *FabricClientPacketIngress) localGateway() func(string) bool { if i == nil { return nil } i.mu.Lock() defer i.mu.Unlock() return i.LocalGateway } func (i *FabricClientPacketIngress) routesFunc() func() []mesh.SyntheticRoute { if i == nil { return nil } i.mu.Lock() defer i.mu.Unlock() return i.Routes } func (i *FabricClientPacketIngress) clusterID() string { if i == nil { return "" } i.mu.Lock() defer i.mu.Unlock() return strings.TrimSpace(i.ClusterID) } func (i *FabricClientPacketIngress) localNodeID() string { if i == nil { return "" } i.mu.Lock() defer i.mu.Unlock() return strings.TrimSpace(i.LocalNodeID) } func (i *FabricClientPacketIngress) flowScheduler() *FabricFlowScheduler { if i == nil { return NewFabricFlowScheduler(0, 0) } i.mu.Lock() defer i.mu.Unlock() if i.FlowScheduler == nil { i.FlowScheduler = NewFabricFlowScheduler(0, 0) } return i.FlowScheduler } func (i *FabricClientPacketIngress) maxParallelFlowSends() int { if i == nil { return 1 } i.mu.Lock() defer i.mu.Unlock() return i.maxParallelFlowSendsLocked() } func (i *FabricClientPacketIngress) maxParallelFlowSendsLocked() int { if i == nil || i.MaxParallelFlowSends <= 0 { return 1 } return i.MaxParallelFlowSends } func (i *FabricClientPacketIngress) routeManager() FabricServiceChannelRouteManager { if i == nil { return FabricServiceChannelRouteManager{} } i.mu.Lock() defer i.mu.Unlock() return i.RouteManager } func (i *FabricClientPacketIngress) routeQualityPreferences() map[string]FabricServiceChannelRouteQualityPreference { if i == nil { return nil } i.mu.Lock() defer i.mu.Unlock() out := make(map[string]FabricServiceChannelRouteQualityPreference, len(i.RouteQualityPreferences)) for routeID, preference := range i.RouteQualityPreferences { out[routeID] = preference } return out } func (i *FabricClientPacketIngress) routeQualityPreference(routeID string) (FabricServiceChannelRouteQualityPreference, bool) { routeID = strings.TrimSpace(routeID) if i == nil || routeID == "" { return FabricServiceChannelRouteQualityPreference{}, false } i.mu.Lock() defer i.mu.Unlock() preference, ok := i.RouteQualityPreferences[routeID] return preference, ok } func routeQualityPreferenceSlice(preferences map[string]FabricServiceChannelRouteQualityPreference) []FabricServiceChannelRouteQualityPreference { if len(preferences) == 0 { return nil } out := make([]FabricServiceChannelRouteQualityPreference, 0, len(preferences)) for _, preference := range preferences { preference.Reasons = dedupeStrings(preference.Reasons) out = append(out, preference) } sort.SliceStable(out, func(a, b int) bool { if out[a].ScoreAdjustment != out[b].ScoreAdjustment { return out[a].ScoreAdjustment > out[b].ScoreAdjustment } return out[a].RouteID < out[b].RouteID }) return out } func (i *FabricClientPacketIngress) lastRouteID() string { if i == nil { return "" } i.mu.Lock() defer i.mu.Unlock() return i.lastSelectedRouteID } func (i *FabricClientPacketIngress) recordSendBatch(packetCount int) { i.mu.Lock() defer i.mu.Unlock() i.sendBatches++ i.sendPackets += uint64(packetCount) } func (i *FabricClientPacketIngress) recordRouteAttempt() { i.mu.Lock() defer i.mu.Unlock() i.sendRouteAttempts++ } func (i *FabricClientPacketIngress) recordRouteFailure(err error) { i.mu.Lock() defer i.mu.Unlock() i.sendRouteFailures++ if err != nil { i.lastError = err.Error() } } func (i *FabricClientPacketIngress) recordRouteSuccess(routeID, nextHop string) { i.mu.Lock() defer i.mu.Unlock() i.lastSelectedRouteID = routeID i.lastSelectedNextHop = nextHop i.lastError = "" } func (i *FabricClientPacketIngress) recordLocalFallback() { i.mu.Lock() defer i.mu.Unlock() i.sendFallbackLocal++ i.lastSelectedRouteID = "local_gateway" i.lastSelectedNextHop = i.LocalNodeID i.lastError = "" } func (i *FabricClientPacketIngress) recordFlowBatch(packetCount int) { i.mu.Lock() defer i.mu.Unlock() i.sendFlowBatches++ i.sendFlowPackets += uint64(packetCount) } func (i *FabricClientPacketIngress) recordFlowDropped(packetCount uint64) { i.mu.Lock() defer i.mu.Unlock() i.sendFlowDropped += packetCount } func (i *FabricClientPacketIngress) recordFlowParallel() { i.mu.Lock() defer i.mu.Unlock() i.sendFlowParallel++ } func (i *FabricClientPacketIngress) recordReceiveBatch(packetCount int) { i.mu.Lock() defer i.mu.Unlock() i.receiveBatches++ i.receivePackets += uint64(packetCount) } func (i *FabricClientPacketIngress) recordReceiveEmpty() { i.mu.Lock() defer i.mu.Unlock() i.receiveEmpty++ } func (i *FabricClientPacketIngress) recordError(err error) { if err == nil { return } i.mu.Lock() defer i.mu.Unlock() i.lastError = err.Error() } func firstNonEmpty(values ...string) string { for _, value := range values { if strings.TrimSpace(value) != "" { return strings.TrimSpace(value) } } return "" } func nextHopAfter(path []string, localNodeID string, destinationNodeID string) string { if len(path) == 0 { return destinationNodeID } for index, nodeID := range path { if nodeID == localNodeID { if index+1 < len(path) { return path[index+1] } return localNodeID } } return destinationNodeID } func containsString(values []string, needle string) bool { for _, value := range values { if value == needle { return true } } return false } func dedupeStrings(values []string) []string { if len(values) == 0 { return nil } seen := map[string]struct{}{} out := make([]string, 0, len(values)) for _, value := range values { value = strings.TrimSpace(value) if value == "" { continue } if _, ok := seen[value]; ok { continue } seen[value] = struct{}{} out = append(out, value) } return out } func classifyPacketFlow(packet []byte, shardCount int) (string, int) { if shardCount <= 0 { shardCount = defaultFabricFlowShardCount } key := packetFlowKey(packet) hash := fnv.New32a() _, _ = hash.Write([]byte(key)) shard := int(hash.Sum32() % uint32(shardCount)) return key, shard } func packetFlowKey(packet []byte) string { if len(packet) == 0 { return "empty" } version := packet[0] >> 4 switch version { case 4: return ipv4PacketFlowKey(packet) case 6: return ipv6PacketFlowKey(packet) default: sum := fnv.New64a() _, _ = sum.Write(packet) return fmt.Sprintf("opaque:%x", sum.Sum64()) } } func ipv4PacketFlowKey(packet []byte) string { if len(packet) < 20 { return packetHashFlowKey("ipv4-short", packet) } ihl := int(packet[0]&0x0f) * 4 if ihl < 20 || len(packet) < ihl { return packetHashFlowKey("ipv4-invalid", packet) } proto := packet[9] src := binary.BigEndian.Uint32(packet[12:16]) dst := binary.BigEndian.Uint32(packet[16:20]) srcPort, dstPort := transportPorts(proto, packet[ihl:]) if src > dst || (src == dst && srcPort > dstPort) { src, dst = dst, src srcPort, dstPort = dstPort, srcPort } return fmt.Sprintf("ipv4:%d:%08x:%d:%08x:%d", proto, src, srcPort, dst, dstPort) } func ipv6PacketFlowKey(packet []byte) string { if len(packet) < 40 { return packetHashFlowKey("ipv6-short", packet) } nextHeader := packet[6] src := packet[8:24] dst := packet[24:40] srcPort, dstPort := transportPorts(nextHeader, packet[40:]) srcKey := fmt.Sprintf("%x", src) dstKey := fmt.Sprintf("%x", dst) if srcKey > dstKey || (srcKey == dstKey && srcPort > dstPort) { srcKey, dstKey = dstKey, srcKey srcPort, dstPort = dstPort, srcPort } return fmt.Sprintf("ipv6:%d:%s:%d:%s:%d", nextHeader, srcKey, srcPort, dstKey, dstPort) } func transportPorts(proto byte, payload []byte) (uint16, uint16) { switch proto { case 6, 17: if len(payload) >= 4 { return binary.BigEndian.Uint16(payload[0:2]), binary.BigEndian.Uint16(payload[2:4]) } } return 0, 0 } func packetHashFlowKey(prefix string, packet []byte) string { sum := fnv.New64a() _, _ = sum.Write(packet) return fmt.Sprintf("%s:%x", prefix, sum.Sum64()) } func cleanPacketBatch(packets [][]byte) [][]byte { if len(packets) == 0 { return nil } cleaned := make([][]byte, 0, len(packets)) for _, packet := range packets { if len(packet) == 0 { continue } cleaned = append(cleaned, append([]byte(nil), packet...)) } return cleaned }