package vpnruntime import ( "context" "errors" "fmt" "strings" "sync" "testing" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh" ) type captureProductionTransport struct { nextNodeID string envelope mesh.ProductionEnvelope } func (t *captureProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { t.nextNodeID = nextNodeID t.envelope = envelope return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil } type failoverProductionTransport struct { failNextHop string calls []string envelope mesh.ProductionEnvelope } func (t *failoverProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { t.calls = append(t.calls, nextNodeID) if nextNodeID == t.failNextHop { return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable } t.envelope = envelope return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil } type captureManyProductionTransport struct { calls []string envelopes []mesh.ProductionEnvelope } func (t *captureManyProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { t.calls = append(t.calls, nextNodeID) t.envelopes = append(t.envelopes, envelope) return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil } type routeFailingProductionTransport struct { failRouteID string callsByRoute map[string]int envelopes []mesh.ProductionEnvelope } func (t *routeFailingProductionTransport) SendProduction(_ context.Context, _ string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { if t.callsByRoute == nil { t.callsByRoute = map[string]int{} } t.callsByRoute[envelope.RouteID]++ if envelope.RouteID == 
t.failRouteID { return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable } t.envelopes = append(t.envelopes, envelope) return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil } type blockingProductionTransport struct { mu sync.Mutex calls []string envelopes []mesh.ProductionEnvelope slowPort uint16 slowStarted chan struct{} releaseSlow chan struct{} fastDone chan struct{} } func (t *blockingProductionTransport) SendProduction(ctx context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { payload, err := mesh.DecodeProductionVPNPacketBatch(envelope) if err != nil { return mesh.ProductionForwardResult{}, err } isSlow := false for _, packet := range payload.Packets { if packetSourcePort(packet) == t.slowPort { isSlow = true break } } if isSlow { closeOnce(t.slowStarted) select { case <-t.releaseSlow: case <-ctx.Done(): return mesh.ProductionForwardResult{}, ctx.Err() } } else if t.slowStarted != nil { // Keep fast sends behind the observed slow-start boundary so the test // deterministically exercises overlapping in-flight sends. 
select { case <-t.slowStarted: case <-ctx.Done(): return mesh.ProductionForwardResult{}, ctx.Err() } } t.mu.Lock() t.calls = append(t.calls, nextNodeID) t.envelopes = append(t.envelopes, envelope) t.mu.Unlock() if !isSlow { closeOnce(t.fastDone) } return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil } type memoryPacketTransport struct { sendErr error recvErr error sent [][]byte recv [][]byte } type captureFabricSessionSender struct { err error frames []fabricproto.Frame closed bool } func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error { if s.err != nil { return s.err } s.frames = append(s.frames, frame) return nil } func (s *captureFabricSessionSender) Close() error { s.closed = true return nil } type memoryFabricSessionReceiver struct { frames chan fabricproto.Frame errors chan error } func (r memoryFabricSessionReceiver) Frames() <-chan fabricproto.Frame { return r.frames } func (r memoryFabricSessionReceiver) Errors() <-chan error { return r.errors } func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error { if t.sendErr != nil { return t.sendErr } t.sent = append(t.sent, packets...) 
return nil } func (t *memoryPacketTransport) ReceiveGatewayPacketBatch(_ context.Context, _ time.Duration) ([][]byte, error) { if t.recvErr != nil { return nil, t.recvErr } packets := t.recv t.recv = nil return packets, nil } func TestFabricPacketTransportSendsVPNPacketBatchEnvelope(t *testing.T) { capture := &captureProductionTransport{} transport := &FabricPacketTransport{ ForwardTransport: capture, ClusterID: "cluster-1", VPNConnectionID: "vpn-1", RouteID: "route-1", LocalNodeID: "exit-1", RemoteNodeID: "entry-1", NextHopNodeID: "relay-1", RoutePath: []string{"exit-1", "relay-1", "entry-1"}, SendDirection: FabricDirectionGatewayToClient, } if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil { t.Fatalf("send fabric packet batch: %v", err) } if capture.nextNodeID != "relay-1" { t.Fatalf("next node = %q", capture.nextNodeID) } if capture.envelope.CurrentHopNodeID != "relay-1" || capture.envelope.NextHopNodeID != "entry-1" { t.Fatalf("envelope hop = %s -> %s, want relay-1 -> entry-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID) } payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope) if err != nil { t.Fatalf("decode envelope payload: %v", err) } if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionGatewayToClient || string(payload.Packets[0]) != "packet" { t.Fatalf("payload = %+v", payload) } } func TestFabricSessionPacketTransportSendsDataFrame(t *testing.T) { sender := &captureFabricSessionSender{} transport := &FabricSessionPacketTransport{ Sender: sender, StreamID: 700, VPNConnectionID: "vpn-1", SendDirection: FabricDirectionClientToGateway, TrafficClass: FabricTrafficClassInteractive, } if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil { t.Fatalf("send fabric session packet batch: %v", err) } if len(sender.frames) != 1 { t.Fatalf("sent frames = %d, want 1", len(sender.frames)) } frame := 
sender.frames[0] if frame.Type != fabricproto.FrameData || frame.StreamID != 700 || frame.Sequence != 1 || frame.TrafficClass != fabricproto.TrafficClassInteractive { t.Fatalf("frame = %+v", frame) } payload, err := DecodeFabricVPNPacketDataFrame(frame) if err != nil { t.Fatalf("decode sent frame: %v", err) } if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionClientToGateway || len(payload.Packets) != 1 || string(payload.Packets[0]) != "packet" { t.Fatalf("payload = %+v", payload) } } func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) { sender := &captureFabricSessionSender{} transport := &FabricSessionPacketTransport{ Sender: sender, StreamID: 700, VPNConnectionID: "vpn-1", SendDirection: FabricDirectionClientToGateway, StreamIDsByTrafficClass: map[string][]uint64{ FabricTrafficClassInteractive: []uint64{801, 802}, FabricTrafficClassBulk: []uint64{901, 902}, }, } bulkPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443) controlPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389) controlPacket[33] = 0x02 if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{bulkPacket}); err != nil { t.Fatalf("send bulk packet: %v", err) } if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{controlPacket}); err != nil { t.Fatalf("send control packet: %v", err) } if len(sender.frames) != 2 { t.Fatalf("sent frames = %d, want 2", len(sender.frames)) } if sender.frames[0].TrafficClass != fabricproto.TrafficClassBulk || sender.frames[0].StreamID < 901 || sender.frames[0].StreamID > 902 { t.Fatalf("bulk frame did not use bulk shard: %+v", sender.frames[0]) } if sender.frames[1].TrafficClass != fabricproto.TrafficClassInteractive || sender.frames[1].StreamID < 801 || sender.frames[1].StreamID > 802 { t.Fatalf("control frame did not use interactive shard: %+v", sender.frames[1]) } if sender.frames[0].Sequence != 1 || 
sender.frames[1].Sequence != 1 { t.Fatalf("per-stream sequences = %d/%d, want 1/1", sender.frames[0].Sequence, sender.frames[1].Sequence) } snapshot := transport.Snapshot() if snapshot["schema_version"] != "rap.vpn_fabric_session_packet_transport.v1" { t.Fatalf("snapshot schema missing: %+v", snapshot) } if snapshot["stream_class_count"] != 2 || snapshot["stream_shard_count"] != 4 || snapshot["send_class_count"] != 2 || snapshot["send_stream_count"] != 2 || snapshot["sharding_active"] != true { t.Fatalf("unexpected shard summary: %+v", snapshot) } framesByClass := snapshot["send_frames_by_class"].(map[string]uint64) if framesByClass[FabricTrafficClassBulk] != 1 || framesByClass[FabricTrafficClassInteractive] != 1 { t.Fatalf("send frames by class = %+v", framesByClass) } } func TestFabricSessionPacketTransportSplitsMixedBatchByStream(t *testing.T) { sender := &captureFabricSessionSender{} transport := &FabricSessionPacketTransport{ Sender: sender, VPNConnectionID: "vpn-1", SendDirection: FabricDirectionClientToGateway, StreamIDsByTrafficClass: map[string][]uint64{ FabricTrafficClassInteractive: []uint64{801}, FabricTrafficClassBulk: []uint64{901, 902}, }, } bulkA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443) bulkB := packetWithDifferentShard(bulkA, 2) control := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389) control[33] = 0x02 if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{bulkA, bulkB, control}); err != nil { t.Fatalf("send mixed batch: %v", err) } if len(sender.frames) != 3 { t.Fatalf("sent frames = %d, want 3: %+v", len(sender.frames), sender.frames) } streams := map[uint64]fabricproto.TrafficClass{} for _, frame := range sender.frames { streams[frame.StreamID] = frame.TrafficClass } if streams[801] != fabricproto.TrafficClassInteractive || streams[901] != fabricproto.TrafficClassBulk || streams[902] != fabricproto.TrafficClassBulk { t.Fatalf("unexpected 
stream/class split: %+v", sender.frames) } snapshot := transport.Snapshot() if snapshot["send_stream_count"] != 3 || snapshot["send_class_count"] != 2 || snapshot["split_batch_count"] != uint64(1) || snapshot["last_batch_frame_count"] != uint64(3) || snapshot["max_batch_frame_count"] != uint64(3) { t.Fatalf("unexpected mixed-batch shard summary: %+v", snapshot) } packetsByStream := snapshot["send_packets_by_stream_id"].(map[string]uint64) if packetsByStream["801"] != 1 || packetsByStream["901"] != 1 || packetsByStream["902"] != 1 { t.Fatalf("unexpected packets by stream: %+v", packetsByStream) } } func TestFabricSessionPacketTransportFanoutBoundedByConfiguredStreams(t *testing.T) { sender := &captureFabricSessionSender{} transport := &FabricSessionPacketTransport{ Sender: sender, VPNConnectionID: "vpn-1", SendDirection: FabricDirectionClientToGateway, StreamIDsByTrafficClass: map[string][]uint64{ FabricTrafficClassBulk: []uint64{901, 902, 903, 904}, }, } var packets [][]byte for port := uint16(10000); port < 10100; port++ { packets = append(packets, testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443)) } if err := transport.SendGatewayPacketBatch(context.Background(), packets); err != nil { t.Fatalf("send large flow batch: %v", err) } if len(sender.frames) > 4 { t.Fatalf("fanout = %d, want bounded by 4 streams", len(sender.frames)) } snapshot := transport.Snapshot() if snapshot["max_batch_frame_count"].(uint64) > 4 { t.Fatalf("max fanout not bounded: %+v", snapshot) } } func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) { sender := &captureFabricSessionSender{} transport := &FabricSessionPacketTransport{ Sender: sender, StreamID: 700, StreamIDsByTrafficClass: map[string][]uint64{ FabricTrafficClassInteractive: []uint64{801, 802}, FabricTrafficClassBulk: []uint64{901, 902}, }, } if err := transport.Close(); err != nil { t.Fatalf("close transport: %v", err) } if !sender.closed { t.Fatal("underlying fabric session was 
not closed") } closed := map[uint64]bool{} for _, frame := range sender.frames { if frame.Type == fabricproto.FrameCloseStream { closed[frame.StreamID] = true } } for _, streamID := range []uint64{700, 801, 802, 901, 902} { if !closed[streamID] { t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames) } } snapshot := transport.Snapshot() if snapshot["close_stream_frames"] != uint64(5) || snapshot["close_errors"] != uint64(0) { t.Fatalf("unexpected close counters: %+v", snapshot) } } func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { inbox := NewFabricPacketInbox(4) receiver := memoryFabricSessionReceiver{ frames: make(chan fabricproto.Frame, 2), errors: make(chan error, 1), } transport := &FabricSessionPacketTransport{ Receiver: receiver, Inbox: inbox, StreamID: 701, VPNConnectionID: "vpn-1", } frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 701, Sequence: 1, VPNConnectionID: "vpn-1", Direction: FabricDirectionClientToGateway, Packets: [][]byte{[]byte("packet")}, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } receiver.frames <- fabricproto.Frame{Type: fabricproto.FrameAck, StreamID: 701, Sequence: 1} receiver.frames <- frame close(receiver.frames) if err := transport.RunFrameIngress(context.Background()); err != nil { t.Fatalf("run frame ingress: %v", err) } packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "packet" { t.Fatalf("packets = %#v", packets) } } func TestFabricSessionPacketTransportReceiveReadsPumpFrames(t *testing.T) { inbox := NewFabricPacketInbox(4) receiver := memoryFabricSessionReceiver{ frames: make(chan fabricproto.Frame, 1), errors: make(chan error, 1), } transport := &FabricSessionPacketTransport{ Receiver: receiver, Inbox: inbox, StreamID: 711, VPNConnectionID: "vpn-1", ReceiveDirection: 
FabricDirectionClientToGateway, } frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 711, Sequence: 1, VPNConnectionID: "vpn-1", Direction: FabricDirectionClientToGateway, Packets: [][]byte{[]byte("request")}, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } receiver.frames <- frame packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "request" { t.Fatalf("packets = %#v", packets) } snapshot := transport.Snapshot() framesByClass := snapshot["receive_frames_by_class"].(map[string]uint64) packetsByStream := snapshot["receive_packets_by_stream_id"].(map[string]uint64) if framesByClass[FabricTrafficClassBulk] != 1 || packetsByStream["711"] != 1 { t.Fatalf("unexpected receive counters: %+v", snapshot) } } func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) { inbox := NewFabricPacketInbox(4) receiver := memoryFabricSessionReceiver{ frames: make(chan fabricproto.Frame, 1), errors: make(chan error, 1), } transport := &FabricSessionPacketTransport{ Receiver: receiver, Inbox: inbox, StreamID: 701, VPNConnectionID: "vpn-1", } frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 702, Sequence: 1, VPNConnectionID: "vpn-1", Direction: FabricDirectionClientToGateway, Packets: [][]byte{[]byte("packet")}, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } receiver.frames <- frame close(receiver.frames) if err := transport.RunFrameIngress(context.Background()); err != nil { t.Fatalf("run frame ingress: %v", err) } packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), 10*time.Millisecond) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 0 { t.Fatalf("packets = %#v, want none", packets) } } func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) { inbox := NewFabricPacketInbox(4) 
envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{ RouteID: "route-1", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", CurrentHopNodeID: "exit-1", NextHopNodeID: "exit-1", RoutePath: []string{"entry-1", "exit-1"}, VPNConnectionID: "vpn-1", Direction: FabricDirectionClientToGateway, Packets: [][]byte{[]byte("packet")}, }) if err != nil { t.Fatalf("new envelope: %v", err) } if err := inbox.DeliverProductionEnvelope(context.Background(), envelope); err != nil { t.Fatalf("deliver envelope: %v", err) } packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) if err != nil { t.Fatalf("receive: %v", err) } if len(packets) != 1 || string(packets[0]) != "packet" { t.Fatalf("packets = %#v", packets) } } func TestFabricPacketInboxKeepsDirectionsIsolated(t *testing.T) { inbox := NewFabricPacketInbox(4) if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil { t.Fatalf("deliver gateway packet: %v", err) } if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil { t.Fatalf("deliver client packet: %v", err) } clientPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) if err != nil { t.Fatalf("receive client direction: %v", err) } gatewayPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive gateway direction: %v", err) } if len(clientPackets) != 1 || string(clientPackets[0]) != "request" { t.Fatalf("client packets = %#v", clientPackets) } if len(gatewayPackets) != 1 || string(gatewayPackets[0]) != "reply" { t.Fatalf("gateway packets = %#v", gatewayPackets) } } func TestFabricPacketInboxDropsEmptyPackets(t *testing.T) { inbox := NewFabricPacketInbox(4) if err := 
inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{nil, []byte{}, []byte("reply")}); err != nil { t.Fatalf("deliver gateway packet: %v", err) } packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "reply" { t.Fatalf("packets = %#v", packets) } } func TestFabricVPNPacketDataFrameRoundTrip(t *testing.T) { packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 57032, 3389) packet[33] = 0x18 now := time.Unix(1700000000, 123).UTC() frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 55, Sequence: 9, VPNConnectionID: "vpn-1", Direction: FabricDirectionClientToGateway, TrafficClass: FabricTrafficClassInteractive, Packets: [][]byte{packet}, Now: now, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } if frame.Type != fabricproto.FrameData || frame.StreamID != 55 || frame.Sequence != 9 || frame.TrafficClass != fabricproto.TrafficClassInteractive { t.Fatalf("frame = %+v", frame) } payload, err := DecodeFabricVPNPacketDataFrame(frame) if err != nil { t.Fatalf("decode fabric vpn frame: %v", err) } if payload.SchemaVersion != "rap.vpn_packet_batch.fabric.v1" || payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionClientToGateway || !payload.SentAt.Equal(now) || len(payload.Packets) != 1 || string(payload.Packets[0]) != string(packet) { t.Fatalf("payload = %+v", payload) } } func TestFabricPacketInboxReceivesFabricSessionFrame(t *testing.T) { inbox := NewFabricPacketInbox(4) frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 88, Sequence: 1, VPNConnectionID: "vpn-1", Direction: FabricDirectionGatewayToClient, Packets: [][]byte{[]byte("reply")}, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } if err := inbox.DeliverFabricSessionFrame(context.Background(), frame); err 
!= nil { t.Fatalf("deliver fabric session frame: %v", err) } packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive fabric session packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "reply" { t.Fatalf("packets = %#v", packets) } } func TestFabricVPNPacketDataFrameInfersInteractiveTCPControl(t *testing.T) { packet := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032) packet[33] = 0x12 frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ StreamID: 91, Sequence: 1, VPNConnectionID: "vpn-1", Direction: FabricDirectionGatewayToClient, TrafficClass: FabricTrafficClassBulk, Packets: [][]byte{packet}, }) if err != nil { t.Fatalf("new fabric vpn frame: %v", err) } if frame.TrafficClass != fabricproto.TrafficClassInteractive { t.Fatalf("traffic class = %v, want interactive", frame.TrafficClass) } } func TestFabricPacketInboxPrioritizesGatewayTCPControlPackets(t *testing.T) { inbox := NewFabricPacketInbox(4) normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000) priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032) priority[33] = 0x12 if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil { t.Fatalf("deliver normal gateway packet: %v", err) } if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}); err != nil { t.Fatalf("deliver priority gateway packet: %v", err) } packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != string(priority) { t.Fatalf("first packets = %#v, want priority tcp control packet", packets) } } func TestFabricPacketInboxWaitsBrieflyForGatewayTCPControlPackets(t *testing.T) { inbox 
:= NewFabricPacketInbox(4) normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000) priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032) priority[33] = 0x12 if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil { t.Fatalf("deliver normal gateway packet: %v", err) } go func() { time.Sleep(time.Millisecond) _ = inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}) }() packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != string(priority) { t.Fatalf("first packets = %#v, want delayed priority tcp control packet", packets) } } func TestLocalPacketTransportUsesFabricInboxDirections(t *testing.T) { inbox := NewFabricPacketInbox(4) transport := &LocalPacketTransport{Inbox: inbox, VPNConnectionID: "vpn-1"} if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil { t.Fatalf("deliver client packet: %v", err) } packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) if err != nil { t.Fatalf("receive gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "request" { t.Fatalf("received packets = %#v", packets) } if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { t.Fatalf("send gateway packet: %v", err) } replies, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) if err != nil { t.Fatalf("receive client reply: %v", err) } if len(replies) != 1 || string(replies[0]) != "reply" { t.Fatalf("reply packets = %#v", replies) } } func TestFabricFlowSchedulerKeepsReverseFiveTupleTogether(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) forward 
:= testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) reverse := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) batches := scheduler.ScheduleClientPackets([][]byte{forward, reverse}) if len(batches) != 1 { t.Fatalf("batches = %#v, want one logical flow channel", batches) } if len(batches[0].Packets) != 2 { t.Fatalf("batch packets = %d", len(batches[0].Packets)) } snapshot := scheduler.Snapshot() if snapshot.Enqueued != 2 || snapshot.ChannelCount != 1 || snapshot.QueueDepths[batches[0].ChannelID] != 2 { t.Fatalf("snapshot before complete = %+v", snapshot) } scheduler.Complete(batches[0]) snapshot = scheduler.Snapshot() if snapshot.Dequeued != 2 || snapshot.QueueDepths[batches[0].ChannelID] != 0 { t.Fatalf("snapshot after complete = %+v", snapshot) } } func TestFabricFlowSchedulerPrioritizesExplicitTrafficClass(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) bulk := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, [][]byte{packet}) interactive := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{packet}) control := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassControl, [][]byte{packet}) combined := append(append(bulk, interactive...), control...) 
scheduler.sortScheduledBatches(combined) if len(combined) != 3 { t.Fatalf("scheduled count = %d, want 3", len(combined)) } if combined[0].TrafficClass != FabricTrafficClassControl || combined[1].TrafficClass != FabricTrafficClassInteractive || combined[2].TrafficClass != FabricTrafficClassBulk { t.Fatalf("scheduled classes = %s, %s, %s", combined[0].TrafficClass, combined[1].TrafficClass, combined[2].TrafficClass) } if !strings.Contains(combined[0].ChannelID, ":control:") || !strings.Contains(combined[1].ChannelID, ":interactive:") { t.Fatalf("priority channel ids = %q %q", combined[0].ChannelID, combined[1].ChannelID) } snapshot := scheduler.Snapshot() if snapshot.ChannelStats[combined[0].ChannelID].TrafficClass != FabricTrafficClassControl { t.Fatalf("control stat = %+v", snapshot.ChannelStats[combined[0].ChannelID]) } if snapshot.ChannelStats[combined[2].ChannelID].TrafficClass != FabricTrafficClassBulk { t.Fatalf("bulk stat = %+v", snapshot.ChannelStats[combined[2].ChannelID]) } if snapshot.TrafficClassCounts[FabricTrafficClassControl] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 1 { t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts) } } func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) { scheduler := NewFabricFlowScheduler(1, 1) packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) packetB := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) batches, dropped := scheduler.scheduleClientPackets("", "", [][]byte{packetA, packetB}) if len(batches) != 1 || len(batches[0].Packets) != 1 { t.Fatalf("batches = %#v, want one accepted packet", batches) } if dropped != 1 { t.Fatalf("dropped = %d, want per-call drop count 1", dropped) } snapshot := scheduler.Snapshot() if snapshot.Dropped != 1 || !snapshot.BackpressureActive { t.Fatalf("snapshot = %+v, want one dropped packet and 
active backpressure", snapshot) } if snapshot.PressureLevel != "critical" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "drops") { t.Fatalf("pressure = %s score=%d reasons=%v, want critical drops", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } if snapshot.RecommendedAction != "shed_or_reroute" { t.Fatalf("recommended action = %q, want shed_or_reroute", snapshot.RecommendedAction) } } func TestFabricFlowSchedulerSnapshotReportsNominalAction(t *testing.T) { snapshot := NewFabricFlowScheduler(8, 8).Snapshot() if snapshot.PressureLevel != "nominal" || snapshot.PressureScore != 0 || len(snapshot.PressureReasons) != 0 || snapshot.RecommendedAction != "observe" { t.Fatalf("nominal pressure snapshot = %+v", snapshot) } } func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) channelID := "flow-01" scheduler.RecordRouteSuccess(channelID, "route-fast", "node-b", 200*time.Microsecond) snapshot := scheduler.Snapshot() stat := snapshot.ChannelStats[channelID] if stat.LastSendDurationMillis != 1 { t.Fatalf("success last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis) } scheduler.RecordRouteFailure(channelID, "route-fast", "node-b", mesh.ErrForwardPeerUnavailable, 300*time.Microsecond) snapshot = scheduler.Snapshot() stat = snapshot.ChannelStats[channelID] if stat.LastSendDurationMillis != 1 { t.Fatalf("failure last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis) } } func TestAdaptivePacketTransportReturnsRepliesToLastReceiveSide(t *testing.T) { primary := &memoryPacketTransport{} fallback := &memoryPacketTransport{recv: [][]byte{[]byte("request")}} transport := &AdaptivePacketTransport{ Primary: primary, Fallback: fallback, PrimaryTimeout: time.Millisecond, } packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second) if err != nil { 
t.Fatalf("receive adaptive packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "request" { t.Fatalf("packets = %#v", packets) } if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { t.Fatalf("send adaptive reply: %v", err) } if len(primary.sent) != 0 { t.Fatalf("primary unexpectedly received reply: %#v", primary.sent) } if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" { t.Fatalf("fallback reply = %#v", fallback.sent) } } func TestAdaptivePacketTransportFallsBackWhenPreferredSendFails(t *testing.T) { sendErr := errors.New("send failed") primary := &memoryPacketTransport{sendErr: sendErr} fallback := &memoryPacketTransport{} transport := &AdaptivePacketTransport{Primary: primary, Fallback: fallback} if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil { t.Fatalf("send adaptive reply: %v", err) } if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" { t.Fatalf("fallback sent = %#v", fallback.sent) } } func TestFabricClientPacketIngressSendsClientPacketsOverSelectedRoute(t *testing.T) { capture := &captureProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: capture, Inbox: NewFabricPacketInbox(4), ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{{ RouteID: "route-entry-exit", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }} }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { t.Fatalf("send client packet batch: %v", err) } if capture.nextNodeID != "relay-1" { t.Fatalf("next node = %q", capture.nextNodeID) } if capture.envelope.CurrentHopNodeID != "relay-1" || 
capture.envelope.NextHopNodeID != "exit-1" { t.Fatalf("envelope hop = %s -> %s, want relay-1 -> exit-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID) } payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope) if err != nil { t.Fatalf("decode envelope payload: %v", err) } if payload.Direction != FabricDirectionClientToGateway || payload.VPNConnectionID != "vpn-1" { t.Fatalf("payload = %+v", payload) } } func TestFabricClientPacketIngressUsesLeasePreferredRouteBeforeConfigOrder(t *testing.T) { capture := &captureProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: capture, Inbox: NewFabricPacketInbox(4), ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-alternate-exit", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-b", Hops: []string{"entry-1", "exit-b"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-primary-exit", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-a", Hops: []string{"entry-1", "exit-a"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.PreferClientRoute("route-primary-exit") if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { t.Fatalf("send client packet batch: %v", err) } if capture.envelope.RouteID != "route-primary-exit" || capture.nextNodeID != "exit-a" { t.Fatalf("selected route = %s next=%s, want route-primary-exit via exit-a", capture.envelope.RouteID, capture.nextNodeID) } } func TestFabricClientPacketIngressTriesAlternateRouteBeforeBackendFallback(t *testing.T) { transport := &failoverProductionTransport{failNextHop: "relay-bad"} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, 
Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-good",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-good", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	// The failing hop must be tried first, followed by the working one.
	if len(transport.calls) != 2 || transport.calls[0] != "relay-bad" || transport.calls[1] != "relay-good" {
		t.Fatalf("route attempts = %#v, want relay-bad then relay-good", transport.calls)
	}
	if transport.envelope.RouteID != "route-good" || transport.envelope.CurrentHopNodeID != "relay-good" {
		t.Fatalf("selected envelope = %+v", transport.envelope)
	}
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.SendRouteAttempts != 2 || snapshot.SendRouteFailures != 1 || snapshot.LastSelectedRouteID != "route-good" {
		t.Fatalf("snapshot = %+v", snapshot)
	}
}

// TestFabricClientPacketIngressDoesNotFailOverPreferredRouteToDifferentDestination
// prefers "route-home" (destination "home-1"), whose next hop "relay-home"
// always fails, while the only other route leads to a different destination
// ("ifcm-1"). It asserts that the send returns an error, that only relay-home
// was attempted, and that no envelope for "route-other" was delivered.
func TestFabricClientPacketIngressDoesNotFailOverPreferredRouteToDifferentDestination(t *testing.T) {
	transport := &failoverProductionTransport{failNextHop: "relay-home"}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-other",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "ifcm-1",
					Hops:              []string{"entry-1", "relay-ifcm", "ifcm-1"},
					AllowedChannels:
[]string{mesh.ProductionChannelVPNPacket},
					ExpiresAt: time.Now().UTC().Add(time.Minute),
					MaxTTL:    8,
				},
				{
					RouteID:           "route-home",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "home-1",
					Hops:              []string{"entry-1", "relay-home", "home-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	ingress.PreferClientRoute("route-home")
	err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")})
	if err == nil {
		t.Fatal("send client packet batch succeeded after preferred route failure; want failure without cross-destination fallback")
	}
	if len(transport.calls) != 1 || transport.calls[0] != "relay-home" {
		t.Fatalf("route attempts = %#v, want only relay-home", transport.calls)
	}
	if transport.envelope.RouteID == "route-other" {
		t.Fatalf("cross-destination route was used: %+v", transport.envelope)
	}
}

// TestFabricClientPacketIngressAvoidsChannelFailedRouteOnNextSend records a
// route failure for "route-bad" on the flow channel the test packet maps to
// before any send happens. It then asserts that the send goes to relay-good
// only, and that the channel stat afterwards reports "route-good" as last
// route, zero consecutive failures and no rebuild recommendation.
func TestFabricClientPacketIngressAvoidsChannelFailedRouteOnNextSend(t *testing.T) {
	transport := &captureManyProductionTransport{}
	scheduler := NewFabricFlowScheduler(8, 16)
	packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	// Derive the channel ID for this packet so the failure is seeded on the
	// same channel key that is read back from the snapshot below.
	_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
	channelID := fabricFlowChannelID("vpn-1", shard)
	scheduler.RecordRouteFailure(channelID, "route-bad", "relay-bad", mesh.ErrForwardPeerUnavailable, time.Millisecond)
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		FlowScheduler:    scheduler,
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-good",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-good", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	if len(transport.calls) != 1 || transport.calls[0] != "relay-good" {
		t.Fatalf("route calls = %#v, want relay-good first without retrying stale route", transport.calls)
	}
	snapshot := ingress.Snapshot("cluster-1")
	stat := snapshot.FlowScheduler.ChannelStats[channelID]
	if stat.LastRouteID != "route-good" || stat.ConsecutiveFailures != 0 || stat.RouteRebuildRecommended {
		t.Fatalf("channel stat = %+v", stat)
	}
}

// TestFabricClientPacketIngressWithdrawsRouteAfterRebuildDecision feeds the
// ingress an "applied" rebuild decision that replaces "route-bad" with
// "route-good" and asserts that the next send uses relay-good only, that the
// snapshot shows one route candidate with one rebuild request, one applied
// rebuild and one withdrawn route, and that "route-good" was selected.
func TestFabricClientPacketIngressWithdrawsRouteAfterRebuildDecision(t *testing.T) {
	transport := &captureManyProductionTransport{}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-good",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-good", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
		RouteID:            "route-bad",
		ReplacementRouteID: "route-good",
		RebuildRequestID:   "route-bad-rebuild",
		RebuildStatus:      "applied",
		RebuildReason:      "service_channel_feedback_rebuild_applied_to_alternate",
RebuildAttempt:     3,
		DecisionSource:     "service_channel_feedback_replacement",
		Generation:         "config-v2",
	}}, "config-v2", time.Now().UTC())
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	if len(transport.calls) != 1 || transport.calls[0] != "relay-good" {
		t.Fatalf("route calls = %#v, want only rebuild replacement route", transport.calls)
	}
	snapshot := ingress.Snapshot("cluster-1")
	// Note: the failure message prints only snapshot.RouteManager, although
	// RouteCandidateCount is part of the condition as well.
	if snapshot.RouteCandidateCount != 1 || snapshot.RouteManager.RebuildRequestCount != 1 || snapshot.RouteManager.RebuildAppliedCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 1 {
		t.Fatalf("snapshot route manager = %+v", snapshot.RouteManager)
	}
	if snapshot.LastSelectedRouteID != "route-good" {
		t.Fatalf("selected route = %q, want route-good", snapshot.LastSelectedRouteID)
	}
}

// TestFabricClientPacketIngressPrefersControlPlaneReplacementOverConfigOrder
// configures three routes in the order route-bad, route-standby, route-fast.
// It first asserts that the initial send selects "route-bad", then applies a
// rebuild decision naming "route-fast" as the replacement and asserts that
// the next envelope uses "route-fast" (not the earlier-listed
// "route-standby") and that two route candidates remain.
func TestFabricClientPacketIngressPrefersControlPlaneReplacementOverConfigOrder(t *testing.T) {
	transport := &captureManyProductionTransport{}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-slow", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-standby",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-standby", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-fast",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-fast", "exit-1"},
					AllowedChannels:
[]string{mesh.ProductionChannelVPNPacket},
					ExpiresAt: time.Now().UTC().Add(time.Minute),
					MaxTTL:    8,
				},
			}
		},
	}
	packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
		t.Fatalf("send initial packet batch: %v", err)
	}
	// Baseline: before any route-manager decision the first route is chosen.
	if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" {
		t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID)
	}
	ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
		RouteID:            "route-bad",
		ReplacementRouteID: "route-fast",
		RebuildRequestID:   "route-bad-rebuild",
		RebuildStatus:      "applied",
		RebuildReason:      "service_channel_feedback_rebuild_applied_to_fastest_pool_route",
		DecisionSource:     "service_channel_feedback_replacement",
		Generation:         "config-v2",
	}}, "config-v2", time.Now().UTC())
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
		t.Fatalf("send packet batch after rebuild: %v", err)
	}
	// Inspect the most recent envelope; both sends above succeeded, so the
	// slice is non-empty here.
	if got := transport.envelopes[len(transport.envelopes)-1].RouteID; got != "route-fast" {
		t.Fatalf("post-rebuild selected route = %q, want Control Plane replacement route-fast; calls=%v", got, transport.calls)
	}
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.LastSelectedRouteID != "route-fast" || snapshot.RouteCandidateCount != 2 {
		t.Fatalf("post-rebuild snapshot = %+v", snapshot)
	}
}

