diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bfa33bb..02039b3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -17,6 +17,22 @@ jobs: go-version: '1.23.8' - run: go mod download - run: go build ./... + - run: go test ./... + + node-agent: + runs-on: ubuntu-24.04 + defaults: + run: + working-directory: agents/rap-node-agent + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.23.8' + - run: go mod download + - run: go test ./... + - name: Anti-flake check (vpnruntime) + run: go test ./internal/vpnruntime -count=20 worker: runs-on: ubuntu-24.04 diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go new file mode 100644 index 0000000..4a6509f --- /dev/null +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -0,0 +1,2022 @@ +package vpnruntime + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" +) + +type captureProductionTransport struct { + nextNodeID string + envelope mesh.ProductionEnvelope +} + +func (t *captureProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { + t.nextNodeID = nextNodeID + t.envelope = envelope + return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil +} + +type failoverProductionTransport struct { + failNextHop string + calls []string + envelope mesh.ProductionEnvelope +} + +func (t *failoverProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { + t.calls = append(t.calls, nextNodeID) + if nextNodeID == t.failNextHop { + return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable + } + t.envelope = envelope + return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil +} + +type captureManyProductionTransport struct { + calls []string + envelopes []mesh.ProductionEnvelope +} + +func (t *captureManyProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { + t.calls = append(t.calls, nextNodeID) + t.envelopes = append(t.envelopes, envelope) + return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil +} + +type routeFailingProductionTransport struct { + failRouteID string + callsByRoute map[string]int + envelopes []mesh.ProductionEnvelope +} + +func (t *routeFailingProductionTransport) SendProduction(_ context.Context, _ string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { + if t.callsByRoute == nil { + t.callsByRoute = map[string]int{} + } + t.callsByRoute[envelope.RouteID]++ + if envelope.RouteID == t.failRouteID { + return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable + } + t.envelopes = append(t.envelopes, envelope) + return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil +} + +type blockingProductionTransport struct { + mu sync.Mutex + calls []string + envelopes []mesh.ProductionEnvelope + slowPort uint16 + slowStarted chan struct{} + releaseSlow chan struct{} + fastDone chan struct{} +} + +func (t *blockingProductionTransport) SendProduction(ctx context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { + payload, err := mesh.DecodeProductionVPNPacketBatch(envelope) + if err != nil { + return mesh.ProductionForwardResult{}, err + } + isSlow := false + for _, packet := range payload.Packets { + if packetSourcePort(packet) == t.slowPort { + isSlow = true + break + } + } + if isSlow { + closeOnce(t.slowStarted) + select { + case <-t.releaseSlow: + case <-ctx.Done(): + return mesh.ProductionForwardResult{}, ctx.Err() + } + } else if t.slowStarted != nil { + // Keep fast sends behind the observed slow-start boundary so the test + // deterministically exercises overlapping in-flight sends. + select { + case <-t.slowStarted: + case <-ctx.Done(): + return mesh.ProductionForwardResult{}, ctx.Err() + } + } + t.mu.Lock() + t.calls = append(t.calls, nextNodeID) + t.envelopes = append(t.envelopes, envelope) + t.mu.Unlock() + if !isSlow { + closeOnce(t.fastDone) + } + return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil +} + +type memoryPacketTransport struct { + sendErr error + recvErr error + sent [][]byte + recv [][]byte +} + +func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error { + if t.sendErr != nil { + return t.sendErr + } + t.sent = append(t.sent, packets...) + return nil +} + +func (t *memoryPacketTransport) ReceiveGatewayPacketBatch(_ context.Context, _ time.Duration) ([][]byte, error) { + if t.recvErr != nil { + return nil, t.recvErr + } + packets := t.recv + t.recv = nil + return packets, nil +} + +func TestFabricPacketTransportSendsVPNPacketBatchEnvelope(t *testing.T) { + capture := &captureProductionTransport{} + transport := &FabricPacketTransport{ + ForwardTransport: capture, + ClusterID: "cluster-1", + VPNConnectionID: "vpn-1", + RouteID: "route-1", + LocalNodeID: "exit-1", + RemoteNodeID: "entry-1", + NextHopNodeID: "relay-1", + RoutePath: []string{"exit-1", "relay-1", "entry-1"}, + SendDirection: FabricDirectionGatewayToClient, + } + + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send fabric packet batch: %v", err) + } + if capture.nextNodeID != "relay-1" { + t.Fatalf("next node = %q", capture.nextNodeID) + } + if capture.envelope.CurrentHopNodeID != "relay-1" || capture.envelope.NextHopNodeID != "entry-1" { + t.Fatalf("envelope hop = %s -> %s, want relay-1 -> entry-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID) + } + payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope) + if err != nil { + t.Fatalf("decode envelope payload: %v", err) + } + if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionGatewayToClient || string(payload.Packets[0]) != "packet" { + t.Fatalf("payload = %+v", payload) + } +} + +func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) { + inbox := NewFabricPacketInbox(4) + envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{ + RouteID: "route-1", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + CurrentHopNodeID: "exit-1", + NextHopNodeID: "exit-1", + RoutePath: []string{"entry-1", "exit-1"}, + VPNConnectionID: "vpn-1", + Direction: FabricDirectionClientToGateway, + Packets: [][]byte{[]byte("packet")}, + }) + if err != nil { + t.Fatalf("new envelope: %v", err) + } + if err := inbox.DeliverProductionEnvelope(context.Background(), envelope); err != nil { + t.Fatalf("deliver envelope: %v", err) + } + + packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) + if err != nil { + t.Fatalf("receive: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "packet" { + t.Fatalf("packets = %#v", packets) + } +} + +func TestFabricPacketInboxKeepsDirectionsIsolated(t *testing.T) { + inbox := NewFabricPacketInbox(4) + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil { + t.Fatalf("deliver gateway packet: %v", err) + } + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil { + t.Fatalf("deliver client packet: %v", err) + } + + clientPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) + if err != nil { + t.Fatalf("receive client direction: %v", err) + } + gatewayPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) + if err != nil { + t.Fatalf("receive gateway direction: %v", err) + } + if len(clientPackets) != 1 || string(clientPackets[0]) != "request" { + t.Fatalf("client packets = %#v", clientPackets) + } + if len(gatewayPackets) != 1 || string(gatewayPackets[0]) != "reply" { + t.Fatalf("gateway packets = %#v", gatewayPackets) + } +} + +func TestFabricPacketInboxDropsEmptyPackets(t *testing.T) { + inbox := NewFabricPacketInbox(4) + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{nil, []byte{}, []byte("reply")}); err != nil { + t.Fatalf("deliver gateway packet: %v", err) + } + + packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "reply" { + t.Fatalf("packets = %#v", packets) + } +} + +func TestLocalPacketTransportUsesFabricInboxDirections(t *testing.T) { + inbox := NewFabricPacketInbox(4) + transport := &LocalPacketTransport{Inbox: inbox, VPNConnectionID: "vpn-1"} + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil { + t.Fatalf("deliver client packet: %v", err) + } + packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "request" { + t.Fatalf("received packets = %#v", packets) + } + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { + t.Fatalf("send gateway packet: %v", err) + } + replies, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) + if err != nil { + t.Fatalf("receive client reply: %v", err) + } + if len(replies) != 1 || string(replies[0]) != "reply" { + t.Fatalf("reply packets = %#v", replies) + } +} + +func TestFabricFlowSchedulerKeepsReverseFiveTupleTogether(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + forward := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + reverse := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) + + batches := scheduler.ScheduleClientPackets([][]byte{forward, reverse}) + if len(batches) != 1 { + t.Fatalf("batches = %#v, want one logical flow channel", batches) + } + if len(batches[0].Packets) != 2 { + t.Fatalf("batch packets = %d", len(batches[0].Packets)) + } + snapshot := scheduler.Snapshot() + if snapshot.Enqueued != 2 || snapshot.ChannelCount != 1 || snapshot.QueueDepths[batches[0].ChannelID] != 2 { + t.Fatalf("snapshot before complete = %+v", snapshot) + } + scheduler.Complete(batches[0]) + snapshot = scheduler.Snapshot() + if snapshot.Dequeued != 2 || snapshot.QueueDepths[batches[0].ChannelID] != 0 { + t.Fatalf("snapshot after complete = %+v", snapshot) + } +} + +func TestFabricFlowSchedulerPrioritizesExplicitTrafficClass(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + bulk := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, [][]byte{packet}) + interactive := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{packet}) + control := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassControl, [][]byte{packet}) + combined := append(append(bulk, interactive...), control...) + + scheduler.sortScheduledBatches(combined) + if len(combined) != 3 { + t.Fatalf("scheduled count = %d, want 3", len(combined)) + } + if combined[0].TrafficClass != FabricTrafficClassControl || combined[1].TrafficClass != FabricTrafficClassInteractive || combined[2].TrafficClass != FabricTrafficClassBulk { + t.Fatalf("scheduled classes = %s, %s, %s", combined[0].TrafficClass, combined[1].TrafficClass, combined[2].TrafficClass) + } + if !strings.Contains(combined[0].ChannelID, ":control:") || !strings.Contains(combined[1].ChannelID, ":interactive:") { + t.Fatalf("priority channel ids = %q %q", combined[0].ChannelID, combined[1].ChannelID) + } + snapshot := scheduler.Snapshot() + if snapshot.ChannelStats[combined[0].ChannelID].TrafficClass != FabricTrafficClassControl { + t.Fatalf("control stat = %+v", snapshot.ChannelStats[combined[0].ChannelID]) + } + if snapshot.ChannelStats[combined[2].ChannelID].TrafficClass != FabricTrafficClassBulk { + t.Fatalf("bulk stat = %+v", snapshot.ChannelStats[combined[2].ChannelID]) + } + if snapshot.TrafficClassCounts[FabricTrafficClassControl] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 1 { + t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts) + } +} + +func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) { + scheduler := NewFabricFlowScheduler(1, 1) + packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + packetB := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + + batches := scheduler.ScheduleClientPackets([][]byte{packetA, packetB}) + if len(batches) != 1 || len(batches[0].Packets) != 1 { + t.Fatalf("batches = %#v, want one accepted packet", batches) + } + snapshot := scheduler.Snapshot() + if snapshot.Dropped != 1 || !snapshot.BackpressureActive { + t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot) + } +} + +func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + channelID := "flow-01" + + scheduler.RecordRouteSuccess(channelID, "route-fast", "node-b", 200*time.Microsecond) + snapshot := scheduler.Snapshot() + stat := snapshot.ChannelStats[channelID] + if stat.LastSendDurationMillis != 1 { + t.Fatalf("success last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis) + } + + scheduler.RecordRouteFailure(channelID, "route-fast", "node-b", mesh.ErrForwardPeerUnavailable, 300*time.Microsecond) + snapshot = scheduler.Snapshot() + stat = snapshot.ChannelStats[channelID] + if stat.LastSendDurationMillis != 1 { + t.Fatalf("failure last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis) + } +} + +func TestAdaptivePacketTransportReturnsRepliesToLastReceiveSide(t *testing.T) { + primary := &memoryPacketTransport{} + fallback := &memoryPacketTransport{recv: [][]byte{[]byte("request")}} + transport := &AdaptivePacketTransport{ + Primary: primary, + Fallback: fallback, + PrimaryTimeout: time.Millisecond, + } + packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) + if err != nil { + t.Fatalf("receive adaptive packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "request" { + t.Fatalf("packets = %#v", packets) + } + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { + t.Fatalf("send adaptive reply: %v", err) + } + if len(primary.sent) != 0 { + t.Fatalf("primary unexpectedly received reply: %#v", primary.sent) + } + if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" { + t.Fatalf("fallback reply = %#v", fallback.sent) + } +} + +func TestAdaptivePacketTransportFallsBackWhenPreferredSendFails(t *testing.T) { + sendErr := errors.New("send failed") + primary := &memoryPacketTransport{sendErr: sendErr} + fallback := &memoryPacketTransport{} + transport := &AdaptivePacketTransport{Primary: primary, Fallback: fallback} + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { + t.Fatalf("send adaptive reply: %v", err) + } + if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" { + t.Fatalf("fallback sent = %#v", fallback.sent) + } +} + +func TestFabricClientPacketIngressSendsClientPacketsOverSelectedRoute(t *testing.T) { + capture := &captureProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: capture, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-entry-exit", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if capture.nextNodeID != "relay-1" { + t.Fatalf("next node = %q", capture.nextNodeID) + } + if capture.envelope.CurrentHopNodeID != "relay-1" || capture.envelope.NextHopNodeID != "exit-1" { + t.Fatalf("envelope hop = %s -> %s, want relay-1 -> exit-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID) + } + payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope) + if err != nil { + t.Fatalf("decode envelope payload: %v", err) + } + if payload.Direction != FabricDirectionClientToGateway || payload.VPNConnectionID != "vpn-1" { + t.Fatalf("payload = %+v", payload) + } +} + +func TestFabricClientPacketIngressUsesLeasePreferredRouteBeforeConfigOrder(t *testing.T) { + capture := &captureProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: capture, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-alternate-exit", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-b", + Hops: []string{"entry-1", "exit-b"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-primary-exit", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-a", + Hops: []string{"entry-1", "exit-a"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.PreferClientRoute("route-primary-exit") + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if capture.envelope.RouteID != "route-primary-exit" || capture.nextNodeID != "exit-a" { + t.Fatalf("selected route = %s next=%s, want route-primary-exit via exit-a", capture.envelope.RouteID, capture.nextNodeID) + } +} + +func TestFabricClientPacketIngressTriesAlternateRouteBeforeBackendFallback(t *testing.T) { + transport := &failoverProductionTransport{failNextHop: "relay-bad"} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-good", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-good", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if len(transport.calls) != 2 || transport.calls[0] != "relay-bad" || transport.calls[1] != "relay-good" { + t.Fatalf("route attempts = %#v, want relay-bad then relay-good", transport.calls) + } + if transport.envelope.RouteID != "route-good" || transport.envelope.CurrentHopNodeID != "relay-good" { + t.Fatalf("selected envelope = %+v", transport.envelope) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.SendRouteAttempts != 2 || snapshot.SendRouteFailures != 1 || snapshot.LastSelectedRouteID != "route-good" { + t.Fatalf("snapshot = %+v", snapshot) + } +} + +func TestFabricClientPacketIngressAvoidsChannelFailedRouteOnNextSend(t *testing.T) { + transport := &captureManyProductionTransport{} + scheduler := NewFabricFlowScheduler(8, 16) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channelID := fabricFlowChannelID("vpn-1", shard) + scheduler.RecordRouteFailure(channelID, "route-bad", "relay-bad", mesh.ErrForwardPeerUnavailable, time.Millisecond) + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-good", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-good", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if len(transport.calls) != 1 || transport.calls[0] != "relay-good" { + t.Fatalf("route calls = %#v, want relay-good first without retrying stale route", transport.calls) + } + snapshot := ingress.Snapshot("cluster-1") + stat := snapshot.FlowScheduler.ChannelStats[channelID] + if stat.LastRouteID != "route-good" || stat.ConsecutiveFailures != 0 || stat.RouteRebuildRecommended { + t.Fatalf("channel stat = %+v", stat) + } +} + +func TestFabricClientPacketIngressWithdrawsRouteAfterRebuildDecision(t *testing.T) { + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-good", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-good", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-bad", + ReplacementRouteID: "route-good", + RebuildRequestID: "route-bad-rebuild", + RebuildStatus: "applied", + RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate", + RebuildAttempt: 3, + DecisionSource: "service_channel_feedback_replacement", + Generation: "config-v2", + }}, "config-v2", time.Now().UTC()) + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if len(transport.calls) != 1 || transport.calls[0] != "relay-good" { + t.Fatalf("route calls = %#v, want only rebuild replacement route", transport.calls) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.RouteCandidateCount != 1 || + snapshot.RouteManager.RebuildRequestCount != 1 || + snapshot.RouteManager.RebuildAppliedCount != 1 || + snapshot.RouteManager.WithdrawnRouteCount != 1 { + t.Fatalf("snapshot route manager = %+v", snapshot.RouteManager) + } + if snapshot.LastSelectedRouteID != "route-good" { + t.Fatalf("selected route = %q, want route-good", snapshot.LastSelectedRouteID) + } +} + +func TestFabricClientPacketIngressPrefersControlPlaneReplacementOverConfigOrder(t *testing.T) { + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-slow", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-standby", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-standby", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-fast", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send initial packet batch: %v", err) + } + if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" { + t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID) + } + + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-bad", + ReplacementRouteID: "route-fast", + RebuildRequestID: "route-bad-rebuild", + RebuildStatus: "applied", + RebuildReason: "service_channel_feedback_rebuild_applied_to_fastest_pool_route", + DecisionSource: "service_channel_feedback_replacement", + Generation: "config-v2", + }}, "config-v2", time.Now().UTC()) + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet batch after rebuild: %v", err) + } + if got := transport.envelopes[len(transport.envelopes)-1].RouteID; got != "route-fast" { + t.Fatalf("post-rebuild selected route = %q, want Control Plane replacement route-fast; calls=%v", got, transport.calls) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.LastSelectedRouteID != "route-fast" || snapshot.RouteCandidateCount != 2 { + t.Fatalf("post-rebuild snapshot = %+v", snapshot) + } +} + +func TestFabricClientPacketIngressRestoresWithdrawnRouteAfterFreshConfig(t *testing.T) { + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-good", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-good", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send initial packet batch: %v", err) + } + if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" { + t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID) + } + + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-bad", + ReplacementRouteID: "route-good", + RebuildRequestID: "route-bad-rebuild", + RebuildStatus: "applied", + RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate", + RebuildAttempt: 4, + DecisionSource: "service_channel_feedback_replacement", + Generation: "config-v2", + }}, "config-v2", time.Now().UTC()) + snapshot := ingress.Snapshot("cluster-1") + if snapshot.RouteManagerTransition.Status != "applied_rebuild" || + snapshot.RouteManagerTransition.ClearedSelectedRouteID != "route-bad" || + snapshot.RouteManagerTransition.RebuildAppliedCount != 1 { + t.Fatalf("apply transition = %+v", snapshot.RouteManagerTransition) + } + if snapshot.RouteCandidateCount != 1 { + t.Fatalf("route candidate count after rebuild = %d, want 1", snapshot.RouteCandidateCount) + } + + ingress.UpdateRouteManager(nil, "config-v3", time.Now().UTC()) + snapshot = ingress.Snapshot("cluster-1") + if snapshot.RouteManagerTransition.Status != "restored_by_new_config" || + snapshot.RouteManagerTransition.RestoredRouteCount != 1 || + snapshot.RouteManager.WithdrawnRouteCount != 0 || + snapshot.RouteCandidateCount != 2 { + t.Fatalf("restore transition/snapshot = %+v / %+v", snapshot.RouteManagerTransition, snapshot.RouteManager) + } +} + +func TestFabricClientPacketIngressPendingDegradedFallbackWithdrawsRouteWithoutAlternate(t *testing.T) { + ingress := &FabricClientPacketIngress{ + ForwardTransport: &captureManyProductionTransport{}, + Inbox: NewFabricPacketInbox(4), + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-bad", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-bad", + RebuildRequestID: "route-bad-rebuild", + RebuildStatus: "pending_degraded_fallback", + RebuildReason: "service_channel_feedback_rebuild_pending_backend_relay", + RebuildAttempt: 2, + DecisionSource: "service_channel_feedback_no_alternate", + Generation: "config-v2", + }}, "config-v2", time.Now().UTC()) + + err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}) + if !errors.Is(err, mesh.ErrRouteNotFound) { + t.Fatalf("send error = %v, want route not found after pending degraded withdrawal", err) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.RouteCandidateCount != 0 || + snapshot.RouteManager.PendingFallbackCount != 1 || + snapshot.RouteManager.WithdrawnRouteCount != 1 || + snapshot.RouteManagerTransition.Status != "pending_degraded_fallback" { + t.Fatalf("pending fallback snapshot = %+v transition=%+v", snapshot.RouteManager, snapshot.RouteManagerTransition) + } +} + +func TestFabricClientPacketIngressMarksChannelForRebuildAfterRepeatedRouteFailures(t *testing.T) { + transport := &failoverProductionTransport{failNextHop: "relay-bad"} + scheduler := NewFabricFlowScheduler(8, 16) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channelID := fabricFlowChannelID("vpn-1", shard) + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-bad-1", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-bad-2", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-bad", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err == nil { + t.Fatalf("send unexpectedly succeeded") + } + snapshot := ingress.Snapshot("cluster-1") + stat := snapshot.FlowScheduler.ChannelStats[channelID] + if !stat.RouteRebuildRecommended || !stat.DegradedFallbackRecommended || stat.ConsecutiveFailures < 2 { + t.Fatalf("channel stat = %+v, want rebuild and degraded fallback recommendation", stat) + } +} + +func TestFabricClientPacketIngressSplitsIndependentFlowsIntoLogicalChannels(t *testing.T) { + transport := &captureManyProductionTransport{} + scheduler := NewFabricFlowScheduler(8, 16) + packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + packetB := packetWithDifferentShard(packetA, 8) + expectedChannels := expectedScheduledChannelCount(scheduler, [][]byte{packetA, packetB}) + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-entry-exit", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if len(transport.envelopes) != expectedChannels { + t.Fatalf("envelopes = %d, want %d", len(transport.envelopes), expectedChannels) + } + for _, envelope := range transport.envelopes { + payload, err := mesh.DecodeProductionVPNPacketBatch(envelope) + if err != nil { + t.Fatalf("decode envelope: %v", err) + } + if len(payload.Packets) == 0 { + t.Fatalf("empty logical channel envelope") + } + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.SendFlowBatches != uint64(expectedChannels) || snapshot.FlowScheduler.Enqueued != 2 || snapshot.FlowScheduler.Dequeued != 2 { + t.Fatalf("snapshot = %+v", snapshot) + } +} + +func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *testing.T) { + transport := &captureManyProductionTransport{} + scheduler := NewFabricFlowScheduler(8, 16) + packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + packetB := packetWithDifferentShard(packetA, 8) + _, shardA := classifyPacketFlow(packetA, scheduler.shardCountValue()) + _, shardB := classifyPacketFlow(packetB, scheduler.shardCountValue()) + channelA := fabricFlowChannelID("vpn-1", shardA) + channelB := fabricFlowChannelID("vpn-1", shardB) + if channelA == channelB { + t.Fatalf("test packets mapped to same channel %s", channelA) + } + scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond) + scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond) + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-alternate", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-alternate", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + if len(transport.envelopes) != 2 { + t.Fatalf("envelopes = %d, want 2 independent logical channel sends", len(transport.envelopes)) + } + routeByPacket := map[string]string{} + for _, envelope := range transport.envelopes { + payload, err := mesh.DecodeProductionVPNPacketBatch(envelope) + if err != nil { + t.Fatalf("decode envelope: %v", err) + } + if len(payload.Packets) != 1 { + t.Fatalf("payload packets = %d, want one packet per logical channel", len(payload.Packets)) + } + routeByPacket[string(payload.Packets[0])] = envelope.RouteID + } + if routeByPacket[string(packetA)] != "route-alternate" { + t.Fatalf("channel A route = %q, want alternate after isolated failure", routeByPacket[string(packetA)]) + } + if routeByPacket[string(packetB)] != "route-primary" { + t.Fatalf("channel B route = %q, want primary route memory preserved", routeByPacket[string(packetB)]) + } + snapshot := ingress.Snapshot("cluster-1") + statA := snapshot.FlowScheduler.ChannelStats[channelA] + statB := snapshot.FlowScheduler.ChannelStats[channelB] + if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" || statA.ConsecutiveFailures != 0 { + t.Fatalf("channel A stat = %+v, want recovered on alternate route", statA) + } + if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 { + t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB) + } + if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 { + t.Fatalf("bounded scheduler telemetry = %+v send_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped) + } +} + +func TestFabricClientPacketIngressIsolatesRouteMemoryPerVPNConnection(t *testing.T) { + transport := &captureManyProductionTransport{} + scheduler := NewFabricFlowScheduler(8, 16) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + channelA := testFlowChannelID("vpn-a", packet, scheduler.shardCountValue()) + channelB := testFlowChannelID("vpn-b", packet, scheduler.shardCountValue()) + if channelA == channelB { + t.Fatalf("session-scoped channels collapsed to %s", channelA) + } + scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond) + scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond) + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-alternate", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-alternate", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-a", [][]byte{packet}); err != nil { + t.Fatalf("send vpn-a packet: %v", err) + } + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-b", [][]byte{packet}); err != nil { + t.Fatalf("send vpn-b packet: %v", err) + } + if len(transport.envelopes) != 2 { + t.Fatalf("envelopes = %d, want one send per vpn session", len(transport.envelopes)) + } + if transport.envelopes[0].RouteID != "route-alternate" { + t.Fatalf("vpn-a route = %s, want alternate after session-local failure", transport.envelopes[0].RouteID) + } + if transport.envelopes[1].RouteID != "route-primary" { + t.Fatalf("vpn-b route = %s, want primary preserved from separate session memory", transport.envelopes[1].RouteID) + } + snapshot := ingress.Snapshot("cluster-1") + statA := snapshot.FlowScheduler.ChannelStats[channelA] + statB := snapshot.FlowScheduler.ChannelStats[channelB] + if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" { + t.Fatalf("vpn-a stat = %+v, want recovered alternate route without leaking failure", statA) + } + if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" { + t.Fatalf("vpn-b stat = %+v, want primary route preserved", statB) + } + if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { + t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped) + } +} + +func TestFabricClientPacketIngressParallelFlowWindowDoesNotBlockIndependentChannel(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 16) + slowPacket, fastPacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue()) + transport := &blockingProductionTransport{ + slowPort: packetSourcePort(slowPacket), + slowStarted: make(chan struct{}), + releaseSlow: make(chan struct{}), + fastDone: make(chan struct{}), + } + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + MaxParallelFlowSends: 2, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + done := make(chan error, 1) + go func() { + done <- ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{slowPacket, fastPacket}) + }() + select { + case <-transport.slowStarted: + case err := <-done: + t.Fatalf("send completed before slow channel started: %v", err) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for slow channel to start") + } + select { + case <-transport.fastDone: + case err := <-done: + t.Fatalf("send completed before independent fast channel completed: %v", err) + case <-time.After(200 * time.Millisecond): + t.Fatalf("fast independent channel was blocked behind slow channel") + } + close(transport.releaseSlow) + if err := <-done; err != nil { + t.Fatalf("parallel send returned error: %v", err) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.SendFlowParallel != 1 || snapshot.MaxParallelFlowSends != 2 { + t.Fatalf("parallel telemetry = batches:%d max:%d", snapshot.SendFlowParallel, snapshot.MaxParallelFlowSends) + } + if snapshot.RecommendedParallelFlowSends != 2 || snapshot.FlowScheduler.MaxInFlight != 2 || snapshot.FlowScheduler.InFlight != 0 { + t.Fatalf("window/in-flight telemetry = recommended:%d in_flight:%d max_in_flight:%d", snapshot.RecommendedParallelFlowSends, snapshot.FlowScheduler.InFlight, snapshot.FlowScheduler.MaxInFlight) + } + if snapshot.SendFlowBatches != 2 || snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { + t.Fatalf("flow telemetry = %+v send_flow_batches=%d send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowBatches, snapshot.SendFlowDropped) + } + for channelID, stat := range snapshot.FlowScheduler.ChannelStats { + if stat.SendAttempts != 1 || stat.SendSuccesses != 1 || stat.SendFailures != 0 || stat.MaxInFlight != 1 { + t.Fatalf("channel %s stat = %+v, want one successful in-flight send", channelID, stat) + } + if stat.LatencyLe10Millis+stat.LatencyLe100Millis+stat.LatencyLe1000Millis+stat.LatencyGt1000Millis != 1 { + t.Fatalf("channel %s latency buckets = %+v, want one sample", channelID, stat) + } + } +} + +func TestFabricClientPacketIngressInteractiveClassCompletesWhileBulkInFlight(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 16) + bulkPacket, interactivePacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue()) + transport := &blockingProductionTransport{ + slowPort: packetSourcePort(bulkPacket), + slowStarted: make(chan struct{}), + releaseSlow: make(chan struct{}), + fastDone: make(chan struct{}), + } + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + MaxParallelFlowSends: 1, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + bulkDone := make(chan error, 1) + go func() { + bulkDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassBulk, [][]byte{bulkPacket}) + }() + select { + case <-transport.slowStarted: + case err := <-bulkDone: + t.Fatalf("bulk send completed before blocking: %v", err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for bulk send to block") + } + + interactiveDone := make(chan error, 1) + go func() { + interactiveDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket}) + }() + select { + case err := <-interactiveDone: + if err != nil { + t.Fatalf("interactive send returned error while bulk in flight: %v", err) + } + case <-time.After(200 * time.Millisecond): + t.Fatal("interactive traffic-class send was blocked behind in-flight bulk") + } + select { + case <-transport.fastDone: + default: + t.Fatal("interactive transport send did not complete") + } + close(transport.releaseSlow) + if err := <-bulkDone; err != nil { + t.Fatalf("bulk send returned error after release: %v", err) + } + + bulkChannel := testFlowChannelID("vpn-1", bulkPacket, scheduler.shardCountValue()) + interactiveChannel := fabricFlowChannelIDForClass("vpn-1", FabricTrafficClassInteractive, packetShard(interactivePacket, scheduler.shardCountValue())) + snapshot := ingress.Snapshot("cluster-1") + if snapshot.FlowScheduler.MaxInFlight < 2 { + t.Fatalf("max in-flight = %d, want concurrent bulk+interactive", snapshot.FlowScheduler.MaxInFlight) + } + bulkStat := snapshot.FlowScheduler.ChannelStats[bulkChannel] + interactiveStat := snapshot.FlowScheduler.ChannelStats[interactiveChannel] + if bulkStat.TrafficClass != FabricTrafficClassBulk || bulkStat.SendSuccesses != 1 || bulkStat.SendFailures != 0 { + t.Fatalf("bulk stat = %+v", bulkStat) + } + if interactiveStat.TrafficClass != FabricTrafficClassInteractive || interactiveStat.SendSuccesses != 1 || interactiveStat.SendFailures != 0 { + t.Fatalf("interactive stat = %+v", interactiveStat) + } + if snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassBulk] != 1 || snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { + t.Fatalf("traffic class counts = %+v", snapshot.FlowScheduler.TrafficClassCounts) + } + if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { + t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped) + } +} + +func TestFabricFlowSchedulerRecommendsSmallerWindowUnderPressure(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 1) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + if got := scheduler.RecommendedParallelSendWindow(4); got != 4 { + t.Fatalf("clean recommended window = %d, want 4", got) + } + scheduled := scheduler.ScheduleClientPacketsForConnection("vpn-1", [][]byte{packet, packetWithSameShard(packet, scheduler.shardCountValue())}) + if len(scheduled) != 1 { + t.Fatalf("scheduled = %d, want one accepted channel after bounded drop", len(scheduled)) + } + if got := scheduler.RecommendedParallelSendWindow(4); got != 1 { + t.Fatalf("drop-pressure recommended window = %d, want 1", got) + } + channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) + scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 2500*time.Millisecond) + snapshot := scheduler.Snapshot() + stat := snapshot.ChannelStats[channelID] + if snapshot.FailingChannelCount != 1 || snapshot.SlowChannelCount != 1 { + t.Fatalf("pressure counts = failing:%d slow:%d stat=%+v", snapshot.FailingChannelCount, snapshot.SlowChannelCount, stat) + } + if stat.SendFailures != 1 || stat.LatencyGt1000Millis != 1 || !stat.RouteRebuildRecommended { + t.Fatalf("failure/latency stat = %+v, want retry-window telemetry", stat) + } + if stat.QualityWindowDropCount != 1 || stat.QualityWindowFailureCount != 1 || stat.QualityWindowSlowCount != 1 { + t.Fatalf("rolling quality stat = %+v, want fresh drop+failure+slow sample", stat) + } +} + +func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testing.T) { + scheduler := NewFabricFlowScheduler(32, 16) + bulkPackets := packetsForDistinctShards(16, scheduler.shardCountValue()) + if len(bulkPackets) != 16 { + t.Fatalf("bulk packet count = %d, want 16 distinct shards", len(bulkPackets)) + } + interactivePacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 61000, 3389) + if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, bulkPackets); len(scheduled) != 16 { + t.Fatalf("bulk scheduled = %d, want 16", len(scheduled)) + } + if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket}); len(scheduled) != 1 { + t.Fatalf("interactive scheduled = %d, want 1", len(scheduled)) + } + if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, 4); got != 1 { + t.Fatalf("bulk adaptive window = %d, want 1", got) + } + if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassInteractive, 4); got != 4 { + t.Fatalf("interactive adaptive window = %d, want 4", got) + } + snapshot := scheduler.Snapshot() + if !snapshot.AdaptiveBackpressureActive || snapshot.AdaptiveBackpressureReason != "bulk_window_reduced_to_protect_interactive" { + t.Fatalf("adaptive snapshot = %+v", snapshot) + } + if snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] != 1 || snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] != 4 { + t.Fatalf("recommended class windows = %+v", snapshot.RecommendedParallelWindows) + } + if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { + t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts) + } +} + +func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + channelID := fabricFlowChannelID("vpn-1", 0) + + scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond) + scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond) + if got := scheduler.RecommendedParallelSendWindow(4); got >= 4 { + t.Fatalf("pressure recommended window = %d, want reduced", got) + } + + for i := 0; i < defaultFabricFlowQualityWindowCapacity; i++ { + scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond) + } + if got := scheduler.RecommendedParallelSendWindow(4); got != 4 { + t.Fatalf("fresh-success recommended window = %d, want 4", got) + } + snapshot := scheduler.Snapshot() + stat := snapshot.ChannelStats[channelID] + if stat.SendFailures != 2 || stat.SendSuccesses != defaultFabricFlowQualityWindowCapacity { + t.Fatalf("lifetime counters = failures:%d successes:%d", stat.SendFailures, stat.SendSuccesses) + } + if stat.QualityWindowSampleCount != defaultFabricFlowQualityWindowCapacity || stat.QualityWindowFailureCount != 0 || stat.QualityWindowSuccessCount != defaultFabricFlowQualityWindowCapacity { + t.Fatalf("rolling quality stat = %+v, want old failures rolled out", stat) + } + if snapshot.FailingChannelCount != 0 || snapshot.SlowChannelCount != 0 || snapshot.BackpressureActive { + t.Fatalf("rolling pressure snapshot = %+v, want clean fresh window", snapshot) + } +} + +func TestFabricClientPacketIngressReportsBoundedBackpressurePerLogicalChannel(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 1) + packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + packetB := packetWithSameShard(packetA, 8) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(4), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil { + t.Fatalf("send client packet batch: %v", err) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.FlowScheduler.QueueCapacity != 1 || snapshot.FlowScheduler.Dropped != 1 || snapshot.SendFlowDropped != 1 { + t.Fatalf("drop telemetry = %+v send_dropped=%d, want one bounded drop", snapshot.FlowScheduler, snapshot.SendFlowDropped) + } + if snapshot.FlowScheduler.HighWatermark != 1 || len(transport.envelopes) != 1 { + t.Fatalf("high watermark/envelopes = %d/%d, want bounded single queued send", snapshot.FlowScheduler.HighWatermark, len(transport.envelopes)) + } +} + +func TestFabricClientPacketIngressBoundedLoadRebuildsAwayFromWithdrawnRoute(t *testing.T) { + scheduler := NewFabricFlowScheduler(16, 8) + packets := packetsForDistinctShards(12, scheduler.shardCountValue()) + if len(packets) != 12 { + t.Fatalf("distinct packet count = %d", len(packets)) + } + channels := map[string]struct{}{} + for _, packet := range packets { + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channelID := fabricFlowChannelID("vpn-1", shard) + channels[channelID] = struct{}{} + scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond) + } + transport := &routeFailingProductionTransport{failRouteID: "route-primary"} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(32), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-alternate", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-alternate", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { + t.Fatalf("send first load batch: %v", err) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.SendFlowBatches != uint64(len(channels)) || snapshot.FlowScheduler.Dropped != 0 || snapshot.FlowScheduler.HighWatermark != 1 { + t.Fatalf("first load scheduler snapshot = %+v", snapshot.FlowScheduler) + } + if snapshot.SendRouteFailures != uint64(len(channels)) { + t.Fatalf("route failures = %d, want %d", snapshot.SendRouteFailures, len(channels)) + } + if transport.callsByRoute["route-primary"] != len(channels) || transport.callsByRoute["route-alternate"] != len(channels) { + t.Fatalf("route calls = %+v, want primary and alternate per channel", transport.callsByRoute) + } + for channelID, stat := range snapshot.FlowScheduler.ChannelStats { + if _, ok := channels[channelID]; !ok { + continue + } + if stat.LastRouteID != "route-alternate" || stat.LastFailedRouteID != "" || stat.ConsecutiveFailures != 0 { + t.Fatalf("channel %s stat after first load = %+v", channelID, stat) + } + } + + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-primary", + ReplacementRouteID: "route-alternate", + RebuildRequestID: "rebuild-load-1", + RebuildStatus: "applied", + RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate", + DecisionSource: "service_channel_feedback_replacement", + Generation: "c18z-load", + EffectiveHops: []string{"entry-1", "relay-alternate", "exit-1"}, + }}, "c18z-load", time.Now().UTC()) + + primaryCallsBefore := transport.callsByRoute["route-primary"] + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { + t.Fatalf("send second load batch: %v", err) + } + snapshot = ingress.Snapshot("cluster-1") + if transport.callsByRoute["route-primary"] != primaryCallsBefore { + t.Fatalf("primary route was retried after withdrawal: before=%d after=%d calls=%+v", primaryCallsBefore, transport.callsByRoute["route-primary"], transport.callsByRoute) + } + if snapshot.RouteCandidateCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 1 || snapshot.RouteManagerTransition.Status != "applied_rebuild" { + t.Fatalf("route manager snapshot = manager:%+v transition:%+v candidates=%d", snapshot.RouteManager, snapshot.RouteManagerTransition, snapshot.RouteCandidateCount) + } + if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 { + t.Fatalf("second load scheduler snapshot = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped) + } +} + +func TestFabricClientPacketIngressQualityPreferenceOverridesStickyRoute(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channelID := fabricFlowChannelID("vpn-1", shard) + scheduler.RecordRouteSuccess(channelID, "route-slow", "relay-slow", time.Millisecond) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-slow", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-slow", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 90, + Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, + LastSendDurationMs: 1, + ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet: %v", err) + } + if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-fast" { + t.Fatalf("route = %+v, want quality-preferred fast route over sticky slow route", transport.envelopes) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.RouteQualityPreferenceCount != 1 { + t.Fatalf("quality preference count = %d, want 1", snapshot.RouteQualityPreferenceCount) + } + stat := snapshot.FlowScheduler.ChannelStats[channelID] + if stat.LastRouteID != "route-fast" || stat.LastFailedRouteID != "" { + t.Fatalf("flow stat = %+v, want moved to fast healthy route", stat) + } + if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 || !containsString(stat.QualityPreferenceReasons, "service_channel_quality_latency_le_10ms") { + t.Fatalf("quality preference stat = %+v, want applied fast route preference", stat) + } +} + +func TestFabricClientPacketIngressQualityPreferenceUsesEffectiveScore(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 78, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389) + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channelID := fabricFlowChannelID("vpn-1", shard) + scheduler.RecordRouteSuccess(channelID, "route-sticky", "relay-sticky", time.Millisecond) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-sticky", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-sticky", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-decayed-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-decayed-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 20, + RawScoreAdjustment: 90, + Reasons: []string{"service_channel_recent_success", "service_channel_feedback_age_decay"}, + LastSendDurationMs: 1, + ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet: %v", err) + } + if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-sticky" { + t.Fatalf("route = %+v, want decayed quality score to preserve sticky route", transport.envelopes) + } + snapshot := ingress.Snapshot("cluster-1") + if len(snapshot.RouteQualityPreferences) != 1 { + t.Fatalf("quality preferences = %+v, want one visible preference", snapshot.RouteQualityPreferences) + } + preference := snapshot.RouteQualityPreferences[0] + if preference.ScoreAdjustment != 20 || preference.RawScoreAdjustment != 90 || !containsString(preference.Reasons, "service_channel_feedback_age_decay") { + t.Fatalf("preference snapshot = %+v, want effective/raw score and decay reason", preference) + } +} + +func TestFabricClientPacketIngressQualityPreferencePreservesMultiChannelFairness(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + firstPacket := testIPv4TCPPacket([4]byte{10, 79, 0, 2}, [4]byte{192, 168, 200, 95}, 51002, 3389) + secondPacket := packetWithDifferentShard(firstPacket, scheduler.shardCountValue()) + firstChannel := testFlowChannelID("vpn-1", firstPacket, scheduler.shardCountValue()) + secondChannel := testFlowChannelID("vpn-1", secondPacket, scheduler.shardCountValue()) + if firstChannel == secondChannel { + t.Fatalf("test packets unexpectedly map to same channel %s", firstChannel) + } + scheduler.RecordRouteSuccess(firstChannel, "route-slow", "relay-slow", time.Millisecond) + scheduler.RecordRouteSuccess(secondChannel, "route-slow", "relay-slow", time.Millisecond) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(16), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-slow", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-slow", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 90, + RawScoreAdjustment: 90, + Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, + LastSendDurationMs: 1, + ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{firstPacket, secondPacket}); err != nil { + t.Fatalf("send packet batch: %v", err) + } + if len(transport.envelopes) != 2 { + t.Fatalf("envelope count = %d, want one envelope per logical channel", len(transport.envelopes)) + } + for _, envelope := range transport.envelopes { + if envelope.RouteID != "route-fast" { + t.Fatalf("envelope route = %s, want quality-preferred fast route", envelope.RouteID) + } + } + snapshot := ingress.Snapshot("cluster-1") + for _, channelID := range []string{firstChannel, secondChannel} { + stat := snapshot.FlowScheduler.ChannelStats[channelID] + if stat.LastRouteID != "route-fast" || stat.Served != 1 || stat.Dropped != 0 { + t.Fatalf("channel %s stat = %+v, want fair fast-route service without drops", channelID, stat) + } + if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 { + t.Fatalf("channel %s quality stat = %+v, want applied quality preference", channelID, stat) + } + } + if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { + t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped) + } +} + +func TestFabricClientPacketIngressClearsStaleQualityPreferenceMarkers(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 80, 0, 2}, [4]byte{192, 168, 200, 95}, 51003, 3389) + channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-slow", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-slow", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.PreferClientRoute("route-slow") + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 90, + RawScoreAdjustment: 90, + Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, + LastSendDurationMs: 1, + ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet: %v", err) + } + stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] + if stat.QualityPreferenceRouteID != "route-fast" { + t.Fatalf("quality preference marker = %+v, want route-fast", stat) + } + + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 90, + RawScoreAdjustment: 90, + ExpiresAt: time.Now().UTC().Add(-time.Second).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + snapshot := ingress.Snapshot("cluster-1") + if snapshot.RouteQualityPreferenceCount != 0 { + t.Fatalf("quality preference count = %d, want expired preference removed", snapshot.RouteQualityPreferenceCount) + } + stat = snapshot.FlowScheduler.ChannelStats[channelID] + if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 || len(stat.QualityPreferenceReasons) != 0 { + t.Fatalf("stale quality marker = %+v, want cleared", stat) + } +} + +func TestFabricClientPacketIngressClearsWithdrawnQualityPreferenceMarkers(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 81, 0, 2}, [4]byte{192, 168, 200, 95}, 51004, 3389) + channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{ + { + RouteID: "route-slow", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-slow", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + { + RouteID: "route-fast", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }, + } + }, + } + ingress.PreferClientRoute("route-slow") + ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ + RouteID: "route-fast", + FeedbackStatus: "healthy", + ScoreAdjustment: 90, + RawScoreAdjustment: 90, + Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, + LastSendDurationMs: 1, + ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), + }}, time.Now().UTC()) + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet: %v", err) + } + ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ + RouteID: "route-fast", + ReplacementRouteID: "route-slow", + RebuildStatus: "applied", + DecisionSource: "test_withdraw_quality_route", + }}, "config-withdraw-quality", time.Now().UTC()) + stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] + if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 { + t.Fatalf("withdrawn quality marker = %+v, want cleared", stat) + } +} + +func TestFabricClientPacketIngressReportsRoutePolicyProvenance(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 91, 0, 2}, [4]byte{192, 168, 200, 95}, 51014, 443) + channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(8), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + RecoveryPolicyFingerprint: "policy-fp-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + RouteVersion: "route-v1", + PolicyVersion: "policy-v1", + }} + }, + } + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { + t.Fatalf("send packet: %v", err) + } + stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] + if stat.LastRouteID != "route-primary" || + stat.RoutePolicyVersion != "policy-v1" || + stat.RouteGeneration != "policy-v1" || + stat.RecoveryPolicyFingerprint != "policy-fp-1" { + t.Fatalf("route provenance stat = %+v", stat) + } +} + +func TestFabricClientPacketIngressBoundedLoadReportsPerChannelDrops(t *testing.T) { + scheduler := NewFabricFlowScheduler(8, 8) + packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443) + packets := make([][]byte, 0, 32) + for i := 0; i < 32; i++ { + packets = append(packets, packetWithSameShard(packet, scheduler.shardCountValue())) + } + transport := &captureManyProductionTransport{} + ingress := &FabricClientPacketIngress{ + ForwardTransport: transport, + Inbox: NewFabricPacketInbox(32), + FlowScheduler: scheduler, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + Routes: func() []mesh.SyntheticRoute { + return []mesh.SyntheticRoute{{ + RouteID: "route-primary", + ClusterID: "cluster-1", + SourceNodeID: "entry-1", + DestinationNodeID: "exit-1", + Hops: []string{"entry-1", "relay-primary", "exit-1"}, + AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, + ExpiresAt: time.Now().UTC().Add(time.Minute), + MaxTTL: 8, + }} + }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { + t.Fatalf("send congested load batch: %v", err) + } + snapshot := ingress.Snapshot("cluster-1") + if snapshot.FlowScheduler.QueueCapacity != 8 || snapshot.FlowScheduler.HighWatermark != 8 { + t.Fatalf("capacity/high watermark = %d/%d, want 8/8", snapshot.FlowScheduler.QueueCapacity, snapshot.FlowScheduler.HighWatermark) + } + if snapshot.FlowScheduler.Dropped != 24 || snapshot.SendFlowDropped != 24 || !snapshot.FlowScheduler.BackpressureActive { + t.Fatalf("drop telemetry = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped) + } + if snapshot.SendFlowPackets != 8 || len(transport.envelopes) != 1 { + t.Fatalf("sent packets/envelopes = %d/%d, want bounded 8/1", snapshot.SendFlowPackets, len(transport.envelopes)) + } +} + +func TestFabricClientPacketIngressUsesLocalGatewayShortcutWithoutRoute(t *testing.T) { + inbox := NewFabricPacketInbox(4) + ingress := &FabricClientPacketIngress{ + Inbox: inbox, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + LocalGateway: func(vpnConnectionID string) bool { + return vpnConnectionID == "vpn-1" + }, + Routes: func() []mesh.SyntheticRoute { return nil }, + } + + if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { + t.Fatalf("send local gateway packet: %v", err) + } + packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) + if err != nil { + t.Fatalf("receive local gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "packet" { + t.Fatalf("packets = %#v", packets) + } +} + +func TestFabricClientPacketIngressReceivesLocalGatewayReplyWithoutRoute(t *testing.T) { + inbox := NewFabricPacketInbox(4) + ingress := &FabricClientPacketIngress{ + Inbox: inbox, + ClusterID: "cluster-1", + LocalNodeID: "entry-1", + LocalGateway: func(vpnConnectionID string) bool { + return vpnConnectionID == "vpn-1" + }, + Routes: func() []mesh.SyntheticRoute { return nil }, + } + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil { + t.Fatalf("deliver local gateway reply: %v", err) + } + + packets, err := ingress.ReceiveClientPacketBatch(context.Background(), "cluster-1", "vpn-1", time.Second) + if err != nil { + t.Fatalf("receive local gateway reply: %v", err) + } + if len(packets) != 1 || string(packets[0]) != "reply" { + t.Fatalf("packets = %#v", packets) + } +} + +func testIPv4TCPPacket(src [4]byte, dst [4]byte, srcPort uint16, dstPort uint16) []byte { + packet := make([]byte, 40) + packet[0] = 0x45 + packet[2] = 0 + packet[3] = 40 + packet[8] = 64 + packet[9] = 6 + copy(packet[12:16], src[:]) + copy(packet[16:20], dst[:]) + packet[20] = byte(srcPort >> 8) + packet[21] = byte(srcPort) + packet[22] = byte(dstPort >> 8) + packet[23] = byte(dstPort) + packet[32] = 0x50 + return packet +} + +func packetWithDifferentShard(reference []byte, shardCount int) []byte { + _, referenceShard := classifyPacketFlow(reference, shardCount) + for port := uint16(10000); port < 11000; port++ { + candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443) + _, shard := classifyPacketFlow(candidate, shardCount) + if shard != referenceShard { + return candidate + } + } + return testIPv4TCPPacket([4]byte{10, 77, 0, 3}, [4]byte{192, 168, 200, 96}, 52000, 443) +} + +func packetShard(packet []byte, shardCount int) int { + _, shard := classifyPacketFlow(packet, shardCount) + return shard +} + +func packetSourcePort(packet []byte) uint16 { + if len(packet) < 22 { + return 0 + } + return uint16(packet[20])<<8 | uint16(packet[21]) +} + +func testFlowChannelID(vpnConnectionID string, packet []byte, shardCount int) string { + return fabricFlowChannelID(vpnConnectionID, packetShard(packet, shardCount)) +} + +func packetWithSameShard(reference []byte, shardCount int) []byte { + _, referenceShard := classifyPacketFlow(reference, shardCount) + for port := uint16(11000); port < 12000; port++ { + candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 3389) + _, shard := classifyPacketFlow(candidate, shardCount) + if shard == referenceShard { + return candidate + } + } + return append([]byte{}, reference...) +} + +func packetsForDistinctShards(count int, shardCount int) [][]byte { + packets := make([][]byte, 0, count) + seen := map[int]struct{}{} + for port := uint16(10000); port < 65000 && len(packets) < count; port++ { + candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443) + _, shard := classifyPacketFlow(candidate, shardCount) + if _, ok := seen[shard]; ok { + continue + } + seen[shard] = struct{}{} + packets = append(packets, candidate) + } + return packets +} + +func packetsForOrderedDistinctChannels(shardCount int) ([]byte, []byte) { + packets := packetsForDistinctShards(shardCount, shardCount) + if len(packets) < 2 { + panic("not enough distinct channel packets") + } + for _, left := range packets { + leftChannel := testFlowChannelID("vpn-1", left, shardCount) + for _, right := range packets { + rightChannel := testFlowChannelID("vpn-1", right, shardCount) + if leftChannel < rightChannel { + return left, right + } + } + } + panic("failed to find ordered distinct channel packets") +} + +func expectedScheduledChannelCount(scheduler *FabricFlowScheduler, packets [][]byte) int { + channels := map[string]struct{}{} + for _, packet := range packets { + _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) + channels[fmt.Sprintf("flow-%02d", shard)] = struct{}{} + } + return len(channels) +} + +func closeOnce(ch chan struct{}) { + defer func() { _ = recover() }() + close(ch) +}