Files
rdp-proxy/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go
T
2026-05-16 13:13:49 +03:00

2843 lines
95 KiB
Go

package vpnruntime
import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
)
// Direction labels and default tuning values for the fabric packet transport
// and the flow scheduler.
const (
// Direction label strings. Their consumers are outside this section.
FabricDirectionClientToGateway = "client_to_gateway"
FabricDirectionGatewayToClient = "gateway_to_client"
// Fallback shard count used when a scheduler has no positive shard count.
defaultFabricFlowShardCount = 8
// Fallback per-channel queue capacity used by NewFabricFlowScheduler.
defaultFabricFlowQueueCapacity = 1024
// Default MaxParallelWindow of the adaptive policy; four times this value
// is the default QueuePressureMaxInFlight.
defaultFabricFlowParallelSendWindow = 8
// NOTE(review): the four values below are not referenced in the visible
// part of the file; presumably they tune the per-queue quality window and
// route failure/switch decisions. Confirm against their users.
defaultFabricFlowQualityWindowCapacity = 32
defaultFabricFlowFailureThreshold = 2
defaultFabricFlowSlowSendThreshold = 2 * time.Second
defaultFabricRouteQualitySwitchThreshold = 30
)
// FabricPacketTransport bundles a mesh forward transport and a packet inbox
// with the identifiers of one VPN connection route (cluster, connection,
// route, local/remote/next-hop nodes, route path) and the direction labels
// used for sending and receiving.
// NOTE(review): the methods that use these fields are outside this section.
type FabricPacketTransport struct {
ForwardTransport mesh.ProductionForwardTransport
Inbox *FabricPacketInbox
ClusterID string
VPNConnectionID string
RouteID string
LocalNodeID string
RemoteNodeID string
NextHopNodeID string
RoutePath []string
// Presumably one of the FabricDirection* labels; confirm against callers.
SendDirection string
ReceiveDirection string
}
// FabricClientPacketIngress holds the configuration (exported fields) and the
// internal bookkeeping (unexported fields) for client packet ingress over the
// fabric: transports, route sources, an optional flow scheduler, route-manager
// state and send/receive counters.
// NOTE(review): the methods that use these fields are outside this section.
type FabricClientPacketIngress struct {
ForwardTransport mesh.ProductionForwardTransport
Inbox *FabricPacketInbox
// Routes supplies the current set of synthetic routes on demand.
Routes func() []mesh.SyntheticRoute
// LocalGateway reports a boolean for the given VPN connection ID;
// presumably whether this node is the gateway for it. Confirm.
LocalGateway func(vpnConnectionID string) bool
AllowLegacyLocalGatewayFallback bool
FlowScheduler *FabricFlowScheduler
MaxParallelFlowSends int
RecoveryPolicyFingerprint string
AdaptivePolicyFingerprint string
PreventLastRouteWithdrawal bool
ClusterID string
LocalNodeID string
RouteManager FabricServiceChannelRouteManager
RouteManagerTransition FabricServiceChannelRouteManagerTransition
// Keyed by string; presumably the route ID. Confirm against writers.
RouteQualityPreferences map[string]FabricServiceChannelRouteQualityPreference
// NOTE(review): presumably mu guards the unexported fields below; confirm
// against the methods defined later in the file.
mu sync.Mutex
lastSelectedRouteID string
lastSelectedNextHop string
lastError string
// Send-side counters.
sendBatches uint64
sendPackets uint64
sendRouteAttempts uint64
sendRouteFailures uint64
sendFallbackLocal uint64
sendFlowBatches uint64
sendFlowPackets uint64
sendFlowDropped uint64
sendFlowParallel uint64
// Receive-side counters.
receiveBatches uint64
receivePackets uint64
receiveEmpty uint64
}
// FabricServiceChannelRouteManager is the JSON-serializable route-manager
// state: rebuild/withdrawal counters, the generation it belongs to and the
// list of per-route decisions.
type FabricServiceChannelRouteManager struct {
SchemaVersion string `json:"schema_version"`
Generation string `json:"generation,omitempty"`
RebuildRequestCount int `json:"rebuild_request_count"`
RebuildAppliedCount int `json:"rebuild_applied_count"`
WithdrawnRouteCount int `json:"withdrawn_route_count"`
// Serialized under the key "pending_degraded_fallback_count".
PendingFallbackCount int `json:"pending_degraded_fallback_count"`
LastAppliedAt string `json:"last_applied_at,omitempty"`
Decisions []FabricServiceChannelRouteManagerDecision `json:"decisions,omitempty"`
// Unexported lookup tables; they are not part of the JSON form.
// NOTE(review): presumably indexed by route ID; confirm against the code
// that fills them.
withdrawnRoutes map[string]FabricServiceChannelRouteManagerDecision
replacements map[string]string
}
// FabricServiceChannelRouteManagerTransition is the JSON-serializable record
// of a route-manager change from PreviousGeneration to Generation, with the
// counts observed for that change.
type FabricServiceChannelRouteManagerTransition struct {
SchemaVersion string `json:"schema_version"`
PreviousGeneration string `json:"previous_generation,omitempty"`
Generation string `json:"generation,omitempty"`
Status string `json:"status,omitempty"`
// Timestamp carried as a string; the format is set by the producer, which
// is outside this section.
ObservedAt string `json:"observed_at,omitempty"`
DecisionCount int `json:"decision_count"`
WithdrawnRouteCount int `json:"withdrawn_route_count"`
RestoredRouteCount int `json:"restored_route_count"`
ClearedSelectedRouteID string `json:"cleared_selected_route_id,omitempty"`
// Serialized under the key "pending_degraded_fallback_count".
PendingFallbackCount int `json:"pending_degraded_fallback_count"`
RebuildAppliedCount int `json:"rebuild_applied_count"`
}
// FabricServiceChannelRouteManagerDecision is one JSON-serializable
// route-manager decision about a single route: the affected route, an optional
// replacement route and the rebuild request metadata attached to it.
type FabricServiceChannelRouteManagerDecision struct {
RouteID string `json:"route_id"`
ReplacementRouteID string `json:"replacement_route_id,omitempty"`
RebuildRequestID string `json:"rebuild_request_id,omitempty"`
RebuildStatus string `json:"rebuild_status,omitempty"`
RebuildReason string `json:"rebuild_reason,omitempty"`
RebuildAttempt int `json:"rebuild_attempt,omitempty"`
DecisionSource string `json:"decision_source,omitempty"`
Generation string `json:"generation,omitempty"`
EffectiveHops []string `json:"effective_hops,omitempty"`
}
// FabricServiceChannelRouteQualityPreference is the JSON-serializable quality
// feedback for one route: a score adjustment (plus its raw value), the reasons
// behind it and the observation/expiry timestamps as strings.
type FabricServiceChannelRouteQualityPreference struct {
RouteID string `json:"route_id"`
FeedbackStatus string `json:"feedback_status,omitempty"`
// ScoreAdjustment is always serialized; RawScoreAdjustment is omitted when
// zero.
ScoreAdjustment int `json:"score_adjustment"`
RawScoreAdjustment int `json:"raw_score_adjustment,omitempty"`
Reasons []string `json:"reasons,omitempty"`
LastSendDurationMs int64 `json:"last_send_duration_ms,omitempty"`
ObservedAt string `json:"observed_at,omitempty"`
ExpiresAt string `json:"expires_at,omitempty"`
}
// FabricFlowScheduler tracks per-channel flow accounting (queue depth, drops,
// in-flight sends) and derives recommended parallel send windows from it.
// The methods in this file lock mu before reading or writing the queue map,
// the adaptive policy and the aggregate counters.
type FabricFlowScheduler struct {
mu sync.Mutex
// Number of shards packets are classified into.
shardCount int
// Maximum depth of a single channel queue before enqueue reports a drop.
queueCapacity int
adaptivePolicy FabricServiceChannelAdaptivePolicy
// Per-channel accounting state keyed by channel ID.
queues map[string]*fabricFlowQueue
// Aggregate counters across all channels.
enqueued uint64
dequeued uint64
dropped uint64
// Largest single-queue depth ever observed by enqueue.
highWatermark int
// Sends currently between BeginSend and EndSend, and the peak of that
// value.
inFlight int
maxInFlight int
}
// FabricServiceChannelAdaptivePolicy holds the thresholds the flow scheduler
// uses to detect pressure and to cap parallel send windows per traffic class.
// normalizeFabricServiceChannelAdaptivePolicy fills non-positive values with
// defaults.
type FabricServiceChannelAdaptivePolicy struct {
SchemaVersion string
Fingerprint string
// Upper bound for every parallel send window.
MaxParallelWindow int
// Number of bulk queues that, together with at least one interactive or
// control queue, counts as high pressure.
BulkPressureChannelThreshold int
// Queue high-watermark and max-in-flight values at or above which a queue
// (or the scheduler as a whole) counts as under high pressure.
QueuePressureHighWatermark int
QueuePressureMaxInFlight int
// Per-traffic-class window caps keyed by traffic class label.
ClassWindows map[string]int
}
// Traffic class labels used to tag flow queues and to select per-class
// parallel send windows.
const (
FabricTrafficClassControl = "control"
FabricTrafficClassInteractive = "interactive"
FabricTrafficClassReliable = "reliable"
FabricTrafficClassBulk = "bulk"
FabricTrafficClassDroppable = "droppable"
)
// fabricFlowQueue is the per-channel accounting record kept in
// FabricFlowScheduler.queues. It stores counters and route/quality metadata
// only; Depth is a count and no packets are held here.
type fabricFlowQueue struct {
TrafficClass string
// Queue occupancy and lifetime counters maintained by enqueue/dequeue.
Depth int
Enqueued uint64
Dequeued uint64
Dropped uint64
HighWatermark int
// Incremented once per dequeue call, not once per packet.
Served uint64
// Send tracking maintained by BeginSend/EndSend.
InFlight int
MaxInFlight int
SendAttempts uint64
SendSuccesses uint64
SendFailures uint64
LastServedAt time.Time
// Route and recovery metadata. The code that writes these fields is
// outside this section.
LastRouteID string
RoutePolicyVersion string
RouteGeneration string
RecoveryPolicyFingerprint string
LastNextHop string
LastFailedRouteID string
LastFailedRoutePolicyVersion string
LastFailedRouteGeneration string
LastRouteFailureAt time.Time
LastRecoveredFromRouteID string
LastRecoveredNextHop string
LastRouteSwitchAt time.Time
LastRouteRecoveryMillis int64
RouteSwitchCount uint64
LastError string
ConsecutiveFailures uint64
StallCount uint64
LastSendDurationMillis int64
RouteRebuildRecommended bool
DegradedFallbackRecommended bool
QualityPreferenceRouteID string
QualityPreferenceScore int
QualityPreferenceRawScore int
QualityPreferenceReasons []string
// Send latency histogram buckets (<=10ms, <=100ms, <=1000ms, >1000ms by
// name). NOTE(review): bucket boundaries are set by the writer, which is
// not visible here.
LatencyLe10Millis uint64
LatencyLe100Millis uint64
LatencyLe1000Millis uint64
LatencyGt1000Millis uint64
// Recent quality samples; summarized by qualityWindowStats.
QualityWindow []fabricFlowQualitySample
}
// fabricFlowQualitySample is one entry of a queue's quality window. enqueue
// records a sample with only At and Dropped set when a packet is rejected;
// the other outcome flags are set by code outside this section.
type fabricFlowQualitySample struct {
At time.Time
Success bool
Failure bool
Dropped bool
Slow bool
DurationMillis int64
}
// fabricFlowQualityWindowStats is the summary of a queue's quality window as
// returned by fabricFlowQueue.qualityWindowStats and consumed by Snapshot and
// parallelPressureLocked.
type fabricFlowQualityWindowStats struct {
SampleCount int
SuccessCount int
FailureCount int
SlowCount int
DropCount int
AvgLatencyMs int64
LastUpdatedAt time.Time
}
// FabricScheduledPacketBatch is the group of accepted packets for one channel
// as produced by the scheduler's scheduleClientPackets.
type FabricScheduledPacketBatch struct {
ChannelID string
// Flow ID of the first accepted packet that opened this batch.
FlowID string
Shard int
TrafficClass string
// Copies of the accepted packets, in arrival order.
Packets [][]byte
// Queue depth reported by the most recent enqueue for this channel.
QueueDepth int
// NOTE(review): not populated by scheduleClientPackets in the visible
// code; drops are returned separately as a count.
Dropped uint64
Classifier string
ServiceMode string
}
// FabricFlowSchedulerSnapshot is the JSON-serializable view of a
// FabricFlowScheduler produced by Snapshot: scheduler configuration,
// aggregate counters, pressure indicators and per-channel statistics.
type FabricFlowSchedulerSnapshot struct {
SchemaVersion string `json:"schema_version"`
// Enabled is false when the snapshot was taken from a nil scheduler.
Enabled bool `json:"enabled"`
ServiceNeutral bool `json:"service_neutral"`
Classifier string `json:"classifier"`
ServiceMode string `json:"service_mode"`
ShardCount int `json:"shard_count"`
QueueCapacity int `json:"queue_capacity"`
ChannelCount int `json:"channel_count"`
Enqueued uint64 `json:"enqueued"`
Dequeued uint64 `json:"dequeued"`
Dropped uint64 `json:"dropped"`
HighWatermark int `json:"high_watermark"`
BackpressureActive bool `json:"backpressure_active"`
InFlight int `json:"in_flight"`
MaxInFlight int `json:"max_in_flight"`
// Set when the recommended bulk window is smaller than the interactive one.
AdaptiveBackpressureActive bool `json:"adaptive_backpressure_active,omitempty"`
AdaptiveBackpressureReason string `json:"adaptive_backpressure_reason,omitempty"`
// Recommended parallel send window per traffic class label.
RecommendedParallelWindows map[string]int `json:"recommended_parallel_windows,omitempty"`
AdaptivePolicyFingerprint string `json:"adaptive_policy_fingerprint,omitempty"`
BulkPressureActive bool `json:"bulk_pressure_active,omitempty"`
BulkPressureChannelCount int `json:"bulk_pressure_channel_count,omitempty"`
InteractiveOrControlCount int `json:"interactive_or_control_channel_count,omitempty"`
RouteRecoveredChannelCount int `json:"route_recovered_channel_count,omitempty"`
RouteSwitchCount uint64 `json:"route_switch_count,omitempty"`
RouteRecoveryMaxMillis int64 `json:"route_recovery_max_ms,omitempty"`
RouteRecoveryAvgMillis int64 `json:"route_recovery_avg_ms,omitempty"`
SlowChannelCount int `json:"slow_channel_count"`
FailingChannelCount int `json:"failing_channel_count"`
// Quality-window totals summed over all channels.
QualityWindowSampleCount int `json:"quality_window_sample_count"`
QualityWindowFailureCount int `json:"quality_window_failure_count"`
QualityWindowSlowCount int `json:"quality_window_slow_count"`
QualityWindowDropCount int `json:"quality_window_drop_count"`
// Per-channel maps keyed by channel ID (TrafficClassCounts is keyed by
// traffic class label).
QueueDepths map[string]int `json:"queue_depths"`
TrafficClassCounts map[string]int `json:"traffic_class_counts,omitempty"`
ChannelStats map[string]FabricFlowStat `json:"channel_stats"`
}
// FabricFlowStat is the JSON-serializable per-channel entry of
// FabricFlowSchedulerSnapshot.ChannelStats. Snapshot fills it from the
// matching fabricFlowQueue and its quality-window summary; timestamps are
// rendered as RFC 3339 strings with nanoseconds in UTC and left empty when
// unset.
type FabricFlowStat struct {
TrafficClass string `json:"traffic_class,omitempty"`
Depth int `json:"depth"`
Enqueued uint64 `json:"enqueued"`
Dequeued uint64 `json:"dequeued"`
Dropped uint64 `json:"dropped"`
HighWatermark int `json:"high_watermark"`
Served uint64 `json:"served"`
InFlight int `json:"in_flight"`
MaxInFlight int `json:"max_in_flight"`
SendAttempts uint64 `json:"send_attempts"`
SendSuccesses uint64 `json:"send_successes"`
SendFailures uint64 `json:"send_failures"`
LastServedAt string `json:"last_served_at,omitempty"`
// Route and recovery metadata copied from the queue.
LastRouteID string `json:"last_route_id,omitempty"`
LastNextHop string `json:"last_next_hop,omitempty"`
RoutePolicyVersion string `json:"route_policy_version,omitempty"`
RouteGeneration string `json:"route_generation,omitempty"`
RecoveryPolicyFingerprint string `json:"recovery_policy_fingerprint,omitempty"`
LastFailedRouteID string `json:"last_failed_route_id,omitempty"`
LastFailedRoutePolicyVersion string `json:"last_failed_route_policy_version,omitempty"`
LastFailedRouteGeneration string `json:"last_failed_route_generation,omitempty"`
LastRouteFailureAt string `json:"last_route_failure_at,omitempty"`
LastRecoveredFromRouteID string `json:"last_recovered_from_route_id,omitempty"`
LastRecoveredNextHop string `json:"last_recovered_next_hop,omitempty"`
LastRouteSwitchAt string `json:"last_route_switch_at,omitempty"`
LastRouteRecoveryMillis int64 `json:"last_route_recovery_ms,omitempty"`
RouteSwitchCount uint64 `json:"route_switch_count,omitempty"`
LastError string `json:"last_error,omitempty"`
ConsecutiveFailures uint64 `json:"consecutive_failures"`
StallCount uint64 `json:"stall_count"`
LastSendDurationMillis int64 `json:"last_send_duration_ms,omitempty"`
RouteRebuildRecommended bool `json:"route_rebuild_recommended"`
DegradedFallbackRecommended bool `json:"degraded_fallback_recommended"`
QualityPreferenceRouteID string `json:"quality_preference_route_id,omitempty"`
QualityPreferenceScore int `json:"quality_preference_score,omitempty"`
QualityPreferenceRawScore int `json:"quality_preference_raw_score,omitempty"`
QualityPreferenceReasons []string `json:"quality_preference_reasons,omitempty"`
// Send latency histogram buckets copied from the queue.
LatencyLe10Millis uint64 `json:"latency_le_10ms"`
LatencyLe100Millis uint64 `json:"latency_le_100ms"`
LatencyLe1000Millis uint64 `json:"latency_le_1000ms"`
LatencyGt1000Millis uint64 `json:"latency_gt_1000ms"`
// Quality-window summary for this channel.
QualityWindowSampleCount int `json:"quality_window_sample_count"`
QualityWindowSuccessCount int `json:"quality_window_success_count"`
QualityWindowFailureCount int `json:"quality_window_failure_count"`
QualityWindowSlowCount int `json:"quality_window_slow_count"`
QualityWindowDropCount int `json:"quality_window_drop_count"`
QualityWindowAvgLatencyMs int64 `json:"quality_window_avg_latency_ms,omitempty"`
QualityWindowLastUpdatedAt string `json:"quality_window_last_updated_at,omitempty"`
}
// NewFabricFlowScheduler builds a scheduler with an empty queue map and the
// default adaptive policy. Non-positive shardCount or queueCapacity values are
// replaced by the package defaults.
func NewFabricFlowScheduler(shardCount int, queueCapacity int) *FabricFlowScheduler {
	scheduler := &FabricFlowScheduler{
		shardCount:     defaultFabricFlowShardCount,
		queueCapacity:  defaultFabricFlowQueueCapacity,
		adaptivePolicy: defaultFabricServiceChannelAdaptivePolicy(),
		queues:         make(map[string]*fabricFlowQueue),
	}
	if shardCount > 0 {
		scheduler.shardCount = shardCount
	}
	if queueCapacity > 0 {
		scheduler.queueCapacity = queueCapacity
	}
	return scheduler
}
// defaultFabricServiceChannelAdaptivePolicy returns the built-in adaptive
// policy, passed through normalizeFabricServiceChannelAdaptivePolicy so the
// class windows are clamped to the maximum parallel window.
func defaultFabricServiceChannelAdaptivePolicy() FabricServiceChannelAdaptivePolicy {
	const window = defaultFabricFlowParallelSendWindow
	base := FabricServiceChannelAdaptivePolicy{
		SchemaVersion:                "rap.fabric_service_channel_adaptive_policy.v1",
		MaxParallelWindow:            window,
		BulkPressureChannelThreshold: 16,
		QueuePressureHighWatermark:   16,
		QueuePressureMaxInFlight:     window * 4,
	}
	base.ClassWindows = map[string]int{
		FabricTrafficClassControl:     window,
		FabricTrafficClassInteractive: window,
		FabricTrafficClassReliable:    6,
		FabricTrafficClassBulk:        4,
		FabricTrafficClassDroppable:   1,
	}
	return normalizeFabricServiceChannelAdaptivePolicy(base)
}
// normalizeFabricServiceChannelAdaptivePolicy returns a copy of policy with
// every unset (empty or non-positive) scalar replaced by its default and with
// ClassWindows rebuilt to contain exactly the five known traffic classes.
// Each class window falls back to a per-class default when non-positive and is
// clamped to [1, MaxParallelWindow]. Entries for unknown classes are dropped,
// and the caller's map is never modified.
func normalizeFabricServiceChannelAdaptivePolicy(policy FabricServiceChannelAdaptivePolicy) FabricServiceChannelAdaptivePolicy {
	if policy.SchemaVersion == "" {
		policy.SchemaVersion = "rap.fabric_service_channel_adaptive_policy.v1"
	}
	if policy.MaxParallelWindow <= 0 {
		policy.MaxParallelWindow = defaultFabricFlowParallelSendWindow
	}
	if policy.BulkPressureChannelThreshold <= 0 {
		policy.BulkPressureChannelThreshold = 16
	}
	if policy.QueuePressureHighWatermark <= 0 {
		policy.QueuePressureHighWatermark = 16
	}
	if policy.QueuePressureMaxInFlight <= 0 {
		policy.QueuePressureMaxInFlight = defaultFabricFlowParallelSendWindow * 4
	}

	ceiling := policy.MaxParallelWindow
	classDefaults := []struct {
		class  string
		window int
	}{
		{FabricTrafficClassControl, ceiling},
		{FabricTrafficClassInteractive, ceiling},
		{FabricTrafficClassReliable, minPositive(ceiling, 6)},
		{FabricTrafficClassBulk, minPositive(ceiling, 4)},
		{FabricTrafficClassDroppable, 1},
	}

	normalized := make(map[string]int, len(classDefaults))
	for _, entry := range classDefaults {
		// Reading a nil ClassWindows map yields 0, which selects the default.
		window := policy.ClassWindows[entry.class]
		if window <= 0 {
			window = entry.window
		}
		normalized[entry.class] = clampFabricWindow(window, 1, ceiling)
	}
	policy.ClassWindows = normalized
	return policy
}
// ConfigureAdaptivePolicy installs the normalized form of policy as the
// scheduler's adaptive policy. It is a no-op on a nil scheduler.
func (s *FabricFlowScheduler) ConfigureAdaptivePolicy(policy FabricServiceChannelAdaptivePolicy) {
	if s == nil {
		return
	}
	// Normalization only touches the argument, so it can run before the lock.
	normalized := normalizeFabricServiceChannelAdaptivePolicy(policy)
	s.mu.Lock()
	s.adaptivePolicy = normalized
	s.mu.Unlock()
}
// ScheduleClientPackets groups packets into per-channel batches without a VPN
// connection ID or traffic class. The count of dropped packets is discarded.
func (s *FabricFlowScheduler) ScheduleClientPackets(packets [][]byte) []FabricScheduledPacketBatch {
	batches, _ := s.scheduleClientPackets("", "", packets)
	return batches
}
// ScheduleClientPacketsForConnection groups packets into per-channel batches
// for the given VPN connection, with no explicit traffic class. The count of
// dropped packets is discarded.
func (s *FabricFlowScheduler) ScheduleClientPacketsForConnection(vpnConnectionID string, packets [][]byte) []FabricScheduledPacketBatch {
	batches, _ := s.scheduleClientPackets(vpnConnectionID, "", packets)
	return batches
}
// ScheduleClientPacketsForConnectionClass groups packets into per-channel
// batches for the given VPN connection and traffic class. The count of
// dropped packets is discarded.
func (s *FabricFlowScheduler) ScheduleClientPacketsForConnectionClass(vpnConnectionID string, trafficClass string, packets [][]byte) []FabricScheduledPacketBatch {
	batches, _ := s.scheduleClientPackets(vpnConnectionID, trafficClass, packets)
	return batches
}
// scheduleClientPackets classifies each cleaned packet into a shard, accounts
// for it on the matching channel queue and groups the accepted packets into
// one batch per channel. It returns the sorted batches and the number of
// packets rejected because their queue was full.
//
// A nil receiver is replaced by a throwaway scheduler with default settings,
// so its accounting does not outlive the call. Each batch carries the flow ID
// of the first accepted packet for its channel, and packet bytes are copied so
// the batches do not alias the caller's buffers.
func (s *FabricFlowScheduler) scheduleClientPackets(vpnConnectionID string, trafficClass string, packets [][]byte) ([]FabricScheduledPacketBatch, uint64) {
	packets = cleanPacketBatch(packets)
	if len(packets) == 0 {
		return nil, 0
	}
	if s == nil {
		s = NewFabricFlowScheduler(0, 0)
	}
	trafficClass = normalizeFabricTrafficClass(trafficClass)

	var rejected uint64
	byChannel := make(map[string]*FabricScheduledPacketBatch)
	for _, packet := range packets {
		flowID, shard := classifyPacketFlow(packet, s.shardCountValue())
		channelID := fabricFlowChannelIDForClass(vpnConnectionID, trafficClass, shard)
		depth, wasDropped := s.enqueue(channelID, trafficClass)
		if wasDropped {
			rejected++
			continue
		}
		current, known := byChannel[channelID]
		if !known || current == nil {
			current = &FabricScheduledPacketBatch{
				ChannelID:    channelID,
				FlowID:       flowID,
				Shard:        shard,
				TrafficClass: trafficClass,
				Classifier:   "ip_5tuple_or_packet_hash",
				ServiceMode:  "application_protocol_agnostic",
			}
			byChannel[channelID] = current
		}
		packetCopy := append([]byte(nil), packet...)
		current.Packets = append(current.Packets, packetCopy)
		current.QueueDepth = depth
	}

	batches := make([]FabricScheduledPacketBatch, 0, len(byChannel))
	for _, current := range byChannel {
		batches = append(batches, *current)
	}
	s.sortScheduledBatches(batches)
	return batches, rejected
}
// fabricFlowChannelID returns the channel ID for a shard of the given VPN
// connection when no traffic class is specified.
func fabricFlowChannelID(vpnConnectionID string, shard int) string {
	const noTrafficClass = ""
	return fabricFlowChannelIDForClass(vpnConnectionID, noTrafficClass, shard)
}
// fabricFlowChannelIDForClass builds the channel ID for a shard.
//
// Without a connection ID (after trimming whitespace) the ID is just
// "flow-NN". With one, it is "vpn:<connection>:flow-NN" when the normalized
// traffic class is empty or bulk, and "vpn:<connection>:<class>:flow-NN" for
// every other class.
func fabricFlowChannelIDForClass(vpnConnectionID string, trafficClass string, shard int) string {
	shardLabel := fmt.Sprintf("flow-%02d", shard)
	connection := strings.TrimSpace(vpnConnectionID)
	if connection == "" {
		return shardLabel
	}
	prefix := "vpn:" + connection + ":"
	switch class := normalizeFabricTrafficClass(trafficClass); class {
	case "", FabricTrafficClassBulk:
		return prefix + shardLabel
	default:
		return prefix + class + ":" + shardLabel
	}
}
// Complete releases the queue slots held by batch by dequeuing one slot per
// packet from the batch's channel. A nil scheduler or an empty batch is
// ignored.
func (s *FabricFlowScheduler) Complete(batch FabricScheduledPacketBatch) {
	if s == nil {
		return
	}
	if packetCount := len(batch.Packets); packetCount > 0 {
		s.dequeue(batch.ChannelID, packetCount)
	}
}
// BeginSend records the start of a send on channelID: it bumps the channel's
// and the scheduler's in-flight counts, counts a send attempt and updates both
// in-flight peaks. The channel queue is created if it does not exist yet.
// A nil scheduler or an empty channel ID is ignored.
func (s *FabricFlowScheduler) BeginSend(channelID string) {
	if s == nil || channelID == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	channel := s.ensureQueueLocked(channelID)
	channel.SendAttempts++
	channel.InFlight++
	if channel.MaxInFlight < channel.InFlight {
		channel.MaxInFlight = channel.InFlight
	}

	s.inFlight++
	if s.maxInFlight < s.inFlight {
		s.maxInFlight = s.inFlight
	}
}
// EndSend records the end of a send on channelID. The channel's in-flight
// count is decremented when the queue exists and the count is positive; the
// scheduler-wide in-flight count is decremented independently whenever it is
// positive. Neither counter goes below zero. A nil scheduler or an empty
// channel ID is ignored.
func (s *FabricFlowScheduler) EndSend(channelID string) {
	if s == nil || channelID == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	if channel, known := s.queues[channelID]; known && channel != nil && channel.InFlight > 0 {
		channel.InFlight--
	}
	if s.inFlight > 0 {
		s.inFlight--
	}
}
// RecommendedParallelSendWindow returns the recommended parallel send window
// when no traffic class is specified, bounded by maxWindow.
func (s *FabricFlowScheduler) RecommendedParallelSendWindow(maxWindow int) int {
	const noTrafficClass = ""
	return s.RecommendedParallelSendWindowForTrafficClass(noTrafficClass, maxWindow)
}
// RecommendedParallelSendWindowForTrafficClass returns how many sends may run
// in parallel for trafficClass. The result is at least 1 and never exceeds
// maxWindow or the adaptive policy's MaxParallelWindow.
//
// A maxWindow of 1 or less always yields 1, and a nil scheduler returns
// maxWindow unchanged. Otherwise the decision is made under the scheduler
// mutex by recommendedParallelSendWindowForTrafficClassLocked, which is the
// same routine Snapshot uses. Sharing it keeps the live recommendation and the
// reported one from drifting apart.
func (s *FabricFlowScheduler) RecommendedParallelSendWindowForTrafficClass(trafficClass string, maxWindow int) int {
	if maxWindow <= 1 {
		return 1
	}
	if s == nil {
		return maxWindow
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.recommendedParallelSendWindowForTrafficClassLocked(trafficClass, maxWindow)
}
// fabricParallelPressure is the pressure summary computed by
// parallelPressureLocked and used to pick a parallel send window.
type fabricParallelPressure struct {
// True when any matching queue has drops in its quality window.
hasDrops bool
// Number of matching queues counted as failing / as slow.
failing int
slow int
// True when a queue or scheduler watermark/in-flight threshold is reached,
// or when bulk queues crowd interactive/control queues.
highPressure bool
// Queue counts by class, taken over all queues regardless of the filter.
interactiveOrControlQueues int
bulkQueues int
}
// parallelPressureLocked summarizes the pressure on the scheduler's queues.
// The caller must hold s.mu.
//
// An empty trafficClass (after trimming) includes every queue; otherwise only
// queues whose normalized class equals trafficClass contribute to hasDrops,
// failing, slow and the per-queue high-pressure check. The
// interactive/control and bulk queue counts always cover all queues, and the
// scheduler-wide watermark and bulk-crowding checks are applied regardless of
// the filter.
//
// A queue counts as failing when its quality window has more failures than
// successes, or has any failure while degraded fallback is recommended. It
// counts as slow when the window contains at least one slow sample.
func (s *FabricFlowScheduler) parallelPressureLocked(trafficClass string) fabricParallelPressure {
	out := fabricParallelPressure{}
	if s == nil {
		return out
	}
	trafficClass = strings.TrimSpace(trafficClass)

	// The effective policy does not depend on the queue being inspected, so
	// resolve it once. A policy without a positive high watermark is treated
	// as unset and replaced by the defaults.
	policy := s.adaptivePolicy
	if policy.QueuePressureHighWatermark <= 0 {
		policy = defaultFabricServiceChannelAdaptivePolicy()
	}

	for _, queue := range s.queues {
		if queue == nil {
			continue
		}
		queueClass := normalizeFabricTrafficClass(queue.TrafficClass)
		if queueClass == FabricTrafficClassControl || queueClass == FabricTrafficClassInteractive {
			out.interactiveOrControlQueues++
		}
		if queueClass == FabricTrafficClassBulk {
			out.bulkQueues++
		}
		if trafficClass != "" && queueClass != trafficClass {
			continue
		}
		stats := queue.qualityWindowStats()
		if stats.DropCount > 0 {
			out.hasDrops = true
		}
		if stats.FailureCount > stats.SuccessCount || (stats.FailureCount > 0 && queue.DegradedFallbackRecommended) {
			out.failing++
		}
		if stats.SlowCount > 0 {
			out.slow++
		}
		if queue.HighWatermark >= policy.QueuePressureHighWatermark || queue.MaxInFlight >= policy.QueuePressureMaxInFlight {
			out.highPressure = true
		}
	}

	if s.highWatermark >= policy.QueuePressureHighWatermark || s.maxInFlight >= policy.QueuePressureMaxInFlight {
		out.highPressure = true
	}
	// Many bulk queues competing with at least one latency-sensitive queue is
	// treated as high pressure on its own.
	if out.bulkQueues >= policy.BulkPressureChannelThreshold && out.interactiveOrControlQueues > 0 {
		out.highPressure = true
	}
	return out
}
// boundedParallelWindow returns value, raised to 1 when it is smaller, so a
// parallel window is never zero or negative.
func boundedParallelWindow(value int) int {
	if value >= 1 {
		return value
	}
	return 1
}
// Snapshot returns a point-in-time, JSON-serializable view of the scheduler:
// configuration, aggregate counters, per-channel statistics, pressure
// indicators and the recommended parallel send window per traffic class.
//
// On a nil scheduler it returns a disabled snapshot that carries only the
// default shard count and queue capacity. Otherwise the whole snapshot is
// built while holding the scheduler mutex. Slices copied into the snapshot are
// detached from the scheduler's state.
//
// BackpressureActive is set when any queue is full, has drops in its quality
// window, has failures while a rebuild or degraded fallback is recommended, or
// when bulk queues reach the policy threshold alongside at least one
// interactive or control queue.
func (s *FabricFlowScheduler) Snapshot() FabricFlowSchedulerSnapshot {
	snapshot := FabricFlowSchedulerSnapshot{
		SchemaVersion:              "rap.fabric_flow_scheduler.v1",
		Enabled:                    s != nil,
		ServiceNeutral:             true,
		Classifier:                 "ip_5tuple_or_packet_hash",
		ServiceMode:                "application_protocol_agnostic",
		QueueDepths:                map[string]int{},
		TrafficClassCounts:         map[string]int{},
		RecommendedParallelWindows: map[string]int{},
		ChannelStats:               map[string]FabricFlowStat{},
	}
	if s == nil {
		snapshot.ShardCount = defaultFabricFlowShardCount
		snapshot.QueueCapacity = defaultFabricFlowQueueCapacity
		return snapshot
	}

	// Unset timestamps stay empty so that omitempty drops them from the JSON.
	formatTime := func(at time.Time) string {
		if at.IsZero() {
			return ""
		}
		return at.UTC().Format(time.RFC3339Nano)
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot.ShardCount = s.shardCount
	snapshot.QueueCapacity = s.queueCapacity
	snapshot.AdaptivePolicyFingerprint = s.adaptivePolicy.Fingerprint
	snapshot.ChannelCount = len(s.queues)
	snapshot.Enqueued = s.enqueued
	snapshot.Dequeued = s.dequeued
	snapshot.Dropped = s.dropped
	snapshot.HighWatermark = s.highWatermark
	snapshot.InFlight = s.inFlight
	snapshot.MaxInFlight = s.maxInFlight

	var routeRecoveryTotalMillis int64
	var routeRecoverySamples int64
	for channelID, queue := range s.queues {
		// Same guard as parallelPressureLocked: a nil entry carries no state.
		if queue == nil {
			continue
		}
		qualityStats := queue.qualityWindowStats()
		trafficClass := normalizeFabricTrafficClass(queue.TrafficClass)
		snapshot.QueueDepths[channelID] = queue.Depth
		snapshot.TrafficClassCounts[trafficClass]++
		snapshot.ChannelStats[channelID] = FabricFlowStat{
			TrafficClass:                 trafficClass,
			Depth:                        queue.Depth,
			Enqueued:                     queue.Enqueued,
			Dequeued:                     queue.Dequeued,
			Dropped:                      queue.Dropped,
			HighWatermark:                queue.HighWatermark,
			Served:                       queue.Served,
			InFlight:                     queue.InFlight,
			MaxInFlight:                  queue.MaxInFlight,
			SendAttempts:                 queue.SendAttempts,
			SendSuccesses:                queue.SendSuccesses,
			SendFailures:                 queue.SendFailures,
			LastServedAt:                 formatTime(queue.LastServedAt),
			LastRouteID:                  queue.LastRouteID,
			LastNextHop:                  queue.LastNextHop,
			RoutePolicyVersion:           queue.RoutePolicyVersion,
			RouteGeneration:              queue.RouteGeneration,
			RecoveryPolicyFingerprint:    queue.RecoveryPolicyFingerprint,
			LastFailedRouteID:            queue.LastFailedRouteID,
			LastFailedRoutePolicyVersion: queue.LastFailedRoutePolicyVersion,
			LastFailedRouteGeneration:    queue.LastFailedRouteGeneration,
			LastRouteFailureAt:           formatTime(queue.LastRouteFailureAt),
			LastRecoveredFromRouteID:     queue.LastRecoveredFromRouteID,
			LastRecoveredNextHop:         queue.LastRecoveredNextHop,
			LastRouteSwitchAt:            formatTime(queue.LastRouteSwitchAt),
			LastRouteRecoveryMillis:      queue.LastRouteRecoveryMillis,
			RouteSwitchCount:             queue.RouteSwitchCount,
			LastError:                    queue.LastError,
			ConsecutiveFailures:          queue.ConsecutiveFailures,
			StallCount:                   queue.StallCount,
			LastSendDurationMillis:       queue.LastSendDurationMillis,
			RouteRebuildRecommended:      queue.RouteRebuildRecommended,
			DegradedFallbackRecommended:  queue.DegradedFallbackRecommended,
			QualityPreferenceRouteID:     queue.QualityPreferenceRouteID,
			QualityPreferenceScore:       queue.QualityPreferenceScore,
			QualityPreferenceRawScore:    queue.QualityPreferenceRawScore,
			// Copy so the snapshot does not alias the queue's slice.
			QualityPreferenceReasons:   append([]string{}, queue.QualityPreferenceReasons...),
			LatencyLe10Millis:          queue.LatencyLe10Millis,
			LatencyLe100Millis:         queue.LatencyLe100Millis,
			LatencyLe1000Millis:        queue.LatencyLe1000Millis,
			LatencyGt1000Millis:        queue.LatencyGt1000Millis,
			QualityWindowSampleCount:   qualityStats.SampleCount,
			QualityWindowSuccessCount:  qualityStats.SuccessCount,
			QualityWindowFailureCount:  qualityStats.FailureCount,
			QualityWindowSlowCount:     qualityStats.SlowCount,
			QualityWindowDropCount:     qualityStats.DropCount,
			QualityWindowAvgLatencyMs:  qualityStats.AvgLatencyMs,
			QualityWindowLastUpdatedAt: formatTime(qualityStats.LastUpdatedAt),
		}

		snapshot.QualityWindowSampleCount += qualityStats.SampleCount
		snapshot.QualityWindowFailureCount += qualityStats.FailureCount
		snapshot.QualityWindowSlowCount += qualityStats.SlowCount
		snapshot.QualityWindowDropCount += qualityStats.DropCount
		snapshot.RouteSwitchCount += queue.RouteSwitchCount

		if queue.LastRecoveredFromRouteID != "" {
			snapshot.RouteRecoveredChannelCount++
			if queue.LastRouteRecoveryMillis > snapshot.RouteRecoveryMaxMillis {
				snapshot.RouteRecoveryMaxMillis = queue.LastRouteRecoveryMillis
			}
			routeRecoveryTotalMillis += queue.LastRouteRecoveryMillis
			routeRecoverySamples++
		}
		if queue.Depth >= s.queueCapacity || qualityStats.DropCount > 0 {
			snapshot.BackpressureActive = true
		}
		if (queue.RouteRebuildRecommended || queue.DegradedFallbackRecommended) && qualityStats.FailureCount > 0 {
			snapshot.BackpressureActive = true
		}
		if qualityStats.SlowCount > 0 {
			snapshot.SlowChannelCount++
		}
		if qualityStats.FailureCount > qualityStats.SuccessCount || (qualityStats.FailureCount > 0 && queue.DegradedFallbackRecommended) {
			snapshot.FailingChannelCount++
		}
	}

	if snapshot.QualityWindowDropCount > 0 {
		snapshot.BackpressureActive = true
	}
	if routeRecoverySamples > 0 {
		snapshot.RouteRecoveryAvgMillis = routeRecoveryTotalMillis / routeRecoverySamples
	}

	snapshot.BulkPressureChannelCount = snapshot.TrafficClassCounts[FabricTrafficClassBulk]
	snapshot.InteractiveOrControlCount = snapshot.TrafficClassCounts[FabricTrafficClassControl] + snapshot.TrafficClassCounts[FabricTrafficClassInteractive]
	bulkPressureThreshold := s.adaptivePolicy.BulkPressureChannelThreshold
	if bulkPressureThreshold <= 0 {
		bulkPressureThreshold = defaultFabricServiceChannelAdaptivePolicy().BulkPressureChannelThreshold
	}
	if snapshot.BulkPressureChannelCount >= bulkPressureThreshold && snapshot.InteractiveOrControlCount > 0 {
		snapshot.BulkPressureActive = true
		snapshot.BackpressureActive = true
	}

	// The mutex is already held, so use the locked variant of the window
	// recommendation for every traffic class.
	for _, trafficClass := range []string{FabricTrafficClassControl, FabricTrafficClassInteractive, FabricTrafficClassReliable, FabricTrafficClassBulk, FabricTrafficClassDroppable} {
		snapshot.RecommendedParallelWindows[trafficClass] = s.recommendedParallelSendWindowForTrafficClassLocked(trafficClass, s.adaptivePolicy.MaxParallelWindow)
	}
	bulkWindow := snapshot.RecommendedParallelWindows[FabricTrafficClassBulk]
	interactiveWindow := snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive]
	if bulkWindow > 0 && interactiveWindow > 0 && bulkWindow < interactiveWindow {
		snapshot.AdaptiveBackpressureActive = true
		snapshot.AdaptiveBackpressureReason = "bulk_window_reduced_to_protect_interactive"
	}
	return snapshot
}
// recommendedParallelSendWindowForTrafficClassLocked picks the parallel send
// window for trafficClass from already-locked scheduler state. The caller must
// hold s.mu; Snapshot calls it while holding the lock.
//
// maxWindow is first capped at the policy's MaxParallelWindow. Then:
//   - classes with priority at or above interactive lose one slot when their
//     own queues show drops, failures or slowness;
//   - the reliable class is halved on its own drops or failures, and loses one
//     slot on global drops, failures, slowness or high pressure;
//   - every other class drops to 1 on any drops, is reduced under global high
//     pressure (to 1 for bulk/droppable when interactive or control queues
//     exist, otherwise halved), drops to 1 when failing or slow queues reach
//     maxWindow, and loses one slot on any failing or slow queue.
//
// Reduced windows for the reliable and lower classes are additionally capped
// by the class window from the adaptive policy.
func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked(trafficClass string, maxWindow int) int {
	if maxWindow <= 1 {
		return 1
	}
	trafficClass = normalizeFabricTrafficClass(trafficClass)
	if s == nil {
		return maxWindow
	}
	if policyMax := s.adaptivePolicy.MaxParallelWindow; policyMax > 0 && maxWindow > policyMax {
		maxWindow = policyMax
	}

	overall := s.parallelPressureLocked("")
	own := s.parallelPressureLocked(trafficClass)
	capped := func(window int) int {
		return classWindowLimit(s.adaptivePolicy, trafficClass, window)
	}

	if fabricTrafficClassPriority(trafficClass) <= fabricTrafficClassPriority(FabricTrafficClassInteractive) {
		if own.hasDrops || own.failing > 0 || own.slow > 0 {
			return boundedParallelWindow(maxWindow - 1)
		}
		return maxWindow
	}

	if trafficClass == FabricTrafficClassReliable {
		switch {
		case own.hasDrops || own.failing > 0:
			return capped(boundedParallelWindow(maxWindow / 2))
		case overall.hasDrops || overall.failing+overall.slow > 0 || overall.highPressure:
			return capped(boundedParallelWindow(maxWindow - 1))
		}
		return maxWindow
	}

	switch {
	case own.hasDrops || overall.hasDrops:
		return capped(1)
	case overall.highPressure && overall.interactiveOrControlQueues > 0:
		if trafficClass == FabricTrafficClassBulk || trafficClass == FabricTrafficClassDroppable {
			return capped(1)
		}
		return capped(boundedParallelWindow(maxWindow / 2))
	case overall.highPressure:
		return capped(boundedParallelWindow(maxWindow / 2))
	case own.failing >= maxWindow || own.slow >= maxWindow:
		return capped(1)
	case own.failing+own.slow > 0:
		return capped(boundedParallelWindow(maxWindow - 1))
	}
	return maxWindow
}
// classWindowLimit caps maxWindow with the per-class window configured in
// policy.ClassWindows. The configured value applies only when it is positive
// and strictly smaller than maxWindow; otherwise maxWindow is returned as is.
func classWindowLimit(policy FabricServiceChannelAdaptivePolicy, trafficClass string, maxWindow int) int {
	// Indexing a nil map yields the zero value, which the positivity check rejects.
	configured := policy.ClassWindows[normalizeFabricTrafficClass(trafficClass)]
	if configured <= 0 || configured >= maxWindow {
		return maxWindow
	}
	return configured
}
// clampFabricWindow restricts value to the range [minValue, maxValue]. The
// lower bound is checked first, so it wins when minValue exceeds maxValue.
func clampFabricWindow(value, minValue, maxValue int) int {
	switch {
	case value < minValue:
		return minValue
	case value > maxValue:
		return maxValue
	default:
		return value
	}
}
// minPositive returns the smaller of a and b while treating non-positive
// values as "unset": if a is non-positive b is returned unchanged, and if only
// b is non-positive a is returned.
func minPositive(a, b int) int {
	switch {
	case a <= 0:
		return b
	case b <= 0 || a < b:
		return a
	}
	return b
}
// Dropped returns the scheduler-wide count of dropped packets. A nil scheduler
// reports zero.
func (s *FabricFlowScheduler) Dropped() uint64 {
	var total uint64
	if s != nil {
		s.mu.Lock()
		total = s.dropped
		s.mu.Unlock()
	}
	return total
}
// shardCountValue returns the configured shard count, or
// defaultFabricFlowShardCount when the scheduler is nil or the configured
// value is not positive.
func (s *FabricFlowScheduler) shardCountValue() int {
	if s != nil && s.shardCount > 0 {
		return s.shardCount
	}
	return defaultFabricFlowShardCount
}
// enqueue accounts for one packet entering the flow queue of channelID.
//
// It returns the queue depth after the call and whether the packet was dropped
// because the queue had already reached the scheduler's capacity. A drop is
// counted on both the queue and the scheduler and recorded as a quality
// sample. A nil scheduler reports (0, false).
func (s *FabricFlowScheduler) enqueue(channelID string, trafficClass string) (int, bool) {
	if s == nil {
		return 0, false
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.ensureQueueLocked(channelID)
	// The traffic class is pinned by the first packet that names the queue.
	if flow.TrafficClass == "" {
		flow.TrafficClass = normalizeFabricTrafficClass(trafficClass)
	}

	if flow.Depth < s.queueCapacity {
		flow.Depth++
		flow.Enqueued++
		s.enqueued++
		if flow.HighWatermark < flow.Depth {
			flow.HighWatermark = flow.Depth
		}
		if s.highWatermark < flow.Depth {
			s.highWatermark = flow.Depth
		}
		return flow.Depth, false
	}

	// Queue is full: count the drop and feed it into the quality window.
	flow.Dropped++
	s.dropped++
	flow.recordQualitySample(fabricFlowQualitySample{
		At:      time.Now().UTC(),
		Dropped: true,
	})
	return flow.Depth, true
}
// dequeue removes up to count packets from the flow queue of channelID and
// marks the queue as served now. The removed amount is clamped to the current
// depth. Unknown channels, a nil scheduler, and non-positive counts are
// ignored.
func (s *FabricFlowScheduler) dequeue(channelID string, count int) {
	if s == nil || count <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.queues[channelID]
	if flow == nil {
		return
	}
	removed := count
	if removed > flow.Depth {
		removed = flow.Depth
	}
	flow.Depth -= removed
	flow.Dequeued += uint64(removed)
	s.dequeued += uint64(removed)
	// Served/LastServedAt feed the fairness ordering in sortScheduledBatches.
	flow.Served++
	flow.LastServedAt = time.Now().UTC()
}
// sortScheduledBatches orders batches in place by send precedence.
//
// Ordering keys, in decreasing significance:
//  1. traffic class priority (lower fabricTrafficClassPriority first);
//  2. queues without a route-rebuild or degraded-fallback recommendation
//     before queues that have one;
//  3. queues served fewer times first;
//  4. queues served longest ago first (never-served queues lead);
//  5. channel ID, as a deterministic tie-breaker.
//
// Like the other scheduler methods, it tolerates a nil receiver: queue state
// is then treated as absent, so batches are ordered by class priority and
// channel ID only. Previously a nil scheduler panicked on the mutex.
func (s *FabricFlowScheduler) sortScheduledBatches(batches []FabricScheduledPacketBatch) {
	if len(batches) < 2 {
		return
	}
	// Reading from a nil map is safe, so a nil scheduler simply sees no queues.
	var queues map[string]*fabricFlowQueue
	if s != nil {
		s.mu.Lock()
		defer s.mu.Unlock()
		queues = s.queues
	}
	sort.Slice(batches, func(a, b int) bool {
		leftPriority := fabricTrafficClassPriority(batches[a].TrafficClass)
		rightPriority := fabricTrafficClassPriority(batches[b].TrafficClass)
		if leftPriority != rightPriority {
			return leftPriority < rightPriority
		}
		left := queues[batches[a].ChannelID]
		right := queues[batches[b].ChannelID]
		leftStalled := left != nil && (left.RouteRebuildRecommended || left.DegradedFallbackRecommended)
		rightStalled := right != nil && (right.RouteRebuildRecommended || right.DegradedFallbackRecommended)
		if leftStalled != rightStalled {
			return !leftStalled
		}
		leftServed := uint64(0)
		rightServed := uint64(0)
		if left != nil {
			leftServed = left.Served
		}
		if right != nil {
			rightServed = right.Served
		}
		if leftServed != rightServed {
			return leftServed < rightServed
		}
		leftServedAt := time.Time{}
		rightServedAt := time.Time{}
		if left != nil {
			leftServedAt = left.LastServedAt
		}
		if right != nil {
			rightServedAt = right.LastServedAt
		}
		if !leftServedAt.Equal(rightServedAt) {
			// A zero timestamp means "never served" and sorts first.
			if leftServedAt.IsZero() {
				return true
			}
			if rightServedAt.IsZero() {
				return false
			}
			return leftServedAt.Before(rightServedAt)
		}
		return batches[a].ChannelID < batches[b].ChannelID
	})
}
// normalizeFabricTrafficClass maps value, ignoring case and surrounding
// whitespace, onto one of the known traffic class constants. Anything
// unrecognised, including the empty string, becomes FabricTrafficClassBulk.
func normalizeFabricTrafficClass(value string) string {
	candidate := strings.TrimSpace(strings.ToLower(value))
	switch candidate {
	case FabricTrafficClassControl,
		FabricTrafficClassInteractive,
		FabricTrafficClassReliable,
		FabricTrafficClassDroppable:
		// A match means candidate equals the constant, so it can be returned directly.
		return candidate
	}
	return FabricTrafficClassBulk
}
// fabricTrafficClassPriority ranks a traffic class for scheduling; a lower
// number is served first. Control is 0, interactive 1, reliable 2, bulk 3 and
// droppable 4. The value is normalised first, so unknown classes rank as bulk.
func fabricTrafficClassPriority(value string) int {
	class := normalizeFabricTrafficClass(value)
	if class == FabricTrafficClassControl {
		return 0
	}
	if class == FabricTrafficClassInteractive {
		return 1
	}
	if class == FabricTrafficClassReliable {
		return 2
	}
	if class == FabricTrafficClassDroppable {
		return 4
	}
	// Bulk, which is also what normalisation yields for unknown input.
	return 3
}
// RoutePreference reports routing hints for channelID based on recorded send
// outcomes: preferredRouteID is the last recorded route, returned only while
// the channel has no consecutive failures, and avoidRouteID is the route that
// last failed. Both are empty for a nil scheduler, an empty channel ID, or a
// channel without a queue.
func (s *FabricFlowScheduler) RoutePreference(channelID string) (preferredRouteID string, avoidRouteID string) {
	if s == nil || channelID == "" {
		return "", ""
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.queues[channelID]
	if flow == nil {
		return "", ""
	}
	if flow.ConsecutiveFailures != 0 {
		return "", flow.LastFailedRouteID
	}
	return flow.LastRouteID, flow.LastFailedRouteID
}
// RecordRouteSuccess records a successful send for channelID without route
// provenance. It delegates to RecordRouteSuccessWithProvenance with a zero
// provenance value.
func (s *FabricFlowScheduler) RecordRouteSuccess(channelID string, routeID string, nextHop string, duration time.Duration, preferences ...FabricServiceChannelRouteQualityPreference) {
	var noProvenance fabricFlowRouteProvenance
	s.RecordRouteSuccessWithProvenance(channelID, routeID, nextHop, duration, noProvenance, preferences...)
}
// RecordRouteSuccessWithProvenance records a successful send for channelID
// over routeID via nextHop.
//
// If the previous failure was on a different route, the call is counted as a
// route switch and the recovery time since that failure is captured. Failure
// state is then cleared, provenance and latency are stored, a quality sample
// is appended, and the first entry of preferences (if usable) is remembered.
// A send slower than defaultFabricFlowSlowSendThreshold counts as a stall and
// keeps the route-rebuild recommendation raised; a fast send clears both the
// rebuild and degraded-fallback recommendations.
func (s *FabricFlowScheduler) RecordRouteSuccessWithProvenance(channelID string, routeID string, nextHop string, duration time.Duration, provenance fabricFlowRouteProvenance, preferences ...FabricServiceChannelRouteQualityPreference) {
	if s == nil || channelID == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.ensureQueueLocked(channelID)

	// Detect recovery onto a different route before the failure markers are reset.
	previousFailedRoute := strings.TrimSpace(flow.LastFailedRouteID)
	currentRoute := strings.TrimSpace(routeID)
	if previousFailedRoute != "" && currentRoute != "" && previousFailedRoute != currentRoute {
		now := time.Now().UTC()
		flow.LastRecoveredFromRouteID = previousFailedRoute
		// LastNextHop still holds the hop recorded by the preceding call.
		flow.LastRecoveredNextHop = strings.TrimSpace(flow.LastNextHop)
		flow.LastRouteSwitchAt = now
		flow.LastRouteRecoveryMillis = 0
		if !flow.LastRouteFailureAt.IsZero() {
			flow.LastRouteRecoveryMillis = now.Sub(flow.LastRouteFailureAt).Milliseconds()
			if flow.LastRouteRecoveryMillis < 0 {
				flow.LastRouteRecoveryMillis = 0
			}
		}
		flow.RouteSwitchCount++
	}

	slow := duration > defaultFabricFlowSlowSendThreshold
	sendMillis := fabricSendDurationMillis(duration)

	// Current route identity and provenance.
	flow.LastRouteID = routeID
	flow.LastNextHop = nextHop
	flow.RoutePolicyVersion = strings.TrimSpace(provenance.PolicyVersion)
	flow.RouteGeneration = strings.TrimSpace(provenance.Generation)
	flow.RecoveryPolicyFingerprint = strings.TrimSpace(provenance.RecoveryPolicyFingerprint)

	// A success wipes the failure bookkeeping.
	flow.LastFailedRouteID = ""
	flow.LastFailedRoutePolicyVersion = ""
	flow.LastFailedRouteGeneration = ""
	flow.LastError = ""
	flow.ConsecutiveFailures = 0

	// Latency and quality accounting.
	flow.LastSendDurationMillis = sendMillis
	flow.SendSuccesses++
	flow.recordLatency(duration)
	flow.recordQualitySample(fabricFlowQualitySample{
		At:             time.Now().UTC(),
		Success:        true,
		Slow:           slow,
		DurationMillis: sendMillis,
	})
	flow.recordQualityPreference(preferences...)

	if slow {
		flow.StallCount++
		flow.RouteRebuildRecommended = true
		return
	}
	flow.RouteRebuildRecommended = false
	flow.DegradedFallbackRecommended = false
}
// RecordRouteFailure records a failed send for channelID without route
// provenance. It delegates to RecordRouteFailureWithProvenance with a zero
// provenance value.
func (s *FabricFlowScheduler) RecordRouteFailure(channelID string, routeID string, nextHop string, err error, duration time.Duration) {
	var noProvenance fabricFlowRouteProvenance
	s.RecordRouteFailureWithProvenance(channelID, routeID, nextHop, err, duration, noProvenance)
}
// RecordRouteFailureWithProvenance records a failed send for channelID over
// routeID via nextHop.
//
// The failed route and its provenance are remembered, failure and stall
// counters are bumped, latency and a failure quality sample are recorded, and
// a route rebuild is recommended. Once consecutive failures reach
// defaultFabricFlowFailureThreshold, degraded fallback is recommended too. Any
// stored quality preference is cleared. LastError is only overwritten when err
// is non-nil, and the recovery fingerprint only when provenance supplies one.
func (s *FabricFlowScheduler) RecordRouteFailureWithProvenance(channelID string, routeID string, nextHop string, err error, duration time.Duration, provenance fabricFlowRouteProvenance) {
	if s == nil || channelID == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.ensureQueueLocked(channelID)
	sendMillis := fabricSendDurationMillis(duration)

	// Remember which route failed and under which policy.
	flow.LastFailedRouteID = routeID
	flow.LastFailedRoutePolicyVersion = strings.TrimSpace(provenance.PolicyVersion)
	flow.LastFailedRouteGeneration = strings.TrimSpace(provenance.Generation)
	flow.LastRouteFailureAt = time.Now().UTC()
	if fingerprint := strings.TrimSpace(provenance.RecoveryPolicyFingerprint); fingerprint != "" {
		flow.RecoveryPolicyFingerprint = fingerprint
	}
	flow.LastNextHop = nextHop

	// Counters and quality accounting.
	flow.ConsecutiveFailures++
	flow.StallCount++
	flow.SendFailures++
	flow.LastSendDurationMillis = sendMillis
	flow.recordLatency(duration)
	flow.recordQualitySample(fabricFlowQualitySample{
		At:             time.Now().UTC(),
		Failure:        true,
		Slow:           duration > defaultFabricFlowSlowSendThreshold,
		DurationMillis: sendMillis,
	})
	if err != nil {
		flow.LastError = err.Error()
	}

	// Recommendations derived from the failure streak.
	flow.RouteRebuildRecommended = true
	if flow.ConsecutiveFailures >= defaultFabricFlowFailureThreshold {
		flow.DegradedFallbackRecommended = true
	}
	flow.clearQualityPreference()
}
// RecordLocalFallback marks channelID as having been delivered through the
// local gateway: the route and next hop become "local_gateway", and all
// provenance, failure state, recommendations and quality preference are reset.
func (s *FabricFlowScheduler) RecordLocalFallback(channelID string) {
	if s == nil || channelID == "" {
		return
	}
	const localGatewayMarker = "local_gateway"

	s.mu.Lock()
	defer s.mu.Unlock()

	flow := s.ensureQueueLocked(channelID)
	flow.LastRouteID = localGatewayMarker
	flow.LastNextHop = localGatewayMarker

	// Local delivery carries no route provenance.
	flow.RoutePolicyVersion = ""
	flow.RouteGeneration = ""
	flow.RecoveryPolicyFingerprint = ""

	// Clear failure bookkeeping and recommendations.
	flow.LastFailedRouteID = ""
	flow.LastFailedRoutePolicyVersion = ""
	flow.LastFailedRouteGeneration = ""
	flow.LastError = ""
	flow.ConsecutiveFailures = 0
	flow.RouteRebuildRecommended = false
	flow.DegradedFallbackRecommended = false
	flow.clearQualityPreference()
}
// ClearQualityPreferencesNotIn drops the stored quality preference of every
// queue whose preferred route is not a key of validRouteIDs. A nil or empty
// set therefore clears every stored preference.
func (s *FabricFlowScheduler) ClearQualityPreferencesNotIn(validRouteIDs map[string]struct{}) {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, flow := range s.queues {
		if flow == nil {
			continue
		}
		preferred := flow.QualityPreferenceRouteID
		if preferred == "" {
			continue
		}
		if _, stillValid := validRouteIDs[preferred]; stillValid {
			continue
		}
		flow.clearQualityPreference()
	}
}
// ClearQualityPreferencesForRoutes drops the stored quality preference of
// every queue whose preferred route is a key of routeIDs. An empty set is a
// no-op.
func (s *FabricFlowScheduler) ClearQualityPreferencesForRoutes(routeIDs map[string]struct{}) {
	if s == nil || len(routeIDs) == 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, flow := range s.queues {
		if flow == nil {
			continue
		}
		preferred := flow.QualityPreferenceRouteID
		if preferred == "" {
			continue
		}
		if _, targeted := routeIDs[preferred]; targeted {
			flow.clearQualityPreference()
		}
	}
}
// recordLatency increments the latency histogram bucket matching duration:
// at most 10 ms, at most 100 ms, at most 1000 ms, or above 1000 ms.
func (q *fabricFlowQueue) recordLatency(duration time.Duration) {
	if q == nil {
		return
	}
	millis := fabricSendDurationMillis(duration)
	if millis <= 10 {
		q.LatencyLe10Millis++
		return
	}
	if millis <= 100 {
		q.LatencyLe100Millis++
		return
	}
	if millis <= 1000 {
		q.LatencyLe1000Millis++
		return
	}
	q.LatencyGt1000Millis++
}
// recordQualitySample appends sample to the queue's quality window, stamping
// it with the current UTC time when it has none. The window keeps only the
// most recent defaultFabricFlowQualityWindowCapacity samples.
func (q *fabricFlowQueue) recordQualitySample(sample fabricFlowQualitySample) {
	if q == nil {
		return
	}
	if sample.At.IsZero() {
		sample.At = time.Now().UTC()
	}
	q.QualityWindow = append(q.QualityWindow, sample)

	overflow := len(q.QualityWindow) - defaultFabricFlowQualityWindowCapacity
	if overflow <= 0 {
		return
	}
	// Shift the newest samples to the front and trim, reusing the backing array.
	copy(q.QualityWindow, q.QualityWindow[overflow:])
	q.QualityWindow = q.QualityWindow[:defaultFabricFlowQualityWindowCapacity]
}
// qualityWindowStats summarises the queue's quality window: counts of samples,
// successes, failures, slow sends and drops, the newest sample timestamp, and
// the average latency over samples that carry a positive duration. A nil queue
// or empty window yields zero stats.
func (q *fabricFlowQueue) qualityWindowStats() fabricFlowQualityWindowStats {
	var summary fabricFlowQualityWindowStats
	if q == nil || len(q.QualityWindow) == 0 {
		return summary
	}
	var (
		totalMillis int64
		timedCount  int64
	)
	for idx := range q.QualityWindow {
		entry := q.QualityWindow[idx]
		summary.SampleCount++
		if entry.Success {
			summary.SuccessCount++
		}
		if entry.Failure {
			summary.FailureCount++
		}
		if entry.Slow {
			summary.SlowCount++
		}
		if entry.Dropped {
			summary.DropCount++
		}
		// Samples without a duration (e.g. drops) do not dilute the average.
		if entry.DurationMillis > 0 {
			totalMillis += entry.DurationMillis
			timedCount++
		}
		if entry.At.After(summary.LastUpdatedAt) {
			summary.LastUpdatedAt = entry.At
		}
	}
	if timedCount > 0 {
		summary.AvgLatencyMs = totalMillis / timedCount
	}
	return summary
}
// recordQualityPreference stores the first entry of preferences on the queue.
// Only that first entry is considered. If it is missing, has a blank route ID,
// or has a non-positive score adjustment, any stored preference is cleared
// instead. The raw score falls back to the score adjustment when it is not
// positive, and the reasons are de-duplicated.
func (q *fabricFlowQueue) recordQualityPreference(preferences ...FabricServiceChannelRouteQualityPreference) {
	if q == nil {
		return
	}
	if len(preferences) == 0 {
		q.clearQualityPreference()
		return
	}
	chosen := preferences[0]
	if strings.TrimSpace(chosen.RouteID) == "" || chosen.ScoreAdjustment <= 0 {
		q.clearQualityPreference()
		return
	}
	q.QualityPreferenceRouteID = chosen.RouteID
	q.QualityPreferenceScore = chosen.ScoreAdjustment
	if chosen.RawScoreAdjustment > 0 {
		q.QualityPreferenceRawScore = chosen.RawScoreAdjustment
	} else {
		q.QualityPreferenceRawScore = chosen.ScoreAdjustment
	}
	q.QualityPreferenceReasons = dedupeStrings(chosen.Reasons)
}
// clearQualityPreference resets the queue's stored route quality preference
// (route ID, scores and reasons) to zero values. It is a no-op on a nil queue.
func (q *fabricFlowQueue) clearQualityPreference() {
	if q != nil {
		q.QualityPreferenceRouteID = ""
		q.QualityPreferenceScore, q.QualityPreferenceRawScore = 0, 0
		q.QualityPreferenceReasons = nil
	}
}
// fabricSendDurationMillis converts duration to whole milliseconds. Zero and
// negative durations map to 0, while any positive duration shorter than one
// millisecond is rounded up to 1 so that it still registers as a timed send.
func fabricSendDurationMillis(duration time.Duration) int64 {
	if duration <= 0 {
		return 0
	}
	if millis := duration.Milliseconds(); millis > 0 {
		return millis
	}
	return 1
}
// ensureQueueLocked returns the flow queue for channelID, creating the queue
// (and the queue map itself) on first use. The "Locked" suffix and the visible
// call sites indicate that callers hold s.mu.
func (s *FabricFlowScheduler) ensureQueueLocked(channelID string) *fabricFlowQueue {
	// Lookup on a nil map is safe and simply misses.
	if existing := s.queues[channelID]; existing != nil {
		return existing
	}
	if s.queues == nil {
		s.queues = map[string]*fabricFlowQueue{}
	}
	created := &fabricFlowQueue{}
	s.queues[channelID] = created
	return created
}
// LocalPacketTransport is the gateway-side packet transport that exchanges
// batches with a client through a shared in-process FabricPacketInbox rather
// than over a fabric route.
type LocalPacketTransport struct {
// Inbox is the shared inbox used for both sending and receiving.
Inbox *FabricPacketInbox
// VPNConnectionID selects the inbox queues this transport reads and writes.
VPNConnectionID string
}
// AdaptivePacketTransport combines a primary and a fallback PacketTransport.
// Receives poll the primary first and then the fallback; sends prefer whichever
// transport most recently produced packets.
type AdaptivePacketTransport struct {
// Primary is polled first on receive; may be nil if Fallback is set.
Primary PacketTransport
// Fallback is used when Primary yields nothing or fails; may be nil if
// Primary is set.
Fallback PacketTransport
// PrimaryTimeout bounds the primary receive attempt when both transports are
// configured; non-positive values default to 50ms.
PrimaryTimeout time.Duration
// lastReceive holds adaptiveTransportPrimary or adaptiveTransportFallback,
// identifying the transport that last returned packets.
lastReceive atomic.Int32
}
// Values stored in AdaptivePacketTransport.lastReceive.
const (
// adaptiveTransportPrimary is zero, so a fresh transport prefers Primary.
adaptiveTransportPrimary int32 = iota
// adaptiveTransportFallback marks Fallback as the last transport that
// delivered packets.
adaptiveTransportFallback
)
// SendGatewayPacketBatch delivers packets to the local inbox in the
// gateway-to-client direction. A batch that is empty after cleaning succeeds
// without touching the inbox; otherwise a nil transport or inbox yields
// mesh.ErrForwardRuntimeUnavailable.
func (t *LocalPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error {
	cleaned := cleanPacketBatch(packets)
	if len(cleaned) == 0 {
		return nil
	}
	if t != nil && t.Inbox != nil {
		return t.Inbox.DeliverLocalPacketBatch(t.VPNConnectionID, FabricDirectionGatewayToClient, cleaned)
	}
	return mesh.ErrForwardRuntimeUnavailable
}
// ReceiveGatewayPacketBatch waits on the local inbox for a client-to-gateway
// batch belonging to this transport's VPN connection. A nil transport or inbox
// yields mesh.ErrForwardRuntimeUnavailable.
func (t *LocalPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
	if t != nil && t.Inbox != nil {
		return t.Inbox.Receive(ctx, t.VPNConnectionID, FabricDirectionClientToGateway, timeout)
	}
	return nil, mesh.ErrForwardRuntimeUnavailable
}
// SendGatewayPacketBatch sends packets over the preferred transport and, if
// that fails, retries once over the alternate. The preferred transport's error
// is returned only when no alternate exists; otherwise the alternate's result
// decides the outcome. A batch that is empty after cleaning succeeds
// immediately.
func (t *AdaptivePacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
	cleaned := cleanPacketBatch(packets)
	if len(cleaned) == 0 {
		return nil
	}
	if t == nil || (t.Primary == nil && t.Fallback == nil) {
		return mesh.ErrForwardRuntimeUnavailable
	}

	first, second := t.preferredSendOrder()
	var firstErr error
	if first != nil {
		firstErr = first.SendGatewayPacketBatch(ctx, cleaned)
		if firstErr == nil {
			return nil
		}
	}
	if second != nil {
		return second.SendGatewayPacketBatch(ctx, cleaned)
	}
	if first != nil {
		return firstErr
	}
	return mesh.ErrForwardRuntimeUnavailable
}
// ReceiveGatewayPacketBatch receives a batch from whichever transport has
// data, remembering the source so later sends prefer it.
//
// With only one transport configured, the call is forwarded to it with the
// full timeout. With both, the primary is polled for at most PrimaryTimeout
// (default 50ms, never more than a positive timeout); if it returns nothing,
// the fallback is polled with the remaining time, or with timeout itself when
// nothing remains to subtract. A primary error is returned directly only when
// timeout is non-positive; otherwise the fallback's result decides.
func (t *AdaptivePacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
	if t == nil || (t.Primary == nil && t.Fallback == nil) {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	switch {
	case t.Primary == nil:
		t.lastReceive.Store(adaptiveTransportFallback)
		return t.Fallback.ReceiveGatewayPacketBatch(ctx, timeout)
	case t.Fallback == nil:
		t.lastReceive.Store(adaptiveTransportPrimary)
		return t.Primary.ReceiveGatewayPacketBatch(ctx, timeout)
	}

	// Budget for the primary attempt.
	primaryBudget := t.PrimaryTimeout
	if primaryBudget <= 0 {
		primaryBudget = 50 * time.Millisecond
	}
	if timeout > 0 && primaryBudget > timeout {
		primaryBudget = timeout
	}

	primaryPackets, primaryErr := t.Primary.ReceiveGatewayPacketBatch(ctx, primaryBudget)
	if primaryErr == nil && len(primaryPackets) > 0 {
		t.lastReceive.Store(adaptiveTransportPrimary)
		return primaryPackets, nil
	}
	if primaryErr != nil && timeout <= 0 {
		return nil, primaryErr
	}

	// Whatever is left of the caller's timeout goes to the fallback.
	fallbackBudget := timeout
	if timeout > primaryBudget {
		fallbackBudget = timeout - primaryBudget
	}
	fallbackPackets, fallbackErr := t.Fallback.ReceiveGatewayPacketBatch(ctx, fallbackBudget)
	if fallbackErr != nil {
		return nil, fallbackErr
	}
	if len(fallbackPackets) > 0 {
		t.lastReceive.Store(adaptiveTransportFallback)
	}
	return fallbackPackets, nil
}
// preferredSendOrder returns the transports in the order sends should try
// them: the fallback leads only when it was the last transport to deliver
// packets. Either returned value may be nil.
func (t *AdaptivePacketTransport) preferredSendOrder() (PacketTransport, PacketTransport) {
	switch {
	case t == nil:
		return nil, nil
	case t.lastReceive.Load() == adaptiveTransportFallback:
		return t.Fallback, t.Primary
	default:
		return t.Primary, t.Fallback
	}
}
// SendGatewayPacketBatch wraps packets in a production VPN packet-batch
// envelope and forwards it to the route's next hop.
//
// The next hop defaults to RemoteNodeID and the direction to
// FabricDirectionGatewayToClient. A batch that is empty after cleaning
// succeeds immediately. Errors: mesh.ErrForwardRuntimeUnavailable when the
// transport is not wired, a descriptive error when route identity fields are
// missing, or whatever envelope construction or SendProduction returns.
func (t *FabricPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
	cleaned := cleanPacketBatch(packets)
	if len(cleaned) == 0 {
		return nil
	}
	if t == nil || t.ForwardTransport == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}
	for _, required := range []string{t.ClusterID, t.VPNConnectionID, t.RouteID, t.LocalNodeID, t.RemoteNodeID} {
		if required == "" {
			return errors.New("fabric packet transport route identity is incomplete")
		}
	}

	firstHop := t.NextHopNodeID
	if firstHop == "" {
		firstHop = t.RemoteNodeID
	}
	direction := t.SendDirection
	if direction == "" {
		direction = FabricDirectionGatewayToClient
	}

	// The envelope is addressed as seen by the receiving hop: its "current hop"
	// is our next hop, and its "next hop" is the hop after that on the path.
	input := mesh.ProductionVPNPacketEnvelopeInput{
		RouteID:           t.RouteID,
		ClusterID:         t.ClusterID,
		SourceNodeID:      t.LocalNodeID,
		DestinationNodeID: t.RemoteNodeID,
		CurrentHopNodeID:  firstHop,
		NextHopNodeID:     nextHopAfter(t.RoutePath, firstHop, t.RemoteNodeID),
		RoutePath:         t.RoutePath,
		VPNConnectionID:   t.VPNConnectionID,
		Direction:         direction,
		Packets:           cleaned,
	}
	envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(input)
	if err != nil {
		return err
	}
	if _, err = t.ForwardTransport.SendProduction(ctx, firstHop, envelope); err != nil {
		return err
	}
	return nil
}
// SendClientPacketBatch sends client packets without an explicit traffic
// class; it is SendClientPacketBatchWithTrafficClass with an empty class.
func (i *FabricClientPacketIngress) SendClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, packets [][]byte) error {
	const unspecifiedTrafficClass = ""
	return i.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, unspecifiedTrafficClass, packets)
}
// SendClientPacketBatchWithTrafficClass schedules packets through the flow
// scheduler and sends the resulting per-channel batches toward the gateway.
//
// Batches go out in parallel when the scheduler recommends a window above one
// and more than one batch was scheduled; otherwise they are sent one by one.
// It returns mesh.ErrSyntheticRelayQueueFull when nothing could be scheduled,
// and mesh.ErrForwardRuntimeUnavailable on a nil ingress. A batch that is
// empty after cleaning succeeds immediately.
//
// NOTE(review): the sequential path stops at the first failing batch, so later
// scheduled batches are never handed to sendScheduledClientPacketBatch, while
// the parallel path attempts every batch. Confirm this difference is intended.
func (i *FabricClientPacketIngress) SendClientPacketBatchWithTrafficClass(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte) error {
	cleaned := cleanPacketBatch(packets)
	if len(cleaned) == 0 {
		return nil
	}
	if i == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}
	i.recordSendBatch(len(cleaned))

	scheduler := i.flowScheduler()
	batches, dropped := scheduler.scheduleClientPackets(vpnConnectionID, trafficClass, cleaned)
	if dropped > 0 {
		i.recordFlowDropped(dropped)
	}
	if len(batches) == 0 {
		i.recordError(mesh.ErrSyntheticRelayQueueFull)
		return mesh.ErrSyntheticRelayQueueFull
	}

	window := scheduler.RecommendedParallelSendWindowForTrafficClass(trafficClass, i.maxParallelFlowSends())
	if window > 1 && len(batches) > 1 {
		return i.sendScheduledClientPacketBatchesParallel(ctx, clusterID, vpnConnectionID, batches, window)
	}
	for idx := range batches {
		i.recordFlowBatch(len(batches[idx].Packets))
		err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, batches[idx])
		if err != nil {
			return err
		}
	}
	return nil
}
// sendScheduledClientPacketBatchesParallel sends the scheduled batches with at
// most maxParallel sends in flight at once.
//
// With a window of one or a single batch it degrades to sequential sending
// that stops at the first error. In parallel mode every batch is attempted,
// the function waits for all sends to finish, and the first error observed (if
// any) is returned.
func (i *FabricClientPacketIngress) sendScheduledClientPacketBatchesParallel(ctx context.Context, clusterID string, vpnConnectionID string, scheduled []FabricScheduledPacketBatch, maxParallel int) error {
	if maxParallel <= 1 || len(scheduled) <= 1 {
		for idx := range scheduled {
			i.recordFlowBatch(len(scheduled[idx].Packets))
			err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, scheduled[idx])
			if err != nil {
				return err
			}
		}
		return nil
	}

	workers := maxParallel
	if workers > len(scheduled) {
		workers = len(scheduled)
	}
	i.recordFlowParallel()

	var (
		slots    = make(chan struct{}, workers) // counting semaphore
		pending  sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)
	for idx := range scheduled {
		i.recordFlowBatch(len(scheduled[idx].Packets))
		slots <- struct{}{} // blocks while the window is full
		pending.Add(1)
		go func(batch FabricScheduledPacketBatch) {
			defer pending.Done()
			defer func() { <-slots }()
			err := i.sendScheduledClientPacketBatch(ctx, clusterID, vpnConnectionID, batch)
			if err == nil {
				return
			}
			// Only the first failure is kept; Wait below orders this write
			// before the read of firstErr.
			errOnce.Do(func() { firstErr = err })
		}(scheduled[idx])
	}
	pending.Wait()
	return firstErr
}
// PreferClientRoute records routeID as the last selected route and clears the
// remembered next hop. Blank route IDs and a nil ingress are ignored.
func (i *FabricClientPacketIngress) PreferClientRoute(routeID string) {
	trimmed := strings.TrimSpace(routeID)
	if i == nil || trimmed == "" {
		return
	}
	i.mu.Lock()
	i.lastSelectedRouteID, i.lastSelectedNextHop = trimmed, ""
	i.mu.Unlock()
}
// sendScheduledClientPacketBatch delivers one scheduled batch for a channel
// toward the gateway.
//
// Route candidates for the channel are tried in order and the call returns nil
// on the first successful SendProduction. When there are no candidates, or
// every candidate fails, the batch is delivered to the local inbox if
// localGatewayReady reports true. Otherwise the last route error is returned,
// or mesh.ErrRouteNotFound when no route produced an error. Every outcome is
// reported to the flow scheduler and to the ingress counters.
func (i *FabricClientPacketIngress) sendScheduledClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, batch FabricScheduledPacketBatch) error {
scheduler := i.flowScheduler()
scheduler.BeginSend(batch.ChannelID)
// Deferred calls run in reverse order: Complete(batch) first, then EndSend.
defer scheduler.EndSend(batch.ChannelID)
defer scheduler.Complete(batch)
packets := cleanPacketBatch(batch.Packets)
if len(packets) == 0 {
return nil
}
candidates := i.routeCandidatesForChannel(clusterID, batch.ChannelID)
// No fabric route at all: deliver locally when the local gateway is allowed
// and ready.
if len(candidates) == 0 && i.localGatewayReady(vpnConnectionID) {
if err := i.inbox().DeliverLocalPacketBatch(vpnConnectionID, FabricDirectionClientToGateway, packets); err != nil {
i.recordError(err)
return err
}
scheduler.RecordLocalFallback(batch.ChannelID)
i.recordLocalFallback()
return nil
}
if len(candidates) == 0 {
i.recordError(mesh.ErrRouteNotFound)
return mesh.ErrRouteNotFound
}
transport := i.forwardTransport()
if transport == nil {
i.recordError(mesh.ErrForwardRuntimeUnavailable)
return mesh.ErrForwardRuntimeUnavailable
}
var lastErr error
// Try candidates in preference order; a failure is recorded and the next
// candidate is attempted.
for _, candidate := range candidates {
startedAt := time.Now()
i.recordRouteAttempt()
// The envelope is addressed from the receiving hop's point of view: its
// current hop is our next hop, and its next hop is the one after that.
envelopeCurrentHop := candidate.NextHop
envelopeNextHop := nextHopAfter(candidate.Route.Hops, envelopeCurrentHop, candidate.Route.DestinationNodeID)
envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{
RouteID: candidate.Route.RouteID,
ClusterID: candidate.Route.ClusterID,
SourceNodeID: candidate.Route.SourceNodeID,
DestinationNodeID: candidate.Route.DestinationNodeID,
CurrentHopNodeID: envelopeCurrentHop,
NextHopNodeID: envelopeNextHop,
RoutePath: candidate.Route.Hops,
TTL: candidate.Route.MaxTTL,
ExpiresAt: candidate.Route.ExpiresAt,
VPNConnectionID: vpnConnectionID,
Direction: FabricDirectionClientToGateway,
Packets: packets,
})
if err != nil {
// Envelope construction failures count as route failures too.
lastErr = err
scheduler.RecordRouteFailureWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, err, time.Since(startedAt), i.routeProvenanceFor(candidate.Route))
i.recordRouteFailure(err)
continue
}
if _, err = transport.SendProduction(ctx, candidate.NextHop, envelope); err != nil {
lastErr = err
scheduler.RecordRouteFailureWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, err, time.Since(startedAt), i.routeProvenanceFor(candidate.Route))
i.recordRouteFailure(err)
continue
}
preference, _ := i.routeQualityPreference(candidate.Route.RouteID)
scheduler.RecordRouteSuccessWithProvenance(batch.ChannelID, candidate.Route.RouteID, candidate.NextHop, time.Since(startedAt), i.routeProvenanceFor(candidate.Route), preference)
i.recordRouteSuccess(candidate.Route.RouteID, candidate.NextHop)
return nil
}
// Every candidate failed: last resort is local delivery, if permitted.
if i.localGatewayReady(vpnConnectionID) {
if err := i.inbox().DeliverLocalPacketBatch(vpnConnectionID, FabricDirectionClientToGateway, packets); err != nil {
i.recordError(err)
return err
}
scheduler.RecordLocalFallback(batch.ChannelID)
i.recordLocalFallback()
return nil
}
if lastErr == nil {
lastErr = mesh.ErrRouteNotFound
}
i.recordError(lastErr)
return lastErr
}
// ReceiveClientPacketBatch waits up to timeout for a gateway-to-client batch
// addressed to vpnConnectionID.
//
// It requires either a selectable route for clusterID or a ready local
// gateway; otherwise it returns mesh.ErrRouteNotFound. An empty receive
// returns (nil, nil). Inbox errors are recorded and returned. A missing
// ingress or inbox yields mesh.ErrForwardRuntimeUnavailable.
func (i *FabricClientPacketIngress) ReceiveClientPacketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration) ([][]byte, error) {
	source := i.inbox()
	if i == nil || source == nil {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	// localGatewayReady is consulted only when no route could be selected.
	if _, _, routed := i.selectRoute(clusterID); !routed && !i.localGatewayReady(vpnConnectionID) {
		i.recordReceiveEmpty()
		return nil, mesh.ErrRouteNotFound
	}

	received, err := source.Receive(ctx, vpnConnectionID, FabricDirectionGatewayToClient, timeout)
	switch {
	case err != nil:
		i.recordError(err)
		return nil, err
	case len(received) == 0:
		i.recordReceiveEmpty()
		return nil, nil
	}
	i.recordReceiveBatch(len(received))
	return received, nil
}
// localGatewayReady reports whether packets for vpnConnectionID may be
// delivered through the local inbox. That requires the legacy local gateway
// fallback to be allowed, an inbox, a non-empty connection ID, and a
// local-gateway callback that returns true for the connection.
func (i *FabricClientPacketIngress) localGatewayReady(vpnConnectionID string) bool {
	if i == nil || !i.AllowLegacyLocalGatewayFallback {
		return false
	}
	if i.inbox() == nil || vpnConnectionID == "" {
		return false
	}
	if isLocal := i.localGateway(); isLocal != nil {
		return isLocal(vpnConnectionID)
	}
	return false
}
// selectRoute returns the first route candidate for clusterID together with
// its next hop. The boolean is false when no candidate exists.
func (i *FabricClientPacketIngress) selectRoute(clusterID string) (mesh.SyntheticRoute, string, bool) {
	if ranked := i.routeCandidates(clusterID); len(ranked) > 0 {
		best := ranked[0]
		return best.Route, best.NextHop, true
	}
	return mesh.SyntheticRoute{}, "", false
}
// fabricClientRouteCandidate pairs a usable route with the node the local
// ingress must forward to first.
type fabricClientRouteCandidate struct {
// Route is the synthetic route as reported by the routes function.
Route mesh.SyntheticRoute
// NextHop is the hop following the local node on Route.Hops.
NextHop string
}
// routeCandidates lists route candidates for clusterID, preferring the route
// that was selected last and avoiding none.
func (i *FabricClientPacketIngress) routeCandidates(clusterID string) []fabricClientRouteCandidate {
	const nothingToAvoid = ""
	return i.routeCandidatesWithPreference(clusterID, i.lastRouteID(), nothingToAvoid)
}
// routeCandidatesForChannel lists route candidates for clusterID using the
// flow scheduler's per-channel hints. When the scheduler has neither a
// preferred nor an avoided route for the channel, the ingress-wide last
// selected route is used as the preference.
func (i *FabricClientPacketIngress) routeCandidatesForChannel(clusterID string, channelID string) []fabricClientRouteCandidate {
	prefer, avoid := i.flowScheduler().RoutePreference(channelID)
	noChannelHint := prefer == "" && avoid == ""
	if noChannelHint {
		prefer = i.lastRouteID()
	}
	return i.routeCandidatesWithPreference(clusterID, prefer, avoid)
}
// routeCandidatesWithPreference returns the usable client-to-gateway routes
// for clusterID in the order they should be tried.
//
// A route is usable when it belongs to the cluster, starts at the local node,
// allows the VPN packet channel, has not expired, and has a next hop other
// than the local node. The result lists the preferred route first, then other
// routes (possibly reordered by route quality preferences), then the avoided
// route last. Routes withdrawn by the route manager are excluded, unless
// nothing else remains and preventLastRouteWithdrawal is enabled, in which
// case the withdrawn routes are returned. An empty clusterID falls back to the
// ingress's own cluster ID.
func (i *FabricClientPacketIngress) routeCandidatesWithPreference(clusterID string, preferredRouteID string, avoidRouteID string) []fabricClientRouteCandidate {
routesFunc := i.routesFunc()
if i == nil || routesFunc == nil {
return nil
}
localClusterID := i.clusterID()
localNodeID := i.localNodeID()
if clusterID == "" {
clusterID = localClusterID
}
now := time.Now().UTC()
var preferred []fabricClientRouteCandidate
var alternates []fabricClientRouteCandidate
var deferred []fabricClientRouteCandidate
var withdrawn []fabricClientRouteCandidate
manager := i.routeManager()
// A withdrawn preferred route is swapped for its replacement when the
// manager names one; otherwise it is demoted to the avoided route (unless an
// avoided route is already set) and no route is preferred.
if preferredRouteID != "" && manager.isWithdrawn(preferredRouteID) {
if replacementRouteID := manager.replacementRouteID(preferredRouteID); replacementRouteID != "" {
preferredRouteID = replacementRouteID
} else {
if avoidRouteID == "" {
avoidRouteID = preferredRouteID
}
preferredRouteID = ""
}
}
for _, route := range routesFunc() {
if route.ClusterID != clusterID || route.SourceNodeID != localNodeID || !containsString(route.AllowedChannels, mesh.ProductionChannelVPNPacket) {
continue
}
// A zero ExpiresAt means the route does not expire.
if !route.ExpiresAt.IsZero() && !route.ExpiresAt.After(now) {
continue
}
nextHop := nextHopAfter(route.Hops, localNodeID, route.DestinationNodeID)
if nextHop == "" || nextHop == localNodeID {
continue
}
candidate := fabricClientRouteCandidate{Route: route, NextHop: nextHop}
// Withdrawn routes are kept aside for the last-route safety net below.
if manager.isWithdrawn(route.RouteID) {
withdrawn = append(withdrawn, candidate)
continue
}
if preferredRouteID != "" && route.RouteID == preferredRouteID {
preferred = append(preferred, candidate)
} else if avoidRouteID != "" && route.RouteID == avoidRouteID {
deferred = append(deferred, candidate)
} else {
alternates = append(alternates, candidate)
}
}
// When a preferred route exists, only routes to the same destination node
// are kept as alternatives.
if len(preferred) > 0 {
destinationNodeID := strings.TrimSpace(preferred[0].Route.DestinationNodeID)
alternates = filterRouteCandidatesByDestination(alternates, destinationNodeID)
deferred = filterRouteCandidatesByDestination(deferred, destinationNodeID)
}
out := append(preferred, alternates...)
// Quality preferences reorder only preferred+alternates; the avoided route
// is appended afterwards so it always stays last.
out = i.applyRouteQualityPreferences(out, preferredRouteID)
out = append(out, deferred...)
if len(out) == 0 && i.preventLastRouteWithdrawal() {
return withdrawn
}
return out
}
// filterRouteCandidatesByDestination keeps only the candidates whose route
// ends at destinationNodeID, compared after trimming whitespace. A blank
// destination disables filtering. The filter runs in place: the returned slice
// shares, and overwrites, the backing array of candidates.
func filterRouteCandidatesByDestination(candidates []fabricClientRouteCandidate, destinationNodeID string) []fabricClientRouteCandidate {
	wanted := strings.TrimSpace(destinationNodeID)
	if wanted == "" || len(candidates) == 0 {
		return candidates
	}
	kept := 0
	for idx := range candidates {
		if strings.TrimSpace(candidates[idx].Route.DestinationNodeID) != wanted {
			continue
		}
		candidates[kept] = candidates[idx]
		kept++
	}
	return candidates[:kept]
}
// applyRouteQualityPreferences promotes the candidate with the highest
// positive quality score to the front of the list.
//
// The promotion happens only when that candidate is not already first and its
// score beats the preferred route's score by at least
// defaultFabricRouteQualitySwitchThreshold. Ties keep the earliest candidate.
// The input slice is not modified; a new slice is returned when the order
// changes, and the input itself otherwise.
func (i *FabricClientPacketIngress) applyRouteQualityPreferences(candidates []fabricClientRouteCandidate, preferredRouteID string) []fabricClientRouteCandidate {
	if len(candidates) < 2 {
		return candidates
	}
	scores := i.routeQualityPreferences()
	if len(scores) == 0 {
		return candidates
	}

	// Score of the currently preferred route; zero when it has none.
	baseline := 0
	if preferredRouteID != "" {
		if current, known := scores[preferredRouteID]; known {
			baseline = current.ScoreAdjustment
		}
	}

	leader, leaderScore := -1, 0
	for idx := range candidates {
		entry, known := scores[candidates[idx].Route.RouteID]
		if !known || entry.ScoreAdjustment <= 0 {
			continue
		}
		if leader < 0 || entry.ScoreAdjustment > leaderScore {
			leader, leaderScore = idx, entry.ScoreAdjustment
		}
	}
	// Nothing to do when no candidate scored, the leader is already first, or
	// its advantage is below the switch threshold.
	if leader <= 0 || leaderScore < baseline+defaultFabricRouteQualitySwitchThreshold {
		return candidates
	}

	reordered := make([]fabricClientRouteCandidate, len(candidates))
	reordered[0] = candidates[leader]
	copy(reordered[1:], candidates[:leader])
	copy(reordered[leader+1:], candidates[leader+1:])
	return reordered
}
// preventLastRouteWithdrawal reads the PreventLastRouteWithdrawal flag under
// the ingress mutex. A nil ingress reports false.
func (i *FabricClientPacketIngress) preventLastRouteWithdrawal() bool {
	enabled := false
	if i != nil {
		i.mu.Lock()
		enabled = i.PreventLastRouteWithdrawal
		i.mu.Unlock()
	}
	return enabled
}
// ReceiveGatewayPacketBatch waits on the inbox for a batch belonging to this
// transport's VPN connection. The direction is ReceiveDirection, or
// FabricDirectionClientToGateway when that is empty. A nil transport or inbox
// yields mesh.ErrForwardRuntimeUnavailable.
func (t *FabricPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) {
	if t == nil || t.Inbox == nil {
		return nil, mesh.ErrForwardRuntimeUnavailable
	}
	if t.ReceiveDirection != "" {
		return t.Inbox.Receive(ctx, t.VPNConnectionID, t.ReceiveDirection, timeout)
	}
	return t.Inbox.Receive(ctx, t.VPNConnectionID, FabricDirectionClientToGateway, timeout)
}
// FabricPacketInbox buffers VPN packet batches in memory, in one queue per
// (VPN connection ID, direction) pair, until a receiver collects them.
type FabricPacketInbox struct {
// capacity is the buffer size of each queue's normal channel; the priority
// channel gets a quarter of it, but at least one slot.
capacity int
// mu guards queues and dropped.
mu sync.Mutex
// queues maps vpnConnectionID + "\x00" + direction to its queue.
queues map[string]*fabricPacketInboxQueue
// dropped counts batches discarded because the target channel was full.
dropped uint64
}
// fabricPacketInboxQueue holds the buffered batches for one connection and
// direction, split into a normal lane and a priority lane.
type fabricPacketInboxQueue struct {
// normal carries every batch that is not routed to the priority lane.
normal chan mesh.VPNPacketBatchPayload
// priority carries gateway-to-client batches that contain a TCP control
// packet; receivers drain it ahead of normal.
priority chan mesh.VPNPacketBatchPayload
}
// NewFabricPacketInbox creates an empty inbox whose per-queue buffer holds
// capacity batches. A non-positive capacity selects the default of 4096.
func NewFabricPacketInbox(capacity int) *FabricPacketInbox {
	inbox := &FabricPacketInbox{
		capacity: 4096,
		queues:   make(map[string]*fabricPacketInboxQueue),
	}
	if capacity > 0 {
		inbox.capacity = capacity
	}
	return inbox
}
// DeliverProductionEnvelope decodes a VPN packet-batch envelope and enqueues
// its payload. Envelopes of any other channel class or message type, and
// payloads that are empty after cleaning, are ignored and return nil. Decode
// errors are returned to the caller.
func (i *FabricPacketInbox) DeliverProductionEnvelope(_ context.Context, envelope mesh.ProductionEnvelope) error {
	if i == nil {
		return mesh.ErrForwardRuntimeUnavailable
	}
	isVPNPacketBatch := envelope.ChannelClass == mesh.ProductionChannelVPNPacket &&
		envelope.MessageType == mesh.ProductionMessageVPNPacketBatch
	if !isVPNPacketBatch {
		return nil
	}
	payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
	if err != nil {
		return err
	}
	if payload.Packets = cleanPacketBatch(payload.Packets); len(payload.Packets) == 0 {
		return nil
	}
	return i.enqueue(payload)
}
// DeliverLocalPacketBatch enqueues packets that originate on this node,
// wrapping them in a payload stamped with the current UTC time. It returns
// mesh.ErrForwardEnvelopeInvalid when the connection ID or direction is empty,
// and nil without enqueuing when the batch is empty after cleaning.
func (i *FabricPacketInbox) DeliverLocalPacketBatch(vpnConnectionID, direction string, packets [][]byte) error {
	switch {
	case i == nil:
		return mesh.ErrForwardRuntimeUnavailable
	case vpnConnectionID == "" || direction == "":
		return mesh.ErrForwardEnvelopeInvalid
	}
	cleaned := cleanPacketBatch(packets)
	if len(cleaned) == 0 {
		return nil
	}
	payload := mesh.VPNPacketBatchPayload{
		SchemaVersion:   "rap.vpn_packet_batch.v1",
		VPNConnectionID: vpnConnectionID,
		Direction:       direction,
		Packets:         cleaned,
		SentAt:          time.Now().UTC(),
	}
	return i.enqueue(payload)
}
// Receive blocks until a non-empty batch is available for the given
// connection and direction, the timeout elapses, or ctx is done.
//
// It returns (nil, nil) on timeout and (nil, ctx.Err()) on cancellation. A
// non-positive timeout defaults to 25 seconds. Priority batches are always
// taken before normal ones; when only normal batches are waiting, the method
// pauses for up to 2ms to give a priority batch the chance to arrive first.
// It returns mesh.ErrForwardRuntimeUnavailable on a nil inbox and
// mesh.ErrForwardEnvelopeInvalid when either identifier is empty.
func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direction string, timeout time.Duration) ([][]byte, error) {
if i == nil {
return nil, mesh.ErrForwardRuntimeUnavailable
}
if vpnConnectionID == "" || direction == "" {
return nil, mesh.ErrForwardEnvelopeInvalid
}
if timeout <= 0 {
timeout = 25 * time.Second
}
// One timer covers the whole call, across all loop iterations.
timer := time.NewTimer(timeout)
defer timer.Stop()
queue := i.queue(vpnConnectionID, direction)
for {
// Step 1: non-blocking check of the priority lane.
select {
case payload := <-queue.priority:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
default:
}
// Step 2: normal traffic is waiting; hold off briefly so that a priority
// batch arriving within 2ms is still served first.
if len(queue.normal) > 0 {
priorityTimer := time.NewTimer(2 * time.Millisecond)
select {
case <-ctx.Done():
priorityTimer.Stop()
return nil, ctx.Err()
case <-timer.C:
priorityTimer.Stop()
return nil, nil
case payload := <-queue.priority:
priorityTimer.Stop()
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
case <-priorityTimer.C:
// Grace period over; fall through to the blocking wait below.
}
}
// Step 3: block until either lane has data, the deadline passes, or ctx is
// done. Payloads that are empty after cleaning restart the loop.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
return nil, nil
case payload := <-queue.priority:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
case payload := <-queue.normal:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
}
}
}
// enqueue places payload on the queue for its connection and direction
// without blocking. Gateway-to-client batches that contain a TCP control
// packet use the priority lane; everything else uses the normal lane. When the
// chosen lane is full the payload is discarded and counted as dropped. The
// method always returns nil.
func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error {
	slot := i.queue(payload.VPNConnectionID, payload.Direction)
	prioritized := payload.Direction == FabricDirectionGatewayToClient && batchHasTCPControlPacket(payload.Packets)

	lane := slot.normal
	if prioritized {
		lane = slot.priority
	}
	select {
	case lane <- payload:
		return nil
	default:
	}
	// Lane full: drop instead of stalling the sender.
	i.mu.Lock()
	i.dropped++
	i.mu.Unlock()
	return nil
}
// queue returns the queue for the given connection and direction, creating it
// on first use. The normal lane is buffered with the inbox capacity and the
// priority lane with a quarter of it (at least one slot).
//
// The method initialises the queue map lazily, so it also has to cope with an
// inbox that was not built by NewFabricPacketInbox. In that case capacity is
// zero, which used to create an unbuffered normal lane whose non-blocking
// enqueue dropped nearly every batch; a negative capacity would make the
// channel allocation panic. A non-positive capacity is therefore replaced with
// the constructor's default before any channel is created.
func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) *fabricPacketInboxQueue {
	// Mirrors the default applied by NewFabricPacketInbox.
	const fallbackCapacity = 4096

	// NUL cannot collide with a separator inside either identifier.
	key := vpnConnectionID + "\x00" + direction
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.queues == nil {
		i.queues = map[string]*fabricPacketInboxQueue{}
	}
	if i.capacity <= 0 {
		i.capacity = fallbackCapacity
	}
	queue, ok := i.queues[key]
	if !ok {
		priorityCapacity := maxInt(1, i.capacity/4)
		queue = &fabricPacketInboxQueue{
			normal:   make(chan mesh.VPNPacketBatchPayload, i.capacity),
			priority: make(chan mesh.VPNPacketBatchPayload, priorityCapacity),
		}
		i.queues[key] = queue
	}
	return queue
}
// batchHasTCPControlPacket reports whether at least one packet in the batch is
// classified as a TCP control packet by isTCPControlPacket.
func batchHasTCPControlPacket(packets [][]byte) bool {
	for idx := 0; idx < len(packets); idx++ {
		if isTCPControlPacket(packets[idx]) {
			return true
		}
	}
	return false
}
// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
// Dropped returns the number of batches discarded because their target queue
// was full. A nil inbox reports zero.
func (i *FabricPacketInbox) Dropped() uint64 {
	if i == nil {
		return 0
	}
	i.mu.Lock()
	count := i.dropped
	i.mu.Unlock()
	return count
}
// FabricPacketInboxSnapshot is the JSON-serializable view of a
// FabricPacketInbox as produced by (*FabricPacketInbox).Snapshot.
type FabricPacketInboxSnapshot struct {
// SchemaVersion identifies the snapshot format.
SchemaVersion string `json:"schema_version"`
// Capacity is the inbox's configured capacity value.
Capacity int `json:"capacity"`
// Dropped is the number of batches discarded because a queue was full.
Dropped uint64 `json:"dropped"`
// QueueDepths maps "connection:direction" to the combined number of batches
// buffered in that queue's normal and priority channels.
QueueDepths map[string]int `json:"queue_depths"`
// QueueCount is the number of queues that currently exist.
QueueCount int `json:"queue_count"`
}
// Snapshot captures the inbox capacity, drop counter, queue count and the
// buffered depth of every queue. Queue keys are reported with the internal
// NUL separator replaced by ":". A nil inbox yields a snapshot that carries
// only the schema version and an empty depth map.
func (i *FabricPacketInbox) Snapshot() FabricPacketInboxSnapshot {
	depths := map[string]int{}
	result := FabricPacketInboxSnapshot{
		SchemaVersion: "rap.fabric_packet_inbox.v1",
		QueueDepths:   depths,
	}
	if i == nil {
		return result
	}
	i.mu.Lock()
	defer i.mu.Unlock()
	result.Capacity, result.Dropped, result.QueueCount = i.capacity, i.dropped, len(i.queues)
	for key, q := range i.queues {
		label := strings.ReplaceAll(key, "\x00", ":")
		// Depth counts both channels of the queue.
		depths[label] = len(q.priority) + len(q.normal)
	}
	return result
}
// FabricClientPacketIngressSnapshot is the JSON-serializable view of a
// FabricClientPacketIngress as produced by its Snapshot method: identity,
// last route selection, send/receive counters, and nested snapshots of the
// inbox, flow scheduler and route manager.
type FabricClientPacketIngressSnapshot struct {
// SchemaVersion identifies the snapshot format.
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id,omitempty"`
LocalNodeID string `json:"local_node_id,omitempty"`
// RouteCandidateCount is the number of route candidates for ClusterID.
RouteCandidateCount int `json:"route_candidate_count"`
// Last route selection and the most recent error message, if any.
LastSelectedRouteID string `json:"last_selected_route_id,omitempty"`
LastSelectedNextHop string `json:"last_selected_next_hop,omitempty"`
LastError string `json:"last_error,omitempty"`
// Send-side counters.
SendBatches uint64 `json:"send_batches"`
SendPackets uint64 `json:"send_packets"`
SendRouteAttempts uint64 `json:"send_route_attempts"`
SendRouteFailures uint64 `json:"send_route_failures"`
SendFallbackLocal uint64 `json:"send_fallback_local"`
SendFlowBatches uint64 `json:"send_flow_batches"`
SendFlowPackets uint64 `json:"send_flow_packets"`
SendFlowDropped uint64 `json:"send_flow_dropped"`
SendFlowParallel uint64 `json:"send_flow_parallel_batches"`
// MaxParallelFlowSends is the effective parallel-send limit (at least 1).
MaxParallelFlowSends int `json:"max_parallel_flow_sends"`
// RecommendedParallelFlowSends is the flow scheduler's recommendation for
// bulk traffic given MaxParallelFlowSends.
RecommendedParallelFlowSends int `json:"recommended_parallel_flow_sends"`
// Receive-side counters.
ReceiveBatches uint64 `json:"receive_batches"`
ReceivePackets uint64 `json:"receive_packets"`
ReceiveEmpty uint64 `json:"receive_empty"`
// Nested component snapshots.
Inbox FabricPacketInboxSnapshot `json:"inbox"`
FlowScheduler FabricFlowSchedulerSnapshot `json:"flow_scheduler"`
RouteManager FabricServiceChannelRouteManager `json:"route_manager"`
RouteManagerTransition FabricServiceChannelRouteManagerTransition `json:"route_manager_transition"`
// RouteQualityPreferenceCount is the number of stored preferences;
// RouteQualityPreferences lists them in sorted order.
RouteQualityPreferenceCount int `json:"route_quality_preference_count"`
RouteQualityPreferences []FabricServiceChannelRouteQualityPreference `json:"route_quality_preferences,omitempty"`
}
// Snapshot returns a point-in-time view of the ingress state.
//
// clusterID, when non-blank, overrides the ingress's own ClusterID in the
// result and is the cluster used for route candidate counting and route
// provenance lookup. A nil receiver yields a snapshot that carries only the
// schema version.
//
// Counters and configuration are copied while holding i.mu. The inbox, flow
// scheduler and route-derived data are gathered after the lock is released,
// so they may reflect a slightly later moment than the counters.
func (i *FabricClientPacketIngress) Snapshot(clusterID string) FabricClientPacketIngressSnapshot {
snapshot := FabricClientPacketIngressSnapshot{
// NOTE(review): this schema version string names the route manager rather
// than the ingress snapshot — confirm that is intended.
SchemaVersion: "rap.fabric_service_channel_route_manager.v1",
}
if i == nil {
return snapshot
}
i.mu.Lock()
snapshot.ClusterID = firstNonEmpty(clusterID, i.ClusterID)
snapshot.LocalNodeID = i.LocalNodeID
snapshot.LastSelectedRouteID = i.lastSelectedRouteID
snapshot.LastSelectedNextHop = i.lastSelectedNextHop
snapshot.LastError = i.lastError
snapshot.SendBatches = i.sendBatches
snapshot.SendPackets = i.sendPackets
snapshot.SendRouteAttempts = i.sendRouteAttempts
snapshot.SendRouteFailures = i.sendRouteFailures
snapshot.SendFallbackLocal = i.sendFallbackLocal
snapshot.SendFlowBatches = i.sendFlowBatches
snapshot.SendFlowPackets = i.sendFlowPackets
snapshot.SendFlowDropped = i.sendFlowDropped
snapshot.SendFlowParallel = i.sendFlowParallel
snapshot.MaxParallelFlowSends = i.maxParallelFlowSendsLocked()
snapshot.ReceiveBatches = i.receiveBatches
snapshot.ReceivePackets = i.receivePackets
snapshot.ReceiveEmpty = i.receiveEmpty
snapshot.RouteManager = i.RouteManager.snapshot()
snapshot.RouteManagerTransition = i.RouteManagerTransition.snapshot()
snapshot.RouteQualityPreferenceCount = len(i.RouteQualityPreferences)
snapshot.RouteQualityPreferences = routeQualityPreferenceSlice(i.RouteQualityPreferences)
recoveryPolicyFingerprint := strings.TrimSpace(i.RecoveryPolicyFingerprint)
// Release the lock before calling the helpers below: inbox(), flowScheduler()
// and routeProvenance() acquire i.mu themselves.
i.mu.Unlock()
snapshot.RouteCandidateCount = len(i.routeCandidates(snapshot.ClusterID))
snapshot.Inbox = i.inbox().Snapshot()
snapshot.FlowScheduler = i.flowScheduler().Snapshot()
// Attach route policy version/generation and the recovery fingerprint to the
// per-channel scheduler statistics.
annotateFabricFlowSchedulerProvenance(&snapshot.FlowScheduler, i.routeProvenance(snapshot.ClusterID), recoveryPolicyFingerprint)
snapshot.RecommendedParallelFlowSends = i.flowScheduler().RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, snapshot.MaxParallelFlowSends)
return snapshot
}
// UpdateRuntime replaces the ingress's runtime wiring under i.mu: forward
// transport, inbox, cluster and node identity, local-gateway predicate, route
// source and the (trimmed) recovery policy fingerprint.
//
// Only the first element of adaptivePolicies is used; when none is given the
// default adaptive policy applies. The chosen policy's trimmed fingerprint is
// stored and the policy is pushed into the flow scheduler, which is created
// if missing. A non-positive MaxParallelFlowSends is reset to the default
// parallel send window. A nil receiver is a no-op.
func (i *FabricClientPacketIngress) UpdateRuntime(forwardTransport mesh.ProductionForwardTransport, inbox *FabricPacketInbox, clusterID string, localNodeID string, localGateway func(string) bool, routes func() []mesh.SyntheticRoute, recoveryPolicyFingerprint string, adaptivePolicies ...FabricServiceChannelAdaptivePolicy) {
	if i == nil {
		return
	}
	i.mu.Lock()
	defer i.mu.Unlock()

	i.ForwardTransport, i.Inbox = forwardTransport, inbox
	i.ClusterID, i.LocalNodeID = clusterID, localNodeID
	i.LocalGateway, i.Routes = localGateway, routes
	i.RecoveryPolicyFingerprint = strings.TrimSpace(recoveryPolicyFingerprint)

	policy := defaultFabricServiceChannelAdaptivePolicy()
	if len(adaptivePolicies) != 0 {
		policy = adaptivePolicies[0]
	}
	i.AdaptivePolicyFingerprint = strings.TrimSpace(policy.Fingerprint)

	if i.FlowScheduler == nil {
		i.FlowScheduler = NewFabricFlowScheduler(0, 0)
	}
	i.FlowScheduler.ConfigureAdaptivePolicy(policy)

	if i.MaxParallelFlowSends <= 0 {
		i.MaxParallelFlowSends = defaultFabricFlowParallelSendWindow
	}
}
// fabricRouteProvenance records which policy version and generation a route
// came from. It is the value type of the map built by routeProvenance.
type fabricRouteProvenance struct {
PolicyVersion string
Generation string
}
// fabricFlowRouteProvenance is the per-route provenance returned by
// routeProvenanceFor: policy version, generation and the ingress's recovery
// policy fingerprint.
type fabricFlowRouteProvenance struct {
PolicyVersion string
Generation string
RecoveryPolicyFingerprint string
}
// routeProvenanceFor derives the provenance of a single route. The policy
// version is the route's trimmed PolicyVersion, falling back to its trimmed
// RouteVersion when blank; the generation is set to the same value. The
// recovery policy fingerprint is taken from the ingress.
//
// NOTE(review): i.RecoveryPolicyFingerprint is read here without holding i.mu
// and without a nil check on i — confirm callers guarantee both.
func (i *FabricClientPacketIngress) routeProvenanceFor(route mesh.SyntheticRoute) fabricFlowRouteProvenance {
	version := strings.TrimSpace(route.PolicyVersion)
	if version == "" {
		version = strings.TrimSpace(route.RouteVersion)
	}
	var provenance fabricFlowRouteProvenance
	provenance.PolicyVersion = version
	provenance.Generation = version
	provenance.RecoveryPolicyFingerprint = strings.TrimSpace(i.RecoveryPolicyFingerprint)
	return provenance
}
// routeProvenance builds a RouteID -> provenance map from the current routes.
// Routes with a blank RouteID are skipped, as are routes outside clusterID
// (when clusterID is non-empty) and routes whose source is not the local node
// (when the local node ID is non-empty). The policy version is the trimmed
// PolicyVersion, falling back to the trimmed RouteVersion; the generation
// mirrors it. The result is never nil.
func (i *FabricClientPacketIngress) routeProvenance(clusterID string) map[string]fabricRouteProvenance {
	provenance := map[string]fabricRouteProvenance{}
	listRoutes := i.routesFunc()
	if listRoutes == nil || i == nil {
		return provenance
	}
	self := i.localNodeID()
	for _, route := range listRoutes() {
		switch {
		case strings.TrimSpace(route.RouteID) == "":
			continue
		case clusterID != "" && route.ClusterID != clusterID:
			continue
		case self != "" && route.SourceNodeID != self:
			continue
		}
		version := strings.TrimSpace(route.PolicyVersion)
		if version == "" {
			version = strings.TrimSpace(route.RouteVersion)
		}
		provenance[route.RouteID] = fabricRouteProvenance{
			PolicyVersion: version,
			Generation:    version,
		}
	}
	return provenance
}
// annotateFabricFlowSchedulerProvenance enriches every channel statistic in
// snapshot with provenance data, in place.
//
// A non-empty recoveryPolicyFingerprint overwrites the stat's fingerprint. If
// the stat's last route is known in routes, its policy version and generation
// are copied. If the last failed route is known, its policy version and
// generation are recorded as the failed-route fields and also used to fill
// the main route fields when those are still empty. A nil snapshot or one
// without channel stats is left untouched.
func annotateFabricFlowSchedulerProvenance(snapshot *FabricFlowSchedulerSnapshot, routes map[string]fabricRouteProvenance, recoveryPolicyFingerprint string) {
	if snapshot == nil || len(snapshot.ChannelStats) == 0 {
		return
	}
	for channelID, stat := range snapshot.ChannelStats {
		if recoveryPolicyFingerprint != "" {
			stat.RecoveryPolicyFingerprint = recoveryPolicyFingerprint
		}
		if current, found := routes[stat.LastRouteID]; found {
			stat.RoutePolicyVersion, stat.RouteGeneration = current.PolicyVersion, current.Generation
		}
		failed, found := routes[stat.LastFailedRouteID]
		if found {
			stat.LastFailedRoutePolicyVersion, stat.LastFailedRouteGeneration = failed.PolicyVersion, failed.Generation
			// Fall back to the failed route only for fields still unset.
			if stat.RoutePolicyVersion == "" {
				stat.RoutePolicyVersion = failed.PolicyVersion
			}
			if stat.RouteGeneration == "" {
				stat.RouteGeneration = failed.Generation
			}
		}
		snapshot.ChannelStats[channelID] = stat
	}
}
// UpdateRouteManager installs a new route manager built from decisions and
// records the transition from the previous manager.
//
// If the currently selected route is withdrawn by the new manager, the
// transition records it as cleared, lastSelectedRouteID is replaced by the
// manager's replacement route ID (which may be empty) and lastSelectedNextHop
// is reset. After the lock is released, the withdrawn route IDs are passed to
// the flow scheduler's ClearQualityPreferencesForRoutes. A nil receiver is a
// no-op.
func (i *FabricClientPacketIngress) UpdateRouteManager(decisions []FabricServiceChannelRouteManagerDecision, generation string, observedAt time.Time) {
if i == nil {
return
}
// Build the new manager before taking the lock; it only depends on the inputs.
manager := NewFabricServiceChannelRouteManager(decisions, generation, observedAt)
i.mu.Lock()
// The transition must be computed against the previous manager, so do it
// before overwriting i.RouteManager.
transition := newFabricServiceChannelRouteManagerTransition(i.RouteManager, manager, observedAt)
i.RouteManager = manager
if i.lastSelectedRouteID != "" && manager.isWithdrawn(i.lastSelectedRouteID) {
clearedRouteID := i.lastSelectedRouteID
transition.ClearedSelectedRouteID = clearedRouteID
i.lastSelectedRouteID = manager.replacementRouteID(clearedRouteID)
i.lastSelectedNextHop = ""
}
i.RouteManagerTransition = transition
withdrawnRoutes := manager.withdrawnRouteIDs()
scheduler := i.FlowScheduler
i.mu.Unlock()
// The scheduler is called outside i.mu.
if scheduler != nil {
scheduler.ClearQualityPreferencesForRoutes(withdrawnRoutes)
}
}
// UpdateRouteQualityPreferences replaces the stored route quality preferences
// with the valid subset of preferences, keyed by trimmed RouteID (a later
// entry for the same route wins).
//
// A preference is dropped when its RouteID is blank, its ScoreAdjustment is
// not positive, its FeedbackStatus is set to anything other than "healthy",
// or its ExpiresAt parses as RFC 3339 and is not after the reference time. An
// ExpiresAt that fails to parse does not cause the preference to be dropped.
// The reference time is observedAt in UTC, or the current time when
// observedAt is zero. A non-positive RawScoreAdjustment is replaced by
// ScoreAdjustment, and Reasons are de-duplicated.
//
// After the new set is stored, the flow scheduler (if any) is told to clear
// quality preferences for routes not in the set. A nil receiver is a no-op.
func (i *FabricClientPacketIngress) UpdateRouteQualityPreferences(preferences []FabricServiceChannelRouteQualityPreference, observedAt time.Time) {
	if i == nil {
		return
	}
	reference := observedAt.UTC()
	if reference.IsZero() {
		reference = time.Now().UTC()
	}

	accepted := map[string]FabricServiceChannelRouteQualityPreference{}
	for _, candidate := range preferences {
		candidate.RouteID = strings.TrimSpace(candidate.RouteID)
		candidate.FeedbackStatus = strings.TrimSpace(candidate.FeedbackStatus)
		if candidate.RouteID == "" || candidate.ScoreAdjustment <= 0 {
			continue
		}
		if status := candidate.FeedbackStatus; status != "" && status != "healthy" {
			continue
		}
		if candidate.ExpiresAt != "" {
			if deadline, err := time.Parse(time.RFC3339Nano, candidate.ExpiresAt); err == nil && !deadline.After(reference) {
				continue
			}
		}
		if candidate.RawScoreAdjustment <= 0 {
			candidate.RawScoreAdjustment = candidate.ScoreAdjustment
		}
		candidate.Reasons = dedupeStrings(candidate.Reasons)
		accepted[candidate.RouteID] = candidate
	}

	keep := make(map[string]struct{}, len(accepted))
	for routeID := range accepted {
		keep[routeID] = struct{}{}
	}

	i.mu.Lock()
	i.RouteQualityPreferences = accepted
	scheduler := i.FlowScheduler
	i.mu.Unlock()

	if scheduler == nil {
		return
	}
	scheduler.ClearQualityPreferencesNotIn(keep)
}
// NewFabricServiceChannelRouteManager builds a route manager from rebuild
// decisions.
//
// Each decision's RouteID, ReplacementRouteID, RebuildStatus and
// RebuildRequestID are trimmed; decisions with a blank RouteID or
// RebuildStatus are ignored. Every kept decision is stored with its own copy
// of EffectiveHops and counted as a rebuild request. Decisions with status
// "applied" or "pending_degraded_fallback" mark the route as withdrawn; an
// applied decision with a replacement also registers that replacement.
// LastAppliedAt is set (UTC, RFC 3339 with nanoseconds) only when observedAt
// is non-zero.
func NewFabricServiceChannelRouteManager(decisions []FabricServiceChannelRouteManagerDecision, generation string, observedAt time.Time) FabricServiceChannelRouteManager {
	manager := FabricServiceChannelRouteManager{
		SchemaVersion:   "rap.fabric_service_channel_route_manager_rebuild.v1",
		Generation:      strings.TrimSpace(generation),
		Decisions:       []FabricServiceChannelRouteManagerDecision{},
		withdrawnRoutes: map[string]FabricServiceChannelRouteManagerDecision{},
		replacements:    map[string]string{},
	}
	if !observedAt.IsZero() {
		manager.LastAppliedAt = observedAt.UTC().Format(time.RFC3339Nano)
	}
	for _, decision := range decisions {
		decision.RouteID = strings.TrimSpace(decision.RouteID)
		decision.ReplacementRouteID = strings.TrimSpace(decision.ReplacementRouteID)
		decision.RebuildStatus = strings.TrimSpace(decision.RebuildStatus)
		decision.RebuildRequestID = strings.TrimSpace(decision.RebuildRequestID)
		if decision.RouteID == "" || decision.RebuildStatus == "" {
			continue
		}
		// Detach the hop list from the caller's slice.
		decision.EffectiveHops = append([]string{}, decision.EffectiveHops...)
		manager.Decisions = append(manager.Decisions, decision)
		manager.RebuildRequestCount++

		applied := decision.RebuildStatus == "applied"
		pending := decision.RebuildStatus == "pending_degraded_fallback"
		if !applied && !pending {
			continue
		}
		// Both statuses withdraw the route.
		manager.WithdrawnRouteCount++
		manager.withdrawnRoutes[decision.RouteID] = decision
		if pending {
			manager.PendingFallbackCount++
			continue
		}
		manager.RebuildAppliedCount++
		if decision.ReplacementRouteID != "" {
			manager.replacements[decision.RouteID] = decision.ReplacementRouteID
		}
	}
	return manager
}
// snapshot returns a copy of the manager for reporting: the Decisions slice
// is copied, the internal withdrawn-route and replacement indexes are dropped,
// and an empty SchemaVersion is filled with the rebuild schema version.
func (m FabricServiceChannelRouteManager) snapshot() FabricServiceChannelRouteManager {
	// m is already a private copy because of the value receiver.
	m.Decisions = append([]FabricServiceChannelRouteManagerDecision{}, m.Decisions...)
	m.withdrawnRoutes, m.replacements = nil, nil
	if m.SchemaVersion == "" {
		m.SchemaVersion = "rap.fabric_service_channel_route_manager_rebuild.v1"
	}
	return m
}
// newFabricServiceChannelRouteManagerTransition describes the change from the
// previous route manager to next.
//
// Counts are taken from next. RestoredRouteCount is the number of routes
// withdrawn in previous that are no longer withdrawn in next. ObservedAt is
// set (UTC, RFC 3339 with nanoseconds) only when observedAt is non-zero.
// Status is chosen by the first matching rule: routes restored and none
// withdrawn, any applied rebuild, any pending fallback, any decision at all,
// otherwise "empty".
func newFabricServiceChannelRouteManagerTransition(previous FabricServiceChannelRouteManager, next FabricServiceChannelRouteManager, observedAt time.Time) FabricServiceChannelRouteManagerTransition {
	transition := FabricServiceChannelRouteManagerTransition{
		SchemaVersion:        "rap.fabric_service_channel_route_manager_transition.v1",
		PreviousGeneration:   strings.TrimSpace(previous.Generation),
		Generation:           strings.TrimSpace(next.Generation),
		DecisionCount:        len(next.Decisions),
		WithdrawnRouteCount:  next.WithdrawnRouteCount,
		PendingFallbackCount: next.PendingFallbackCount,
		RebuildAppliedCount:  next.RebuildAppliedCount,
	}
	if !observedAt.IsZero() {
		transition.ObservedAt = observedAt.UTC().Format(time.RFC3339Nano)
	}

	stillWithdrawn := next.withdrawnRouteIDs()
	for routeID := range previous.withdrawnRouteIDs() {
		if _, withdrawn := stillWithdrawn[routeID]; withdrawn {
			continue
		}
		transition.RestoredRouteCount++
	}

	if transition.RestoredRouteCount > 0 && transition.WithdrawnRouteCount == 0 {
		transition.Status = "restored_by_new_config"
	} else if transition.RebuildAppliedCount > 0 {
		transition.Status = "applied_rebuild"
	} else if transition.PendingFallbackCount > 0 {
		transition.Status = "pending_degraded_fallback"
	} else if transition.DecisionCount > 0 {
		transition.Status = "decisions_observed"
	} else {
		transition.Status = "empty"
	}
	return transition
}
// snapshot returns a copy of the transition, filling an empty SchemaVersion
// with the transition schema version.
func (t FabricServiceChannelRouteManagerTransition) snapshot() FabricServiceChannelRouteManagerTransition {
	out := t
	if out.SchemaVersion == "" {
		out.SchemaVersion = "rap.fabric_service_channel_route_manager_transition.v1"
	}
	return out
}
// withdrawnRouteIDs returns the set of trimmed, non-empty route IDs that the
// manager considers withdrawn.
//
// When the internal withdrawn-route index exists it is the source of truth.
// Otherwise (for example a manager value that was not built by
// NewFabricServiceChannelRouteManager) the set is derived from Decisions whose
// RebuildStatus is "applied" or "pending_degraded_fallback". In both paths the
// route ID is trimmed before the emptiness check, so a whitespace-only ID
// never produces an empty-string key. The result is never nil.
func (m FabricServiceChannelRouteManager) withdrawnRouteIDs() map[string]struct{} {
	out := map[string]struct{}{}
	if m.withdrawnRoutes != nil {
		for routeID := range m.withdrawnRoutes {
			if routeID = strings.TrimSpace(routeID); routeID != "" {
				out[routeID] = struct{}{}
			}
		}
		return out
	}
	for _, decision := range m.Decisions {
		routeID := strings.TrimSpace(decision.RouteID)
		if routeID == "" {
			continue
		}
		if decision.RebuildStatus == "applied" || decision.RebuildStatus == "pending_degraded_fallback" {
			out[routeID] = struct{}{}
		}
	}
	return out
}
// isWithdrawn reports whether routeID (compared after trimming) is withdrawn.
//
// When the internal withdrawn-route index exists it is consulted directly.
// Otherwise the Decisions are scanned for an entry with the same trimmed
// route ID and a RebuildStatus of "applied" or "pending_degraded_fallback".
// Trimming the decision's route ID keeps this fallback consistent with
// withdrawnRouteIDs and replacementRouteID, which also compare trimmed IDs.
// A blank routeID is never withdrawn.
func (m FabricServiceChannelRouteManager) isWithdrawn(routeID string) bool {
	routeID = strings.TrimSpace(routeID)
	if routeID == "" {
		return false
	}
	if m.withdrawnRoutes != nil {
		_, ok := m.withdrawnRoutes[routeID]
		return ok
	}
	for _, decision := range m.Decisions {
		if strings.TrimSpace(decision.RouteID) != routeID {
			continue
		}
		if decision.RebuildStatus == "applied" || decision.RebuildStatus == "pending_degraded_fallback" {
			return true
		}
	}
	return false
}
// replacementRouteID returns the trimmed replacement route registered for
// routeID, or "" when there is none. When the internal replacement index
// exists it is used; otherwise the first "applied" decision whose trimmed
// RouteID matches supplies the replacement. A blank routeID yields "".
func (m FabricServiceChannelRouteManager) replacementRouteID(routeID string) string {
	wanted := strings.TrimSpace(routeID)
	if wanted == "" {
		return ""
	}
	if m.replacements != nil {
		return strings.TrimSpace(m.replacements[wanted])
	}
	for idx := range m.Decisions {
		decision := &m.Decisions[idx]
		if decision.RebuildStatus != "applied" {
			continue
		}
		if strings.TrimSpace(decision.RouteID) != wanted {
			continue
		}
		return strings.TrimSpace(decision.ReplacementRouteID)
	}
	return ""
}
// forwardTransport returns the current forward transport, read under i.mu.
// A nil receiver yields nil.
func (i *FabricClientPacketIngress) forwardTransport() mesh.ProductionForwardTransport {
	if i == nil {
		return nil
	}
	i.mu.Lock()
	transport := i.ForwardTransport
	i.mu.Unlock()
	return transport
}
// inbox returns the current packet inbox, read under i.mu. A nil receiver
// yields nil.
func (i *FabricClientPacketIngress) inbox() *FabricPacketInbox {
	if i == nil {
		return nil
	}
	i.mu.Lock()
	current := i.Inbox
	i.mu.Unlock()
	return current
}
// localGateway returns the current local-gateway predicate, read under i.mu.
// A nil receiver yields nil.
func (i *FabricClientPacketIngress) localGateway() func(string) bool {
	if i == nil {
		return nil
	}
	i.mu.Lock()
	predicate := i.LocalGateway
	i.mu.Unlock()
	return predicate
}
// routesFunc returns the current route source callback, read under i.mu.
// A nil receiver yields nil.
func (i *FabricClientPacketIngress) routesFunc() func() []mesh.SyntheticRoute {
	if i == nil {
		return nil
	}
	i.mu.Lock()
	source := i.Routes
	i.mu.Unlock()
	return source
}
// clusterID returns the configured cluster ID with surrounding whitespace
// removed. The field is read under i.mu; a nil receiver yields "".
func (i *FabricClientPacketIngress) clusterID() string {
	if i == nil {
		return ""
	}
	i.mu.Lock()
	raw := i.ClusterID
	i.mu.Unlock()
	return strings.TrimSpace(raw)
}
// localNodeID returns the configured local node ID with surrounding
// whitespace removed. The field is read under i.mu; a nil receiver yields "".
func (i *FabricClientPacketIngress) localNodeID() string {
	if i == nil {
		return ""
	}
	i.mu.Lock()
	raw := i.LocalNodeID
	i.mu.Unlock()
	return strings.TrimSpace(raw)
}
// flowScheduler returns the ingress's flow scheduler, creating and storing
// one on first use. A nil receiver gets a fresh scheduler that is not stored
// anywhere, so each such call returns a different instance.
func (i *FabricClientPacketIngress) flowScheduler() *FabricFlowScheduler {
	if i == nil {
		return NewFabricFlowScheduler(0, 0)
	}
	i.mu.Lock()
	defer i.mu.Unlock()
	if scheduler := i.FlowScheduler; scheduler != nil {
		return scheduler
	}
	i.FlowScheduler = NewFabricFlowScheduler(0, 0)
	return i.FlowScheduler
}
// maxParallelFlowSends returns the effective parallel-send limit, read under
// i.mu. A nil receiver yields 1.
func (i *FabricClientPacketIngress) maxParallelFlowSends() int {
	if i == nil {
		return 1
	}
	i.mu.Lock()
	limit := i.maxParallelFlowSendsLocked()
	i.mu.Unlock()
	return limit
}
// maxParallelFlowSendsLocked returns MaxParallelFlowSends, or 1 when the
// receiver is nil or the configured value is not positive. It does not take
// i.mu itself; the callers in this file invoke it while holding the lock.
func (i *FabricClientPacketIngress) maxParallelFlowSendsLocked() int {
	if i != nil && i.MaxParallelFlowSends > 0 {
		return i.MaxParallelFlowSends
	}
	return 1
}
// routeManager returns the current route manager value, read under i.mu.
// A nil receiver yields the zero manager.
func (i *FabricClientPacketIngress) routeManager() FabricServiceChannelRouteManager {
	if i == nil {
		return FabricServiceChannelRouteManager{}
	}
	i.mu.Lock()
	manager := i.RouteManager
	i.mu.Unlock()
	return manager
}
// routeQualityPreferences returns a copy of the stored route quality
// preferences keyed by route ID, so callers can use it without holding i.mu.
// The copy is non-nil even when nothing is stored; a nil receiver yields nil.
func (i *FabricClientPacketIngress) routeQualityPreferences() map[string]FabricServiceChannelRouteQualityPreference {
	if i == nil {
		return nil
	}
	i.mu.Lock()
	defer i.mu.Unlock()
	stored := i.RouteQualityPreferences
	clone := make(map[string]FabricServiceChannelRouteQualityPreference, len(stored))
	for routeID := range stored {
		clone[routeID] = stored[routeID]
	}
	return clone
}
// routeQualityPreference looks up the stored quality preference for routeID
// (trimmed) under i.mu. The boolean is false when the receiver is nil, the
// route ID is blank, or no preference is stored for it.
func (i *FabricClientPacketIngress) routeQualityPreference(routeID string) (FabricServiceChannelRouteQualityPreference, bool) {
	wanted := strings.TrimSpace(routeID)
	if wanted == "" || i == nil {
		return FabricServiceChannelRouteQualityPreference{}, false
	}
	i.mu.Lock()
	preference, found := i.RouteQualityPreferences[wanted]
	i.mu.Unlock()
	return preference, found
}
// routeQualityPreferenceSlice flattens the preference map into a slice sorted
// by ScoreAdjustment descending, then RouteID ascending. Each entry's Reasons
// are de-duplicated. An empty or nil map yields nil.
func routeQualityPreferenceSlice(preferences map[string]FabricServiceChannelRouteQualityPreference) []FabricServiceChannelRouteQualityPreference {
	if len(preferences) == 0 {
		return nil
	}
	out := make([]FabricServiceChannelRouteQualityPreference, 0, len(preferences))
	for _, preference := range preferences {
		preference.Reasons = dedupeStrings(preference.Reasons)
		out = append(out, preference)
	}
	sort.SliceStable(out, func(a, b int) bool {
		left, right := &out[a], &out[b]
		if left.ScoreAdjustment == right.ScoreAdjustment {
			return left.RouteID < right.RouteID
		}
		return left.ScoreAdjustment > right.ScoreAdjustment
	})
	return out
}
// lastRouteID returns the most recently selected route ID, read under i.mu.
// A nil receiver yields "".
func (i *FabricClientPacketIngress) lastRouteID() string {
	if i == nil {
		return ""
	}
	i.mu.Lock()
	routeID := i.lastSelectedRouteID
	i.mu.Unlock()
	return routeID
}
// recordSendBatch counts one sent batch and adds packetCount to the sent
// packet total, under i.mu.
func (i *FabricClientPacketIngress) recordSendBatch(packetCount int) {
	i.mu.Lock()
	i.sendBatches++
	i.sendPackets += uint64(packetCount)
	i.mu.Unlock()
}
// recordRouteAttempt counts one route send attempt, under i.mu.
func (i *FabricClientPacketIngress) recordRouteAttempt() {
	i.mu.Lock()
	i.sendRouteAttempts++
	i.mu.Unlock()
}
// recordRouteFailure counts one route send failure and, when err is non-nil,
// stores its message as the last error. Both updates happen under i.mu.
func (i *FabricClientPacketIngress) recordRouteFailure(err error) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.sendRouteFailures++
	if err == nil {
		return
	}
	i.lastError = err.Error()
}
// recordRouteSuccess stores routeID and nextHop as the last selection and
// clears the last error, under i.mu.
func (i *FabricClientPacketIngress) recordRouteSuccess(routeID, nextHop string) {
	i.mu.Lock()
	i.lastSelectedRouteID, i.lastSelectedNextHop = routeID, nextHop
	i.lastError = ""
	i.mu.Unlock()
}
// recordLocalFallback counts one local-gateway fallback, records
// "local_gateway" with the local node as the last selection, and clears the
// last error, under i.mu.
func (i *FabricClientPacketIngress) recordLocalFallback() {
	i.mu.Lock()
	i.sendFallbackLocal++
	i.lastSelectedRouteID, i.lastSelectedNextHop = "local_gateway", i.LocalNodeID
	i.lastError = ""
	i.mu.Unlock()
}
// recordFlowBatch counts one flow batch and adds packetCount to the flow
// packet total, under i.mu.
func (i *FabricClientPacketIngress) recordFlowBatch(packetCount int) {
	i.mu.Lock()
	i.sendFlowBatches++
	i.sendFlowPackets += uint64(packetCount)
	i.mu.Unlock()
}
// recordFlowDropped adds packetCount to the dropped flow packet total, under
// i.mu.
func (i *FabricClientPacketIngress) recordFlowDropped(packetCount uint64) {
	i.mu.Lock()
	i.sendFlowDropped += packetCount
	i.mu.Unlock()
}
// recordFlowParallel increments the parallel flow send counter, under i.mu.
func (i *FabricClientPacketIngress) recordFlowParallel() {
	i.mu.Lock()
	i.sendFlowParallel++
	i.mu.Unlock()
}
// recordReceiveBatch counts one received batch and adds packetCount to the
// received packet total, under i.mu.
func (i *FabricClientPacketIngress) recordReceiveBatch(packetCount int) {
	i.mu.Lock()
	i.receiveBatches++
	i.receivePackets += uint64(packetCount)
	i.mu.Unlock()
}
// recordReceiveEmpty increments the empty-receive counter, under i.mu.
func (i *FabricClientPacketIngress) recordReceiveEmpty() {
	i.mu.Lock()
	i.receiveEmpty++
	i.mu.Unlock()
}
// recordError stores err's message as the last error under i.mu. A nil err
// leaves the stored error unchanged.
func (i *FabricClientPacketIngress) recordError(err error) {
	if err != nil {
		i.mu.Lock()
		defer i.mu.Unlock()
		i.lastError = err.Error()
	}
}
// firstNonEmpty returns the first value that is non-blank after trimming
// whitespace, in trimmed form, or "" when every value is blank.
func firstNonEmpty(values ...string) string {
	for _, value := range values {
		if trimmed := strings.TrimSpace(value); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
// nextHopAfter returns the node that follows localNodeID in path. If the
// local node is the last element of path, localNodeID itself is returned. If
// path is empty or does not contain the local node, destinationNodeID is
// returned. Only the first occurrence of localNodeID is considered.
func nextHopAfter(path []string, localNodeID string, destinationNodeID string) string {
	for index := 0; index < len(path); index++ {
		if path[index] != localNodeID {
			continue
		}
		if next := index + 1; next < len(path) {
			return path[next]
		}
		return localNodeID
	}
	return destinationNodeID
}
// containsString reports whether needle is an element of values (exact
// match).
func containsString(values []string, needle string) bool {
	for idx := 0; idx < len(values); idx++ {
		if values[idx] == needle {
			return true
		}
	}
	return false
}
// dedupeStrings trims every value, drops blanks and removes duplicates while
// keeping first-seen order. An empty input yields nil; an input consisting
// only of blanks yields an empty, non-nil slice.
func dedupeStrings(values []string) []string {
	if len(values) == 0 {
		return nil
	}
	unique := make([]string, 0, len(values))
	seen := map[string]struct{}{}
	for idx := range values {
		trimmed := strings.TrimSpace(values[idx])
		if trimmed == "" {
			continue
		}
		if _, duplicate := seen[trimmed]; !duplicate {
			seen[trimmed] = struct{}{}
			unique = append(unique, trimmed)
		}
	}
	return unique
}
// classifyPacketFlow returns the packet's flow key and the shard it maps to.
// The shard is the 32-bit FNV-1a hash of the key modulo shardCount; a
// non-positive shardCount falls back to defaultFabricFlowShardCount.
func classifyPacketFlow(packet []byte, shardCount int) (string, int) {
	shards := shardCount
	if shards <= 0 {
		shards = defaultFabricFlowShardCount
	}
	key := packetFlowKey(packet)
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(key)) // hash.Hash writes never return an error
	return key, int(hasher.Sum32() % uint32(shards))
}
// packetFlowKey derives a flow key from a raw IP packet. The IP version is
// read from the high nibble of the first byte: version 4 and 6 packets are
// delegated to the protocol-specific key functions, anything else is keyed as
// "opaque:<hex FNV-1a 64 of the packet>". An empty packet yields "empty".
func packetFlowKey(packet []byte) string {
	if len(packet) == 0 {
		return "empty"
	}
	version := packet[0] >> 4
	if version == 4 {
		return ipv4PacketFlowKey(packet)
	}
	if version == 6 {
		return ipv6PacketFlowKey(packet)
	}
	return packetHashFlowKey("opaque", packet)
}
// ipv4PacketFlowKey builds a direction-independent flow key for an IPv4
// packet from protocol, addresses and transport ports. The two endpoints are
// ordered so that both directions of a flow produce the same key.
//
// Packets shorter than 20 bytes, or whose header length field is below 20 or
// exceeds the packet, are keyed by a hash of the whole packet instead.
//
// NOTE(review): the fragment offset is not inspected, so for non-initial
// fragments the "ports" are read from whatever bytes follow the IP header.
func ipv4PacketFlowKey(packet []byte) string {
	const minHeaderLen = 20
	if len(packet) < minHeaderLen {
		return packetHashFlowKey("ipv4-short", packet)
	}
	headerLen := 4 * int(packet[0]&0x0f)
	if headerLen < minHeaderLen || headerLen > len(packet) {
		return packetHashFlowKey("ipv4-invalid", packet)
	}
	protocol := packet[9]
	addrA := binary.BigEndian.Uint32(packet[12:16])
	addrB := binary.BigEndian.Uint32(packet[16:20])
	portA, portB := transportPorts(protocol, packet[headerLen:])
	// Canonical order: the smaller (address, port) endpoint goes first.
	if swap := addrA > addrB || (addrA == addrB && portA > portB); swap {
		addrA, addrB = addrB, addrA
		portA, portB = portB, portA
	}
	return fmt.Sprintf("ipv4:%d:%08x:%d:%08x:%d", protocol, addrA, portA, addrB, portB)
}
// ipv6PacketFlowKey builds a direction-independent flow key for an IPv6
// packet from the Next Header value, addresses and transport ports. The two
// endpoints are ordered by hex address, then port, so that both directions of
// a flow produce the same key. Packets shorter than the 40-byte fixed header
// are keyed by a hash of the whole packet.
//
// NOTE(review): the Next Header byte of the fixed header is used directly;
// extension headers are not walked, so ports are only extracted when TCP or
// UDP immediately follows the fixed header.
func ipv6PacketFlowKey(packet []byte) string {
	const fixedHeaderLen = 40
	if len(packet) < fixedHeaderLen {
		return packetHashFlowKey("ipv6-short", packet)
	}
	nextHeader := packet[6]
	portA, portB := transportPorts(nextHeader, packet[fixedHeaderLen:])
	addrA := fmt.Sprintf("%x", packet[8:24])
	addrB := fmt.Sprintf("%x", packet[24:fixedHeaderLen])
	// Canonical order: the smaller (address, port) endpoint goes first.
	if swap := addrA > addrB || (addrA == addrB && portA > portB); swap {
		addrA, addrB = addrB, addrA
		portA, portB = portB, portA
	}
	return fmt.Sprintf("ipv6:%d:%s:%d:%s:%d", nextHeader, addrA, portA, addrB, portB)
}
// transportPorts extracts the source and destination ports from the first
// four bytes of payload when proto is TCP (6) or UDP (17). Any other
// protocol, or a payload shorter than four bytes, yields (0, 0).
func transportPorts(proto byte, payload []byte) (uint16, uint16) {
	hasPorts := proto == 6 || proto == 17
	if !hasPorts || len(payload) < 4 {
		return 0, 0
	}
	return binary.BigEndian.Uint16(payload[:2]), binary.BigEndian.Uint16(payload[2:4])
}
// packetHashFlowKey returns "<prefix>:<hex>" where hex is the 64-bit FNV-1a
// hash of the whole packet, printed without leading zeros.
func packetHashFlowKey(prefix string, packet []byte) string {
	hasher := fnv.New64a()
	_, _ = hasher.Write(packet) // hash.Hash writes never return an error
	digest := hasher.Sum64()
	return fmt.Sprintf("%s:%x", prefix, digest)
}
// cleanPacketBatch returns a deep copy of packets with empty packets removed.
// Each kept packet is copied so the result shares no memory with the input.
// An empty input yields nil; an input containing only empty packets yields an
// empty, non-nil slice.
func cleanPacketBatch(packets [][]byte) [][]byte {
	if len(packets) == 0 {
		return nil
	}
	kept := make([][]byte, 0, len(packets))
	for idx := range packets {
		src := packets[idx]
		if len(src) == 0 {
			continue
		}
		kept = append(kept, append([]byte(nil), src...))
	}
	return kept
}