// TestFabricClientPacketIngressRestoresWithdrawnRouteAfterFreshConfig walks
// a route through withdrawal and restoration. After an initial send selects
// "route-bad", an "applied" rebuild decision under generation "config-v2"
// must produce an "applied_rebuild" transition that clears "route-bad" and
// leaves one candidate. A following UpdateRouteManager call with no decisions
// under "config-v3" must produce a "restored_by_new_config" transition with
// one restored route, zero withdrawn routes and two candidates.
func TestFabricClientPacketIngressRestoresWithdrawnRouteAfterFreshConfig(t *testing.T) {
	transport := &captureManyProductionTransport{}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-good",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-good", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
		t.Fatalf("send initial packet batch: %v", err)
	}
	if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" {
		t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID)
	}
	ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
		RouteID:            "route-bad",
		ReplacementRouteID: "route-good",
		RebuildRequestID:   "route-bad-rebuild",
		RebuildStatus:      "applied",
		RebuildReason:      "service_channel_feedback_rebuild_applied_to_alternate",
		RebuildAttempt:     4,
		DecisionSource:     "service_channel_feedback_replacement",
		Generation:         "config-v2",
	}}, "config-v2", time.Now().UTC())
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.RouteManagerTransition.Status != "applied_rebuild" || snapshot.RouteManagerTransition.ClearedSelectedRouteID != "route-bad" || snapshot.RouteManagerTransition.RebuildAppliedCount != 1 {
		t.Fatalf("apply transition = %+v", snapshot.RouteManagerTransition)
	}
	if snapshot.RouteCandidateCount != 1 {
		t.Fatalf("route candidate count after rebuild = %d, want 1", snapshot.RouteCandidateCount)
	}
	// A newer generation with no decisions is expected to restore the route.
	ingress.UpdateRouteManager(nil, "config-v3", time.Now().UTC())
	snapshot = ingress.Snapshot("cluster-1")
	if snapshot.RouteManagerTransition.Status != "restored_by_new_config" || snapshot.RouteManagerTransition.RestoredRouteCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 0 || snapshot.RouteCandidateCount != 2 {
		t.Fatalf("restore transition/snapshot = %+v / %+v",
snapshot.RouteManagerTransition, snapshot.RouteManager)
	}
}

// TestFabricClientPacketIngressPendingDegradedFallbackWithdrawsRouteWithoutAlternate
// applies a "pending_degraded_fallback" decision (no replacement route) to
// the only configured route. It asserts that the next send fails with an
// error matching mesh.ErrRouteNotFound and that the snapshot shows zero
// candidates, one pending fallback, one withdrawn route and a
// "pending_degraded_fallback" transition status.
func TestFabricClientPacketIngressPendingDegradedFallbackWithdrawsRouteWithoutAlternate(t *testing.T) {
	ingress := &FabricClientPacketIngress{
		ForwardTransport: &captureManyProductionTransport{},
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-bad",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "relay-bad", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
		RouteID:          "route-bad",
		RebuildRequestID: "route-bad-rebuild",
		RebuildStatus:    "pending_degraded_fallback",
		RebuildReason:    "service_channel_feedback_rebuild_pending_backend_relay",
		RebuildAttempt:   2,
		DecisionSource:   "service_channel_feedback_no_alternate",
		Generation:       "config-v2",
	}}, "config-v2", time.Now().UTC())
	err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")})
	if !errors.Is(err, mesh.ErrRouteNotFound) {
		t.Fatalf("send error = %v, want route not found after pending degraded withdrawal", err)
	}
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.RouteCandidateCount != 0 || snapshot.RouteManager.PendingFallbackCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 1 || snapshot.RouteManagerTransition.Status != "pending_degraded_fallback" {
		t.Fatalf("pending fallback snapshot = %+v transition=%+v", snapshot.RouteManager, snapshot.RouteManagerTransition)
	}
}

// TestFabricClientPacketIngressKeepsLastRouteWhenWithdrawalPreventionEnabled
// repeats the single-route pending-fallback scenario with
// PreventLastRouteWithdrawal set. It asserts that the send still succeeds
// over "route-only" and that the snapshot keeps one route candidate.
func TestFabricClientPacketIngressKeepsLastRouteWhenWithdrawalPreventionEnabled(t *testing.T) {
	transport := &captureManyProductionTransport{}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
PreventLastRouteWithdrawal: true,
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-only",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
		RouteID:        "route-only",
		RebuildStatus:  "pending_degraded_fallback",
		DecisionSource: "service_channel_feedback_no_alternate",
	}}, "config-v2", time.Now().UTC())
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-only" {
		t.Fatalf("envelopes = %+v, want preserved last route", transport.envelopes)
	}
	if snapshot := ingress.Snapshot("cluster-1"); snapshot.RouteCandidateCount != 1 {
		t.Fatalf("route candidate count = %d, want last withdrawn route preserved", snapshot.RouteCandidateCount)
	}
}

// TestFabricClientPacketIngressMarksChannelForRebuildAfterRepeatedRouteFailures
// offers two routes that both go through "relay-bad", which the transport
// always fails. It asserts that the send returns an error and that the
// packet's channel stat recommends both a route rebuild and a degraded
// fallback with at least two consecutive failures.
func TestFabricClientPacketIngressMarksChannelForRebuildAfterRepeatedRouteFailures(t *testing.T) {
	transport := &failoverProductionTransport{failNextHop: "relay-bad"}
	scheduler := NewFabricFlowScheduler(8, 16)
	packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	// Channel ID under which the scheduler is expected to track this packet.
	_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
	channelID := fabricFlowChannelID("vpn-1", shard)
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		FlowScheduler:    scheduler,
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-bad-1",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:
[]string{mesh.ProductionChannelVPNPacket},
					ExpiresAt: time.Now().UTC().Add(time.Minute),
					MaxTTL:    8,
				},
				{
					RouteID:           "route-bad-2",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-bad", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err == nil {
		t.Fatalf("send unexpectedly succeeded")
	}
	snapshot := ingress.Snapshot("cluster-1")
	stat := snapshot.FlowScheduler.ChannelStats[channelID]
	if !stat.RouteRebuildRecommended || !stat.DegradedFallbackRecommended || stat.ConsecutiveFailures < 2 {
		t.Fatalf("channel stat = %+v, want rebuild and degraded fallback recommendation", stat)
	}
}

// TestFabricClientPacketIngressSplitsIndependentFlowsIntoLogicalChannels
// sends two packets built to land on different shards in one batch. It
// asserts that the number of envelopes equals the channel count computed by
// expectedScheduledChannelCount, that every envelope decodes to a non-empty
// packet list, and that the snapshot reports matching flow batches with two
// packets enqueued and two dequeued.
func TestFabricClientPacketIngressSplitsIndependentFlowsIntoLogicalChannels(t *testing.T) {
	transport := &captureManyProductionTransport{}
	scheduler := NewFabricFlowScheduler(8, 16)
	packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	packetB := packetWithDifferentShard(packetA, 8)
	expectedChannels := expectedScheduledChannelCount(scheduler, [][]byte{packetA, packetB})
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		FlowScheduler:    scheduler,
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-entry-exit",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "relay-1", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	if len(transport.envelopes) != expectedChannels {
t.Fatalf("envelopes = %d, want %d", len(transport.envelopes), expectedChannels)
	}
	for _, envelope := range transport.envelopes {
		payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
		if err != nil {
			t.Fatalf("decode envelope: %v", err)
		}
		if len(payload.Packets) == 0 {
			t.Fatalf("empty logical channel envelope")
		}
	}
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.SendFlowBatches != uint64(expectedChannels) || snapshot.FlowScheduler.Enqueued != 2 || snapshot.FlowScheduler.Dequeued != 2 {
		t.Fatalf("snapshot = %+v", snapshot)
	}
}

// TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel seeds
// the scheduler with a "route-primary" failure on channel A and a
// "route-primary" success on channel B, then sends one packet for each
// channel in a single batch. It asserts that channel A's packet goes out on
// "route-alternate" while channel B's stays on "route-primary", and then
// checks the per-channel stats plus the scheduler's route-recovery,
// pressure, recommended-action and drop telemetry.
func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *testing.T) {
	transport := &captureManyProductionTransport{}
	scheduler := NewFabricFlowScheduler(8, 16)
	packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	packetB := packetWithDifferentShard(packetA, 8)
	_, shardA := classifyPacketFlow(packetA, scheduler.shardCountValue())
	_, shardB := classifyPacketFlow(packetB, scheduler.shardCountValue())
	channelA := fabricFlowChannelID("vpn-1", shardA)
	channelB := fabricFlowChannelID("vpn-1", shardB)
	// Guard the fixture: the scenario needs two distinct channels.
	if channelA == channelB {
		t.Fatalf("test packets mapped to same channel %s", channelA)
	}
	scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond)
	scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond)
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(4),
		FlowScheduler:    scheduler,
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-primary",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-primary", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-alternate",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-alternate", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil {
		t.Fatalf("send client packet batch: %v", err)
	}
	if len(transport.envelopes) != 2 {
		t.Fatalf("envelopes = %d, want 2 independent logical channel sends", len(transport.envelopes))
	}
	// Map each packet's bytes to the route its envelope used, so the checks
	// below do not depend on the order in which the envelopes were sent.
	routeByPacket := map[string]string{}
	for _, envelope := range transport.envelopes {
		payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
		if err != nil {
			t.Fatalf("decode envelope: %v", err)
		}
		if len(payload.Packets) != 1 {
			t.Fatalf("payload packets = %d, want one packet per logical channel", len(payload.Packets))
		}
		routeByPacket[string(payload.Packets[0])] = envelope.RouteID
	}
	if routeByPacket[string(packetA)] != "route-alternate" {
		t.Fatalf("channel A route = %q, want alternate after isolated failure", routeByPacket[string(packetA)])
	}
	if routeByPacket[string(packetB)] != "route-primary" {
		t.Fatalf("channel B route = %q, want primary route memory preserved", routeByPacket[string(packetB)])
	}
	snapshot := ingress.Snapshot("cluster-1")
	statA := snapshot.FlowScheduler.ChannelStats[channelA]
	statB := snapshot.FlowScheduler.ChannelStats[channelB]
	if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" || statA.ConsecutiveFailures != 0 {
		t.Fatalf("channel A stat = %+v, want recovered on alternate route", statA)
	}
	// Route-recovery telemetry: channel A must record exactly one switch away
	// from route-primary, and the scheduler-wide aggregates must match it.
	if statA.LastRecoveredFromRouteID != "route-primary" ||
		statA.LastRecoveredNextHop != "relay-primary" ||
		statA.LastRouteSwitchReason != "peer_unavailable" ||
		statA.RouteSwitchCount != 1 ||
		statA.LastRouteFailureAt == "" ||
		statA.LastRouteSwitchAt == "" ||
		statA.LastRouteRecoveryMillis < 0 ||
		snapshot.FlowScheduler.RouteRecoveredChannelCount != 1 ||
		snapshot.FlowScheduler.RouteSwitchCount != 1 ||
snapshot.FlowScheduler.RouteRecoveryMaxMillis != statA.LastRouteRecoveryMillis ||
		snapshot.FlowScheduler.RouteRecoveryAvgMillis != statA.LastRouteRecoveryMillis ||
		snapshot.FlowScheduler.RouteSwitchReasonCounts["peer_unavailable"] != 1 {
		t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler)
	}
	if snapshot.FlowScheduler.PressureLevel != "critical" || snapshot.FlowScheduler.PressureScore <= 0 || !containsString(snapshot.FlowScheduler.PressureReasons, "route_recovery") || !containsString(snapshot.FlowScheduler.PressureReasons, "route_failures") {
		t.Fatalf("route recovery pressure = %s score=%d reasons=%v", snapshot.FlowScheduler.PressureLevel, snapshot.FlowScheduler.PressureScore, snapshot.FlowScheduler.PressureReasons)
	}
	if snapshot.FlowScheduler.RecommendedAction != "rebuild_or_reroute" {
		t.Fatalf("route recovery action = %q, want rebuild_or_reroute", snapshot.FlowScheduler.RecommendedAction)
	}
	if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 {
		t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB)
	}
	if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 {
		t.Fatalf("bounded scheduler telemetry = %+v send_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped)
	}
}

// TestNormalizeFabricRouteSwitchReasonBucketsCommonFailures is a table test:
// each key is a raw failure message and each value is the bucket that
// normalizeFabricRouteSwitchReason must return for it. The empty message is
// expected to map to "route_failure".
func TestNormalizeFabricRouteSwitchReasonBucketsCommonFailures(t *testing.T) {
	cases := map[string]string{
		"context deadline exceeded while dialing 10.0.0.1:19124": "timeout",
		"dial tcp 10.0.0.1:19124: connection refused":            "connection_refused",
		"production mesh next peer is unavailable":               "peer_unavailable",
		"quic fabric stream capacity limited":                    "capacity_limited",
		"":                                                       "route_failure",
	}
	for input, want := range cases {
		if got := normalizeFabricRouteSwitchReason(input); got != want {
			t.Fatalf("normalizeFabricRouteSwitchReason(%q) = %q, want %q", input, got, want)
		}
	}
}

// TestFabricClientPacketIngressIsolatesRouteMemoryPerVPNConnection sends the
// same packet for two VPN connections ("vpn-a" and "vpn-b") after seeding a
// "route-primary" failure on vpn-a's channel and a success on vpn-b's. It
// asserts that the two channel IDs differ, that vpn-a's envelope uses
// "route-alternate" while vpn-b's uses "route-primary", that neither channel
// stat retains a failed route ID, and that nothing was dropped.
func
TestFabricClientPacketIngressIsolatesRouteMemoryPerVPNConnection(t *testing.T) {
	transport := &captureManyProductionTransport{}
	scheduler := NewFabricFlowScheduler(8, 16)
	packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
	channelA := testFlowChannelID("vpn-a", packet, scheduler.shardCountValue())
	channelB := testFlowChannelID("vpn-b", packet, scheduler.shardCountValue())
	// Guard the fixture: identical packets must still yield per-connection
	// channel IDs, otherwise the isolation checks below are meaningless.
	if channelA == channelB {
		t.Fatalf("session-scoped channels collapsed to %s", channelA)
	}
	scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond)
	scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond)
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(8),
		FlowScheduler:    scheduler,
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{
				{
					RouteID:           "route-primary",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-primary", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
				{
					RouteID:           "route-alternate",
					ClusterID:         "cluster-1",
					SourceNodeID:      "entry-1",
					DestinationNodeID: "exit-1",
					Hops:              []string{"entry-1", "relay-alternate", "exit-1"},
					AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
					ExpiresAt:         time.Now().UTC().Add(time.Minute),
					MaxTTL:            8,
				},
			}
		},
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-a", [][]byte{packet}); err != nil {
		t.Fatalf("send vpn-a packet: %v", err)
	}
	if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-b", [][]byte{packet}); err != nil {
		t.Fatalf("send vpn-b packet: %v", err)
	}
	if len(transport.envelopes) != 2 {
		t.Fatalf("envelopes = %d, want one send per vpn session", len(transport.envelopes))
	}
	// The sends above are sequential, so envelopes[0] is vpn-a's and
	// envelopes[1] is vpn-b's.
	if
transport.envelopes[0].RouteID != "route-alternate" {
		t.Fatalf("vpn-a route = %s, want alternate after session-local failure", transport.envelopes[0].RouteID)
	}
	if transport.envelopes[1].RouteID != "route-primary" {
		t.Fatalf("vpn-b route = %s, want primary preserved from separate session memory", transport.envelopes[1].RouteID)
	}
	snapshot := ingress.Snapshot("cluster-1")
	statA := snapshot.FlowScheduler.ChannelStats[channelA]
	statB := snapshot.FlowScheduler.ChannelStats[channelB]
	if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" {
		t.Fatalf("vpn-a stat = %+v, want recovered alternate route without leaking failure", statA)
	}
	if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" {
		t.Fatalf("vpn-b stat = %+v, want primary route preserved", statB)
	}
	if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
		t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped)
	}
}

// TestFabricClientPacketIngressRouteSelectionUsesUpdatedRuntimeIdentity
// constructs an ingress for local node "entry-1" and then calls
// UpdateRuntime with local node "entry-2" and a route set that starts at
// "entry-2". It asserts that a subsequent send (made with an empty cluster ID
// argument) produces one envelope on "route-entry-2" with source "entry-2"
// and next hop "relay-2".
func TestFabricClientPacketIngressRouteSelectionUsesUpdatedRuntimeIdentity(t *testing.T) {
	transport := &captureManyProductionTransport{}
	ingress := &FabricClientPacketIngress{
		ForwardTransport: transport,
		Inbox:            NewFabricPacketInbox(8),
		ClusterID:        "cluster-1",
		LocalNodeID:      "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-entry-1",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "relay-1", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	// NOTE(review): the meaning of the nil argument and of the trailing
	// "policy-updated" string depends on UpdateRuntime's signature, which is
	// not visible here.
	ingress.UpdateRuntime(
		transport,
		NewFabricPacketInbox(8),
		"cluster-1",
		"entry-2",
		nil,
		func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-entry-2",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-2",
				DestinationNodeID: "exit-2",
				Hops:              []string{"entry-2", "relay-2", "exit-2"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:
time.Now().UTC().Add(time.Minute),
				MaxTTL: 8,
			}}
		},
		"policy-updated",
	)
	packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443)
	if err := ingress.SendClientPacketBatch(context.Background(), "", "vpn-1", [][]byte{packet}); err != nil {
		t.Fatalf("send after runtime update: %v", err)
	}
	if len(transport.envelopes) != 1 {
		t.Fatalf("envelopes = %d, want one send", len(transport.envelopes))
	}
	envelope := transport.envelopes[0]
	if envelope.RouteID != "route-entry-2" || envelope.SourceNodeID != "entry-2" || transport.calls[0] != "relay-2" {
		t.Fatalf("envelope route/source/next-hop = %s/%s/%s, want updated entry-2 route", envelope.RouteID, envelope.SourceNodeID, transport.calls[0])
	}
}

// TestFabricClientPacketIngressParallelFlowWindowDoesNotBlockIndependentChannel
// sends a slow and a fast packet in one batch with MaxParallelFlowSends set
// to 2, using a transport that blocks sends containing the slow packet's
// source port until releaseSlow is closed. It asserts that the fast send
// finishes while the slow one is still blocked, that the batch returns nil
// once released, and that the parallel-send, in-flight, drop and per-channel
// latency telemetry match one successful send per channel.
func TestFabricClientPacketIngressParallelFlowWindowDoesNotBlockIndependentChannel(t *testing.T) {
	scheduler := NewFabricFlowScheduler(8, 16)
	slowPacket, fastPacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue())
	transport := &blockingProductionTransport{
		slowPort:    packetSourcePort(slowPacket),
		slowStarted: make(chan struct{}),
		releaseSlow: make(chan struct{}),
		fastDone:    make(chan struct{}),
	}
	ingress := &FabricClientPacketIngress{
		ForwardTransport:     transport,
		Inbox:                NewFabricPacketInbox(8),
		FlowScheduler:        scheduler,
		MaxParallelFlowSends: 2,
		ClusterID:            "cluster-1",
		LocalNodeID:          "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-primary",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "relay-primary", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	// Buffered so the sending goroutine can finish even if the test has
	// already failed and nobody receives from done.
	done := make(chan error, 1)
	go func() {
		done <- ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{slowPacket, fastPacket})
	}()
	// Phase 1: wait until the slow send has entered the transport.
	select {
	case <-transport.slowStarted:
	case err := <-done:
		t.Fatalf("send completed before slow channel started: %v", err)
	case <-time.After(time.Second):
		t.Fatalf("timed out waiting for slow channel to start")
	}
	// Phase 2: the fast send must complete while the slow one is still held.
	select {
	case <-transport.fastDone:
	case err := <-done:
		t.Fatalf("send completed before independent fast channel completed: %v", err)
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("fast independent channel was blocked behind slow channel")
	}
	// Phase 3: release the slow send and collect the batch result.
	close(transport.releaseSlow)
	if err := <-done; err != nil {
		t.Fatalf("parallel send returned error: %v", err)
	}
	snapshot := ingress.Snapshot("cluster-1")
	if snapshot.SendFlowParallel != 1 || snapshot.MaxParallelFlowSends != 2 {
		t.Fatalf("parallel telemetry = batches:%d max:%d", snapshot.SendFlowParallel, snapshot.MaxParallelFlowSends)
	}
	if snapshot.RecommendedParallelFlowSends != 2 || snapshot.FlowScheduler.MaxInFlight != 2 || snapshot.FlowScheduler.InFlight != 0 {
		t.Fatalf("window/in-flight telemetry = recommended:%d in_flight:%d max_in_flight:%d", snapshot.RecommendedParallelFlowSends, snapshot.FlowScheduler.InFlight, snapshot.FlowScheduler.MaxInFlight)
	}
	if snapshot.SendFlowBatches != 2 || snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
		t.Fatalf("flow telemetry = %+v send_flow_batches=%d send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowBatches, snapshot.SendFlowDropped)
	}
	for channelID, stat := range snapshot.FlowScheduler.ChannelStats {
		if stat.SendAttempts != 1 || stat.SendSuccesses != 1 || stat.SendFailures != 0 || stat.MaxInFlight != 1 {
			t.Fatalf("channel %s stat = %+v, want one successful in-flight send", channelID, stat)
		}
		// Exactly one latency sample must have landed in some bucket.
		if stat.LatencyLe10Millis+stat.LatencyLe100Millis+stat.LatencyLe1000Millis+stat.LatencyGt1000Millis != 1 {
			t.Fatalf("channel %s latency buckets = %+v, want one sample", channelID, stat)
		}
	}
}

// TestFabricClientPacketIngressInteractiveClassCompletesWhileBulkInFlight
// starts a bulk-class send that blocks inside the transport, then issues an
// interactive-class send for the same VPN connection with
// MaxParallelFlowSends set to 1. It asserts that the interactive send
// returns without error within 200ms while the bulk send is still held, that
// the bulk send succeeds once released, and that per-class channel stats,
// traffic-class counts and drop counters reflect one success per class.
func TestFabricClientPacketIngressInteractiveClassCompletesWhileBulkInFlight(t *testing.T) {
	scheduler := NewFabricFlowScheduler(8, 16)
	bulkPacket, interactivePacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue())
	transport := &blockingProductionTransport{
		slowPort:
packetSourcePort(bulkPacket),
		slowStarted: make(chan struct{}),
		releaseSlow: make(chan struct{}),
		fastDone:    make(chan struct{}),
	}
	ingress := &FabricClientPacketIngress{
		ForwardTransport:     transport,
		Inbox:                NewFabricPacketInbox(8),
		FlowScheduler:        scheduler,
		MaxParallelFlowSends: 1,
		ClusterID:            "cluster-1",
		LocalNodeID:          "entry-1",
		Routes: func() []mesh.SyntheticRoute {
			return []mesh.SyntheticRoute{{
				RouteID:           "route-primary",
				ClusterID:         "cluster-1",
				SourceNodeID:      "entry-1",
				DestinationNodeID: "exit-1",
				Hops:              []string{"entry-1", "relay-primary", "exit-1"},
				AllowedChannels:   []string{mesh.ProductionChannelVPNPacket},
				ExpiresAt:         time.Now().UTC().Add(time.Minute),
				MaxTTL:            8,
			}}
		},
	}
	// Start the bulk send in the background; it is expected to block inside
	// the transport because its packet carries the slow source port.
	bulkDone := make(chan error, 1)
	go func() {
		bulkDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassBulk, [][]byte{bulkPacket})
	}()
	select {
	case <-transport.slowStarted:
	case err := <-bulkDone:
		t.Fatalf("bulk send completed before blocking: %v", err)
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for bulk send to block")
	}
	// With the bulk send held, the interactive send must still finish.
	interactiveDone := make(chan error, 1)
	go func() {
		interactiveDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket})
	}()
	select {
	case err := <-interactiveDone:
		if err != nil {
			t.Fatalf("interactive send returned error while bulk in flight: %v", err)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatal("interactive traffic-class send was blocked behind in-flight bulk")
	}
	// Non-blocking check that the transport signalled the fast send.
	select {
	case <-transport.fastDone:
	default:
		t.Fatal("interactive transport send did not complete")
	}
	close(transport.releaseSlow)
	if err := <-bulkDone; err != nil {
		t.Fatalf("bulk send returned error after release: %v", err)
	}
	bulkChannel := testFlowChannelID("vpn-1", bulkPacket, scheduler.shardCountValue())
	interactiveChannel := fabricFlowChannelIDForClass("vpn-1", FabricTrafficClassInteractive, packetShard(interactivePacket,
scheduler.shardCountValue())) snapshot := ingress.Snapshot("cluster-1") if snapshot.FlowScheduler.MaxInFlight < 2 { t.Fatalf("max in-flight = %d, want concurrent bulk+interactive", snapshot.FlowScheduler.MaxInFlight) } bulkStat := snapshot.FlowScheduler.ChannelStats[bulkChannel] interactiveStat := snapshot.FlowScheduler.ChannelStats[interactiveChannel] if bulkStat.TrafficClass != FabricTrafficClassBulk || bulkStat.SendSuccesses != 1 || bulkStat.SendFailures != 0 { t.Fatalf("bulk stat = %+v", bulkStat) } if interactiveStat.TrafficClass != FabricTrafficClassInteractive || interactiveStat.SendSuccesses != 1 || interactiveStat.SendFailures != 0 { t.Fatalf("interactive stat = %+v", interactiveStat) } if snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassBulk] != 1 || snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { t.Fatalf("traffic class counts = %+v", snapshot.FlowScheduler.TrafficClassCounts) } if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped) } } func TestFabricFlowSchedulerRecommendsSmallerWindowUnderPressure(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 1) packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) if got := scheduler.RecommendedParallelSendWindow(4); got != 4 { t.Fatalf("clean recommended window = %d, want 4", got) } scheduled := scheduler.ScheduleClientPacketsForConnection("vpn-1", [][]byte{packet, packetWithSameShard(packet, scheduler.shardCountValue())}) if len(scheduled) != 1 { t.Fatalf("scheduled = %d, want one accepted channel after bounded drop", len(scheduled)) } if got := scheduler.RecommendedParallelSendWindow(4); got != 1 { t.Fatalf("drop-pressure recommended window = %d, want 1", got) } channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) scheduler.RecordRouteFailure(channelID, "route-primary", 
"relay-primary", mesh.ErrForwardPeerUnavailable, 2500*time.Millisecond) snapshot := scheduler.Snapshot() stat := snapshot.ChannelStats[channelID] if snapshot.FailingChannelCount != 1 || snapshot.SlowChannelCount != 1 { t.Fatalf("pressure counts = failing:%d slow:%d stat=%+v", snapshot.FailingChannelCount, snapshot.SlowChannelCount, stat) } if stat.SendFailures != 1 || stat.LatencyGt1000Millis != 1 || !stat.RouteRebuildRecommended { t.Fatalf("failure/latency stat = %+v, want retry-window telemetry", stat) } if stat.QualityWindowDropCount != 1 || stat.QualityWindowFailureCount != 1 || stat.QualityWindowSlowCount != 1 { t.Fatalf("rolling quality stat = %+v, want fresh drop+failure+slow sample", stat) } } func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testing.T) { scheduler := NewFabricFlowScheduler(32, 16) bulkPackets := packetsForDistinctShards(16, scheduler.shardCountValue()) if len(bulkPackets) != 16 { t.Fatalf("bulk packet count = %d, want 16 distinct shards", len(bulkPackets)) } interactivePacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 61000, 3389) if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, bulkPackets); len(scheduled) != 16 { t.Fatalf("bulk scheduled = %d, want 16", len(scheduled)) } if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket}); len(scheduled) != 1 { t.Fatalf("interactive scheduled = %d, want 1", len(scheduled)) } if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, 4); got != 1 { t.Fatalf("bulk adaptive window = %d, want 1", got) } if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassInteractive, 8); got != 8 { t.Fatalf("interactive adaptive window = %d, want 8", got) } snapshot := scheduler.Snapshot() if !snapshot.AdaptiveBackpressureActive || snapshot.AdaptiveBackpressureReason != 
"bulk_window_reduced_to_protect_interactive" { t.Fatalf("adaptive snapshot = %+v", snapshot) } if snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] != 1 || snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] != 8 { t.Fatalf("recommended class windows = %+v", snapshot.RecommendedParallelWindows) } if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts) } if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive { t.Fatalf("bulk pressure telemetry = %+v", snapshot) } if snapshot.PressureLevel != "warning" || snapshot.PressureScore <= 0 || !containsString(snapshot.PressureReasons, "bulk_pressure") { t.Fatalf("pressure = %s score=%d reasons=%v, want warning bulk pressure", snapshot.PressureLevel, snapshot.PressureScore, snapshot.PressureReasons) } if snapshot.RecommendedAction != "throttle_bulk" { t.Fatalf("recommended action = %q, want throttle_bulk", snapshot.RecommendedAction) } } func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) channelID := fabricFlowChannelID("vpn-1", 0) scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond) scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond) if got := scheduler.RecommendedParallelSendWindow(4); got >= 4 { t.Fatalf("pressure recommended window = %d, want reduced", got) } for i := 0; i < defaultFabricFlowQualityWindowCapacity; i++ { scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond) } if got := scheduler.RecommendedParallelSendWindow(4); got != 4 { t.Fatalf("fresh-success recommended window = %d, want 4", got) } 
snapshot := scheduler.Snapshot() stat := snapshot.ChannelStats[channelID] if stat.SendFailures != 2 || stat.SendSuccesses != defaultFabricFlowQualityWindowCapacity { t.Fatalf("lifetime counters = failures:%d successes:%d", stat.SendFailures, stat.SendSuccesses) } if stat.QualityWindowSampleCount != defaultFabricFlowQualityWindowCapacity || stat.QualityWindowFailureCount != 0 || stat.QualityWindowSuccessCount != defaultFabricFlowQualityWindowCapacity { t.Fatalf("rolling quality stat = %+v, want old failures rolled out", stat) } if snapshot.FailingChannelCount != 0 || snapshot.SlowChannelCount != 0 || snapshot.BackpressureActive { t.Fatalf("rolling pressure snapshot = %+v, want clean fresh window", snapshot) } } func TestFabricClientPacketIngressReportsBoundedBackpressurePerLogicalChannel(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 1) packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) packetB := packetWithSameShard(packetA, 8) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(4), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{{ RouteID: "route-primary", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-primary", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }} }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil { t.Fatalf("send client packet batch: %v", err) } snapshot := ingress.Snapshot("cluster-1") if snapshot.FlowScheduler.QueueCapacity != 1 || snapshot.FlowScheduler.Dropped != 1 || snapshot.SendFlowDropped != 1 { t.Fatalf("drop telemetry = %+v send_dropped=%d, want one bounded drop", snapshot.FlowScheduler, 
snapshot.SendFlowDropped) } if snapshot.FlowScheduler.HighWatermark != 1 || len(transport.envelopes) != 1 { t.Fatalf("high watermark/envelopes = %d/%d, want bounded single queued send", snapshot.FlowScheduler.HighWatermark, len(transport.envelopes)) } } func TestFabricClientPacketIngressBoundedLoadRebuildsAwayFromWithdrawnRoute(t *testing.T) { scheduler := NewFabricFlowScheduler(16, 8) packets := packetsForDistinctShards(12, scheduler.shardCountValue()) if len(packets) != 12 { t.Fatalf("distinct packet count = %d", len(packets)) } channels := map[string]struct{}{} for _, packet := range packets { _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) channelID := fabricFlowChannelID("vpn-1", shard) channels[channelID] = struct{}{} scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond) } transport := &routeFailingProductionTransport{failRouteID: "route-primary"} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(32), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-primary", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-primary", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-alternate", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-alternate", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { t.Fatalf("send first load batch: %v", err) } snapshot := ingress.Snapshot("cluster-1") if snapshot.SendFlowBatches != uint64(len(channels)) || 
snapshot.FlowScheduler.Dropped != 0 || snapshot.FlowScheduler.HighWatermark != 1 { t.Fatalf("first load scheduler snapshot = %+v", snapshot.FlowScheduler) } if snapshot.SendRouteFailures != uint64(len(channels)) { t.Fatalf("route failures = %d, want %d", snapshot.SendRouteFailures, len(channels)) } if transport.callsByRoute["route-primary"] != len(channels) || transport.callsByRoute["route-alternate"] != len(channels) { t.Fatalf("route calls = %+v, want primary and alternate per channel", transport.callsByRoute) } for channelID, stat := range snapshot.FlowScheduler.ChannelStats { if _, ok := channels[channelID]; !ok { continue } if stat.LastRouteID != "route-alternate" || stat.LastFailedRouteID != "" || stat.ConsecutiveFailures != 0 { t.Fatalf("channel %s stat after first load = %+v", channelID, stat) } } ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ RouteID: "route-primary", ReplacementRouteID: "route-alternate", RebuildRequestID: "rebuild-load-1", RebuildStatus: "applied", RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate", DecisionSource: "service_channel_feedback_replacement", Generation: "c18z-load", EffectiveHops: []string{"entry-1", "relay-alternate", "exit-1"}, }}, "c18z-load", time.Now().UTC()) primaryCallsBefore := transport.callsByRoute["route-primary"] if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { t.Fatalf("send second load batch: %v", err) } snapshot = ingress.Snapshot("cluster-1") if transport.callsByRoute["route-primary"] != primaryCallsBefore { t.Fatalf("primary route was retried after withdrawal: before=%d after=%d calls=%+v", primaryCallsBefore, transport.callsByRoute["route-primary"], transport.callsByRoute) } if snapshot.RouteCandidateCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 1 || snapshot.RouteManagerTransition.Status != "applied_rebuild" { t.Fatalf("route manager snapshot = manager:%+v transition:%+v candidates=%d", 
snapshot.RouteManager, snapshot.RouteManagerTransition, snapshot.RouteCandidateCount) } if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 { t.Fatalf("second load scheduler snapshot = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped) } } func TestFabricClientPacketIngressQualityPreferenceOverridesStickyRoute(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389) _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) channelID := fabricFlowChannelID("vpn-1", shard) scheduler.RecordRouteSuccess(channelID, "route-slow", "relay-slow", time.Millisecond) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(8), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-slow", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-slow", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-fast", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-fast", FeedbackStatus: "healthy", ScoreAdjustment: 90, Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, LastSendDurationMs: 1, ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), }}, time.Now().UTC()) if err := 
ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { t.Fatalf("send packet: %v", err) } if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-fast" { t.Fatalf("route = %+v, want quality-preferred fast route over sticky slow route", transport.envelopes) } snapshot := ingress.Snapshot("cluster-1") if snapshot.RouteQualityPreferenceCount != 1 { t.Fatalf("quality preference count = %d, want 1", snapshot.RouteQualityPreferenceCount) } stat := snapshot.FlowScheduler.ChannelStats[channelID] if stat.LastRouteID != "route-fast" || stat.LastFailedRouteID != "" { t.Fatalf("flow stat = %+v, want moved to fast healthy route", stat) } if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 || !containsString(stat.QualityPreferenceReasons, "service_channel_quality_latency_le_10ms") { t.Fatalf("quality preference stat = %+v, want applied fast route preference", stat) } } func TestFabricClientPacketIngressQualityPreferenceUsesEffectiveScore(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 78, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389) _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) channelID := fabricFlowChannelID("vpn-1", shard) scheduler.RecordRouteSuccess(channelID, "route-sticky", "relay-sticky", time.Millisecond) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(8), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-sticky", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-sticky", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: 
"route-decayed-fast", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-decayed-fast", FeedbackStatus: "healthy", ScoreAdjustment: 20, RawScoreAdjustment: 90, Reasons: []string{"service_channel_recent_success", "service_channel_feedback_age_decay"}, LastSendDurationMs: 1, ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), }}, time.Now().UTC()) if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { t.Fatalf("send packet: %v", err) } if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-sticky" { t.Fatalf("route = %+v, want decayed quality score to preserve sticky route", transport.envelopes) } snapshot := ingress.Snapshot("cluster-1") if len(snapshot.RouteQualityPreferences) != 1 { t.Fatalf("quality preferences = %+v, want one visible preference", snapshot.RouteQualityPreferences) } preference := snapshot.RouteQualityPreferences[0] if preference.ScoreAdjustment != 20 || preference.RawScoreAdjustment != 90 || !containsString(preference.Reasons, "service_channel_feedback_age_decay") { t.Fatalf("preference snapshot = %+v, want effective/raw score and decay reason", preference) } } func TestFabricClientPacketIngressQualityPreferencePreservesMultiChannelFairness(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) firstPacket := testIPv4TCPPacket([4]byte{10, 79, 0, 2}, [4]byte{192, 168, 200, 95}, 51002, 3389) secondPacket := packetWithDifferentShard(firstPacket, scheduler.shardCountValue()) firstChannel := testFlowChannelID("vpn-1", firstPacket, scheduler.shardCountValue()) secondChannel := testFlowChannelID("vpn-1", secondPacket, scheduler.shardCountValue()) if firstChannel 
== secondChannel { t.Fatalf("test packets unexpectedly map to same channel %s", firstChannel) } scheduler.RecordRouteSuccess(firstChannel, "route-slow", "relay-slow", time.Millisecond) scheduler.RecordRouteSuccess(secondChannel, "route-slow", "relay-slow", time.Millisecond) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(16), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-slow", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-slow", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-fast", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-fast", FeedbackStatus: "healthy", ScoreAdjustment: 90, RawScoreAdjustment: 90, Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, LastSendDurationMs: 1, ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), }}, time.Now().UTC()) if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{firstPacket, secondPacket}); err != nil { t.Fatalf("send packet batch: %v", err) } if len(transport.envelopes) != 2 { t.Fatalf("envelope count = %d, want one envelope per logical channel", len(transport.envelopes)) } for _, envelope := range transport.envelopes { if envelope.RouteID != "route-fast" { t.Fatalf("envelope route = %s, want quality-preferred fast route", envelope.RouteID) } } snapshot := 
ingress.Snapshot("cluster-1") for _, channelID := range []string{firstChannel, secondChannel} { stat := snapshot.FlowScheduler.ChannelStats[channelID] if stat.LastRouteID != "route-fast" || stat.Served != 1 || stat.Dropped != 0 { t.Fatalf("channel %s stat = %+v, want fair fast-route service without drops", channelID, stat) } if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 { t.Fatalf("channel %s quality stat = %+v, want applied quality preference", channelID, stat) } } if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 { t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped) } } func TestFabricClientPacketIngressClearsStaleQualityPreferenceMarkers(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 80, 0, 2}, [4]byte{192, 168, 200, 95}, 51003, 3389) channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(8), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-slow", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-slow", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-fast", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.PreferClientRoute("route-slow") ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-fast", FeedbackStatus: 
"healthy", ScoreAdjustment: 90, RawScoreAdjustment: 90, Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, LastSendDurationMs: 1, ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), }}, time.Now().UTC()) if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { t.Fatalf("send packet: %v", err) } stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] if stat.QualityPreferenceRouteID != "route-fast" { t.Fatalf("quality preference marker = %+v, want route-fast", stat) } ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-fast", FeedbackStatus: "healthy", ScoreAdjustment: 90, RawScoreAdjustment: 90, ExpiresAt: time.Now().UTC().Add(-time.Second).Format(time.RFC3339Nano), }}, time.Now().UTC()) snapshot := ingress.Snapshot("cluster-1") if snapshot.RouteQualityPreferenceCount != 0 { t.Fatalf("quality preference count = %d, want expired preference removed", snapshot.RouteQualityPreferenceCount) } stat = snapshot.FlowScheduler.ChannelStats[channelID] if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 || len(stat.QualityPreferenceReasons) != 0 { t.Fatalf("stale quality marker = %+v, want cleared", stat) } } func TestFabricClientPacketIngressClearsWithdrawnQualityPreferenceMarkers(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 81, 0, 2}, [4]byte{192, 168, 200, 95}, 51004, 3389) channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(8), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{ { RouteID: "route-slow", ClusterID: "cluster-1", SourceNodeID: "entry-1", 
DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-slow", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, { RouteID: "route-fast", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }, } }, } ingress.PreferClientRoute("route-slow") ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{ RouteID: "route-fast", FeedbackStatus: "healthy", ScoreAdjustment: 90, RawScoreAdjustment: 90, Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"}, LastSendDurationMs: 1, ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano), }}, time.Now().UTC()) if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { t.Fatalf("send packet: %v", err) } ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{ RouteID: "route-fast", ReplacementRouteID: "route-slow", RebuildStatus: "applied", DecisionSource: "test_withdraw_quality_route", }}, "config-withdraw-quality", time.Now().UTC()) stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 { t.Fatalf("withdrawn quality marker = %+v, want cleared", stat) } } func TestFabricClientPacketIngressReportsRoutePolicyProvenance(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 91, 0, 2}, [4]byte{192, 168, 200, 95}, 51014, 443) channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue()) transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(8), FlowScheduler: scheduler, ClusterID: 
"cluster-1", LocalNodeID: "entry-1", RecoveryPolicyFingerprint: "policy-fp-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{{ RouteID: "route-primary", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, RouteVersion: "route-v1", PolicyVersion: "policy-v1", }} }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil { t.Fatalf("send packet: %v", err) } stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID] if stat.LastRouteID != "route-primary" || stat.RoutePolicyVersion != "policy-v1" || stat.RouteGeneration != "policy-v1" || stat.RecoveryPolicyFingerprint != "policy-fp-1" { t.Fatalf("route provenance stat = %+v", stat) } } func TestFabricClientPacketIngressBoundedLoadReportsPerChannelDrops(t *testing.T) { scheduler := NewFabricFlowScheduler(8, 8) packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443) packets := make([][]byte, 0, 32) for i := 0; i < 32; i++ { packets = append(packets, packetWithSameShard(packet, scheduler.shardCountValue())) } transport := &captureManyProductionTransport{} ingress := &FabricClientPacketIngress{ ForwardTransport: transport, Inbox: NewFabricPacketInbox(32), FlowScheduler: scheduler, ClusterID: "cluster-1", LocalNodeID: "entry-1", Routes: func() []mesh.SyntheticRoute { return []mesh.SyntheticRoute{{ RouteID: "route-primary", ClusterID: "cluster-1", SourceNodeID: "entry-1", DestinationNodeID: "exit-1", Hops: []string{"entry-1", "relay-primary", "exit-1"}, AllowedChannels: []string{mesh.ProductionChannelVPNPacket}, ExpiresAt: time.Now().UTC().Add(time.Minute), MaxTTL: 8, }} }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil { t.Fatalf("send congested load batch: 
%v", err) } snapshot := ingress.Snapshot("cluster-1") if snapshot.FlowScheduler.QueueCapacity != 8 || snapshot.FlowScheduler.HighWatermark != 8 { t.Fatalf("capacity/high watermark = %d/%d, want 8/8", snapshot.FlowScheduler.QueueCapacity, snapshot.FlowScheduler.HighWatermark) } if snapshot.FlowScheduler.Dropped != 24 || snapshot.SendFlowDropped != 24 || !snapshot.FlowScheduler.BackpressureActive { t.Fatalf("drop telemetry = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped) } if snapshot.SendFlowPackets != 8 || len(transport.envelopes) != 1 { t.Fatalf("sent packets/envelopes = %d/%d, want bounded 8/1", snapshot.SendFlowPackets, len(transport.envelopes)) } } func TestFabricClientPacketIngressUsesLocalGatewayShortcutWithoutRoute(t *testing.T) { inbox := NewFabricPacketInbox(4) ingress := &FabricClientPacketIngress{ Inbox: inbox, ClusterID: "cluster-1", LocalNodeID: "entry-1", AllowLegacyLocalGatewayFallback: true, LocalGateway: func(vpnConnectionID string) bool { return vpnConnectionID == "vpn-1" }, Routes: func() []mesh.SyntheticRoute { return nil }, } if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil { t.Fatalf("send local gateway packet: %v", err) } packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second) if err != nil { t.Fatalf("receive local gateway packet: %v", err) } if len(packets) != 1 || string(packets[0]) != "packet" { t.Fatalf("packets = %#v", packets) } } func TestFabricClientPacketIngressReceivesLocalGatewayReplyWithoutRoute(t *testing.T) { inbox := NewFabricPacketInbox(4) ingress := &FabricClientPacketIngress{ Inbox: inbox, ClusterID: "cluster-1", LocalNodeID: "entry-1", AllowLegacyLocalGatewayFallback: true, LocalGateway: func(vpnConnectionID string) bool { return vpnConnectionID == "vpn-1" }, Routes: func() []mesh.SyntheticRoute { return nil }, } if err := inbox.DeliverLocalPacketBatch("vpn-1", 
FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil { t.Fatalf("deliver local gateway reply: %v", err) } packets, err := ingress.ReceiveClientPacketBatch(context.Background(), "cluster-1", "vpn-1", time.Second) if err != nil { t.Fatalf("receive local gateway reply: %v", err) } if len(packets) != 1 || string(packets[0]) != "reply" { t.Fatalf("packets = %#v", packets) } } func testIPv4TCPPacket(src [4]byte, dst [4]byte, srcPort uint16, dstPort uint16) []byte { packet := make([]byte, 40) packet[0] = 0x45 packet[2] = 0 packet[3] = 40 packet[8] = 64 packet[9] = 6 copy(packet[12:16], src[:]) copy(packet[16:20], dst[:]) packet[20] = byte(srcPort >> 8) packet[21] = byte(srcPort) packet[22] = byte(dstPort >> 8) packet[23] = byte(dstPort) packet[32] = 0x50 return packet } func packetWithDifferentShard(reference []byte, shardCount int) []byte { _, referenceShard := classifyPacketFlow(reference, shardCount) for port := uint16(10000); port < 11000; port++ { candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443) _, shard := classifyPacketFlow(candidate, shardCount) if shard != referenceShard { return candidate } } return testIPv4TCPPacket([4]byte{10, 77, 0, 3}, [4]byte{192, 168, 200, 96}, 52000, 443) } func packetShard(packet []byte, shardCount int) int { _, shard := classifyPacketFlow(packet, shardCount) return shard } func packetSourcePort(packet []byte) uint16 { if len(packet) < 22 { return 0 } return uint16(packet[20])<<8 | uint16(packet[21]) } func testFlowChannelID(vpnConnectionID string, packet []byte, shardCount int) string { return fabricFlowChannelID(vpnConnectionID, packetShard(packet, shardCount)) } func packetWithSameShard(reference []byte, shardCount int) []byte { _, referenceShard := classifyPacketFlow(reference, shardCount) for port := uint16(11000); port < 12000; port++ { candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 3389) _, shard := classifyPacketFlow(candidate, 
shardCount) if shard == referenceShard { return candidate } } return append([]byte{}, reference...) } func packetsForDistinctShards(count int, shardCount int) [][]byte { packets := make([][]byte, 0, count) seen := map[int]struct{}{} for port := uint16(10000); port < 65000 && len(packets) < count; port++ { candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443) _, shard := classifyPacketFlow(candidate, shardCount) if _, ok := seen[shard]; ok { continue } seen[shard] = struct{}{} packets = append(packets, candidate) } return packets } func packetsForOrderedDistinctChannels(shardCount int) ([]byte, []byte) { packets := packetsForDistinctShards(shardCount, shardCount) if len(packets) < 2 { panic("not enough distinct channel packets") } for _, left := range packets { leftChannel := testFlowChannelID("vpn-1", left, shardCount) for _, right := range packets { rightChannel := testFlowChannelID("vpn-1", right, shardCount) if leftChannel < rightChannel { return left, right } } } panic("failed to find ordered distinct channel packets") } func expectedScheduledChannelCount(scheduler *FabricFlowScheduler, packets [][]byte) int { channels := map[string]struct{}{} for _, packet := range packets { _, shard := classifyPacketFlow(packet, scheduler.shardCountValue()) channels[fmt.Sprintf("flow-%02d", shard)] = struct{}{} } return len(channels) } func closeOnce(ch chan struct{}) { defer func() { _ = recover() }() close(ch) }