2631 lines
105 KiB
Go
2631 lines
105 KiB
Go
package vpnruntime
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
|
|
)
|
|
|
|
type captureProductionTransport struct {
|
|
nextNodeID string
|
|
envelope mesh.ProductionEnvelope
|
|
}
|
|
|
|
func (t *captureProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
|
|
t.nextNodeID = nextNodeID
|
|
t.envelope = envelope
|
|
return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil
|
|
}
|
|
|
|
type failoverProductionTransport struct {
|
|
failNextHop string
|
|
calls []string
|
|
envelope mesh.ProductionEnvelope
|
|
}
|
|
|
|
func (t *failoverProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
|
|
t.calls = append(t.calls, nextNodeID)
|
|
if nextNodeID == t.failNextHop {
|
|
return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable
|
|
}
|
|
t.envelope = envelope
|
|
return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil
|
|
}
|
|
|
|
type captureManyProductionTransport struct {
|
|
calls []string
|
|
envelopes []mesh.ProductionEnvelope
|
|
}
|
|
|
|
func (t *captureManyProductionTransport) SendProduction(_ context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
|
|
t.calls = append(t.calls, nextNodeID)
|
|
t.envelopes = append(t.envelopes, envelope)
|
|
return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil
|
|
}
|
|
|
|
type routeFailingProductionTransport struct {
|
|
failRouteID string
|
|
callsByRoute map[string]int
|
|
envelopes []mesh.ProductionEnvelope
|
|
}
|
|
|
|
func (t *routeFailingProductionTransport) SendProduction(_ context.Context, _ string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
|
|
if t.callsByRoute == nil {
|
|
t.callsByRoute = map[string]int{}
|
|
}
|
|
t.callsByRoute[envelope.RouteID]++
|
|
if envelope.RouteID == t.failRouteID {
|
|
return mesh.ProductionForwardResult{}, mesh.ErrForwardPeerUnavailable
|
|
}
|
|
t.envelopes = append(t.envelopes, envelope)
|
|
return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil
|
|
}
|
|
|
|
type blockingProductionTransport struct {
|
|
mu sync.Mutex
|
|
calls []string
|
|
envelopes []mesh.ProductionEnvelope
|
|
slowPort uint16
|
|
slowStarted chan struct{}
|
|
releaseSlow chan struct{}
|
|
fastDone chan struct{}
|
|
}
|
|
|
|
func (t *blockingProductionTransport) SendProduction(ctx context.Context, nextNodeID string, envelope mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
|
|
payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
|
|
if err != nil {
|
|
return mesh.ProductionForwardResult{}, err
|
|
}
|
|
isSlow := false
|
|
for _, packet := range payload.Packets {
|
|
if packetSourcePort(packet) == t.slowPort {
|
|
isSlow = true
|
|
break
|
|
}
|
|
}
|
|
if isSlow {
|
|
closeOnce(t.slowStarted)
|
|
select {
|
|
case <-t.releaseSlow:
|
|
case <-ctx.Done():
|
|
return mesh.ProductionForwardResult{}, ctx.Err()
|
|
}
|
|
} else if t.slowStarted != nil {
|
|
// Keep fast sends behind the observed slow-start boundary so the test
|
|
// deterministically exercises overlapping in-flight sends.
|
|
select {
|
|
case <-t.slowStarted:
|
|
case <-ctx.Done():
|
|
return mesh.ProductionForwardResult{}, ctx.Err()
|
|
}
|
|
}
|
|
t.mu.Lock()
|
|
t.calls = append(t.calls, nextNodeID)
|
|
t.envelopes = append(t.envelopes, envelope)
|
|
t.mu.Unlock()
|
|
if !isSlow {
|
|
closeOnce(t.fastDone)
|
|
}
|
|
return mesh.ProductionForwardResult{Accepted: true, Delivered: true}, nil
|
|
}
|
|
|
|
// memoryPacketTransport is an in-memory gateway packet transport for tests.
type memoryPacketTransport struct {
	// sendErr, when non-nil, is returned by SendGatewayPacketBatch.
	// recvErr, when non-nil, is returned by ReceiveGatewayPacketBatch.
	sendErr, recvErr error

	// sent accumulates every packet passed to SendGatewayPacketBatch.
	// recv holds the packets handed out by the next ReceiveGatewayPacketBatch.
	sent, recv [][]byte
}
|
|
|
|
type captureFabricSessionSender struct {
|
|
err error
|
|
frames []fabricproto.Frame
|
|
closed bool
|
|
}
|
|
|
|
func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error {
|
|
if s.err != nil {
|
|
return s.err
|
|
}
|
|
s.frames = append(s.frames, frame)
|
|
return nil
|
|
}
|
|
|
|
func (s *captureFabricSessionSender) Close() error {
|
|
s.closed = true
|
|
return nil
|
|
}
|
|
|
|
type memoryFabricSessionReceiver struct {
|
|
frames chan fabricproto.Frame
|
|
errors chan error
|
|
}
|
|
|
|
func (r memoryFabricSessionReceiver) Frames() <-chan fabricproto.Frame {
|
|
return r.frames
|
|
}
|
|
|
|
func (r memoryFabricSessionReceiver) Errors() <-chan error {
|
|
return r.errors
|
|
}
|
|
|
|
func (t *memoryPacketTransport) SendGatewayPacketBatch(_ context.Context, packets [][]byte) error {
|
|
if t.sendErr != nil {
|
|
return t.sendErr
|
|
}
|
|
t.sent = append(t.sent, packets...)
|
|
return nil
|
|
}
|
|
|
|
func (t *memoryPacketTransport) ReceiveGatewayPacketBatch(_ context.Context, _ time.Duration) ([][]byte, error) {
|
|
if t.recvErr != nil {
|
|
return nil, t.recvErr
|
|
}
|
|
packets := t.recv
|
|
t.recv = nil
|
|
return packets, nil
|
|
}
|
|
|
|
func TestFabricPacketTransportSendsVPNPacketBatchEnvelope(t *testing.T) {
|
|
capture := &captureProductionTransport{}
|
|
transport := &FabricPacketTransport{
|
|
ForwardTransport: capture,
|
|
ClusterID: "cluster-1",
|
|
VPNConnectionID: "vpn-1",
|
|
RouteID: "route-1",
|
|
LocalNodeID: "exit-1",
|
|
RemoteNodeID: "entry-1",
|
|
NextHopNodeID: "relay-1",
|
|
RoutePath: []string{"exit-1", "relay-1", "entry-1"},
|
|
SendDirection: FabricDirectionGatewayToClient,
|
|
}
|
|
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send fabric packet batch: %v", err)
|
|
}
|
|
if capture.nextNodeID != "relay-1" {
|
|
t.Fatalf("next node = %q", capture.nextNodeID)
|
|
}
|
|
if capture.envelope.CurrentHopNodeID != "relay-1" || capture.envelope.NextHopNodeID != "entry-1" {
|
|
t.Fatalf("envelope hop = %s -> %s, want relay-1 -> entry-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID)
|
|
}
|
|
payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope)
|
|
if err != nil {
|
|
t.Fatalf("decode envelope payload: %v", err)
|
|
}
|
|
if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionGatewayToClient || string(payload.Packets[0]) != "packet" {
|
|
t.Fatalf("payload = %+v", payload)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportSendsDataFrame(t *testing.T) {
|
|
sender := &captureFabricSessionSender{}
|
|
transport := &FabricSessionPacketTransport{
|
|
Sender: sender,
|
|
StreamID: 700,
|
|
VPNConnectionID: "vpn-1",
|
|
SendDirection: FabricDirectionClientToGateway,
|
|
TrafficClass: FabricTrafficClassInteractive,
|
|
}
|
|
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send fabric session packet batch: %v", err)
|
|
}
|
|
if len(sender.frames) != 1 {
|
|
t.Fatalf("sent frames = %d, want 1", len(sender.frames))
|
|
}
|
|
frame := sender.frames[0]
|
|
if frame.Type != fabricproto.FrameData || frame.StreamID != 700 || frame.Sequence != 1 || frame.TrafficClass != fabricproto.TrafficClassInteractive {
|
|
t.Fatalf("frame = %+v", frame)
|
|
}
|
|
payload, err := DecodeFabricVPNPacketDataFrame(frame)
|
|
if err != nil {
|
|
t.Fatalf("decode sent frame: %v", err)
|
|
}
|
|
if payload.VPNConnectionID != "vpn-1" || payload.Direction != FabricDirectionClientToGateway || len(payload.Packets) != 1 || string(payload.Packets[0]) != "packet" {
|
|
t.Fatalf("payload = %+v", payload)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) {
|
|
sender := &captureFabricSessionSender{}
|
|
transport := &FabricSessionPacketTransport{
|
|
Sender: sender,
|
|
StreamID: 700,
|
|
VPNConnectionID: "vpn-1",
|
|
SendDirection: FabricDirectionClientToGateway,
|
|
StreamIDsByTrafficClass: map[string][]uint64{
|
|
FabricTrafficClassInteractive: []uint64{801, 802},
|
|
FabricTrafficClassBulk: []uint64{901, 902},
|
|
},
|
|
}
|
|
bulkPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443)
|
|
controlPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389)
|
|
controlPacket[33] = 0x02
|
|
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{bulkPacket}); err != nil {
|
|
t.Fatalf("send bulk packet: %v", err)
|
|
}
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{controlPacket}); err != nil {
|
|
t.Fatalf("send control packet: %v", err)
|
|
}
|
|
if len(sender.frames) != 2 {
|
|
t.Fatalf("sent frames = %d, want 2", len(sender.frames))
|
|
}
|
|
if sender.frames[0].TrafficClass != fabricproto.TrafficClassBulk || sender.frames[0].StreamID < 901 || sender.frames[0].StreamID > 902 {
|
|
t.Fatalf("bulk frame did not use bulk shard: %+v", sender.frames[0])
|
|
}
|
|
if sender.frames[1].TrafficClass != fabricproto.TrafficClassInteractive || sender.frames[1].StreamID < 801 || sender.frames[1].StreamID > 802 {
|
|
t.Fatalf("control frame did not use interactive shard: %+v", sender.frames[1])
|
|
}
|
|
if sender.frames[0].Sequence != 1 || sender.frames[1].Sequence != 1 {
|
|
t.Fatalf("per-stream sequences = %d/%d, want 1/1", sender.frames[0].Sequence, sender.frames[1].Sequence)
|
|
}
|
|
snapshot := transport.Snapshot()
|
|
if snapshot["schema_version"] != "rap.vpn_fabric_session_packet_transport.v1" {
|
|
t.Fatalf("snapshot schema missing: %+v", snapshot)
|
|
}
|
|
if snapshot["stream_class_count"] != 2 ||
|
|
snapshot["stream_shard_count"] != 4 ||
|
|
snapshot["send_class_count"] != 2 ||
|
|
snapshot["send_stream_count"] != 2 ||
|
|
snapshot["sharding_active"] != true {
|
|
t.Fatalf("unexpected shard summary: %+v", snapshot)
|
|
}
|
|
framesByClass := snapshot["send_frames_by_class"].(map[string]uint64)
|
|
if framesByClass[FabricTrafficClassBulk] != 1 || framesByClass[FabricTrafficClassInteractive] != 1 {
|
|
t.Fatalf("send frames by class = %+v", framesByClass)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportSplitsMixedBatchByStream(t *testing.T) {
|
|
sender := &captureFabricSessionSender{}
|
|
transport := &FabricSessionPacketTransport{
|
|
Sender: sender,
|
|
VPNConnectionID: "vpn-1",
|
|
SendDirection: FabricDirectionClientToGateway,
|
|
StreamIDsByTrafficClass: map[string][]uint64{
|
|
FabricTrafficClassInteractive: []uint64{801},
|
|
FabricTrafficClassBulk: []uint64{901, 902},
|
|
},
|
|
}
|
|
bulkA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443)
|
|
bulkB := packetWithDifferentShard(bulkA, 2)
|
|
control := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389)
|
|
control[33] = 0x02
|
|
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{bulkA, bulkB, control}); err != nil {
|
|
t.Fatalf("send mixed batch: %v", err)
|
|
}
|
|
if len(sender.frames) != 3 {
|
|
t.Fatalf("sent frames = %d, want 3: %+v", len(sender.frames), sender.frames)
|
|
}
|
|
streams := map[uint64]fabricproto.TrafficClass{}
|
|
for _, frame := range sender.frames {
|
|
streams[frame.StreamID] = frame.TrafficClass
|
|
}
|
|
if streams[801] != fabricproto.TrafficClassInteractive ||
|
|
streams[901] != fabricproto.TrafficClassBulk ||
|
|
streams[902] != fabricproto.TrafficClassBulk {
|
|
t.Fatalf("unexpected stream/class split: %+v", sender.frames)
|
|
}
|
|
snapshot := transport.Snapshot()
|
|
if snapshot["send_stream_count"] != 3 ||
|
|
snapshot["send_class_count"] != 2 ||
|
|
snapshot["split_batch_count"] != uint64(1) ||
|
|
snapshot["last_batch_frame_count"] != uint64(3) ||
|
|
snapshot["max_batch_frame_count"] != uint64(3) {
|
|
t.Fatalf("unexpected mixed-batch shard summary: %+v", snapshot)
|
|
}
|
|
packetsByStream := snapshot["send_packets_by_stream_id"].(map[string]uint64)
|
|
if packetsByStream["801"] != 1 || packetsByStream["901"] != 1 || packetsByStream["902"] != 1 {
|
|
t.Fatalf("unexpected packets by stream: %+v", packetsByStream)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportFanoutBoundedByConfiguredStreams(t *testing.T) {
|
|
sender := &captureFabricSessionSender{}
|
|
transport := &FabricSessionPacketTransport{
|
|
Sender: sender,
|
|
VPNConnectionID: "vpn-1",
|
|
SendDirection: FabricDirectionClientToGateway,
|
|
StreamIDsByTrafficClass: map[string][]uint64{
|
|
FabricTrafficClassBulk: []uint64{901, 902, 903, 904},
|
|
},
|
|
}
|
|
var packets [][]byte
|
|
for port := uint16(10000); port < 10100; port++ {
|
|
packets = append(packets, testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443))
|
|
}
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), packets); err != nil {
|
|
t.Fatalf("send large flow batch: %v", err)
|
|
}
|
|
if len(sender.frames) > 4 {
|
|
t.Fatalf("fanout = %d, want bounded by 4 streams", len(sender.frames))
|
|
}
|
|
snapshot := transport.Snapshot()
|
|
if snapshot["max_batch_frame_count"].(uint64) > 4 {
|
|
t.Fatalf("max fanout not bounded: %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) {
|
|
sender := &captureFabricSessionSender{}
|
|
transport := &FabricSessionPacketTransport{
|
|
Sender: sender,
|
|
StreamID: 700,
|
|
StreamIDsByTrafficClass: map[string][]uint64{
|
|
FabricTrafficClassInteractive: []uint64{801, 802},
|
|
FabricTrafficClassBulk: []uint64{901, 902},
|
|
},
|
|
}
|
|
if err := transport.Close(); err != nil {
|
|
t.Fatalf("close transport: %v", err)
|
|
}
|
|
if !sender.closed {
|
|
t.Fatal("underlying fabric session was not closed")
|
|
}
|
|
closed := map[uint64]bool{}
|
|
for _, frame := range sender.frames {
|
|
if frame.Type == fabricproto.FrameCloseStream {
|
|
closed[frame.StreamID] = true
|
|
}
|
|
}
|
|
for _, streamID := range []uint64{700, 801, 802, 901, 902} {
|
|
if !closed[streamID] {
|
|
t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames)
|
|
}
|
|
}
|
|
snapshot := transport.Snapshot()
|
|
if snapshot["close_stream_frames"] != uint64(5) || snapshot["close_errors"] != uint64(0) {
|
|
t.Fatalf("unexpected close counters: %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
receiver := memoryFabricSessionReceiver{
|
|
frames: make(chan fabricproto.Frame, 2),
|
|
errors: make(chan error, 1),
|
|
}
|
|
transport := &FabricSessionPacketTransport{
|
|
Receiver: receiver,
|
|
Inbox: inbox,
|
|
StreamID: 701,
|
|
VPNConnectionID: "vpn-1",
|
|
}
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 701,
|
|
Sequence: 1,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionClientToGateway,
|
|
Packets: [][]byte{[]byte("packet")},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
receiver.frames <- fabricproto.Frame{Type: fabricproto.FrameAck, StreamID: 701, Sequence: 1}
|
|
receiver.frames <- frame
|
|
close(receiver.frames)
|
|
|
|
if err := transport.RunFrameIngress(context.Background()); err != nil {
|
|
t.Fatalf("run frame ingress: %v", err)
|
|
}
|
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "packet" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportReceiveReadsPumpFrames(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
receiver := memoryFabricSessionReceiver{
|
|
frames: make(chan fabricproto.Frame, 1),
|
|
errors: make(chan error, 1),
|
|
}
|
|
transport := &FabricSessionPacketTransport{
|
|
Receiver: receiver,
|
|
Inbox: inbox,
|
|
StreamID: 711,
|
|
VPNConnectionID: "vpn-1",
|
|
ReceiveDirection: FabricDirectionClientToGateway,
|
|
}
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 711,
|
|
Sequence: 1,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionClientToGateway,
|
|
Packets: [][]byte{[]byte("request")},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
receiver.frames <- frame
|
|
|
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "request" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
snapshot := transport.Snapshot()
|
|
framesByClass := snapshot["receive_frames_by_class"].(map[string]uint64)
|
|
packetsByStream := snapshot["receive_packets_by_stream_id"].(map[string]uint64)
|
|
if framesByClass[FabricTrafficClassBulk] != 1 || packetsByStream["711"] != 1 {
|
|
t.Fatalf("unexpected receive counters: %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricSessionPacketTransportIngressIgnoresOtherStreams(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
receiver := memoryFabricSessionReceiver{
|
|
frames: make(chan fabricproto.Frame, 1),
|
|
errors: make(chan error, 1),
|
|
}
|
|
transport := &FabricSessionPacketTransport{
|
|
Receiver: receiver,
|
|
Inbox: inbox,
|
|
StreamID: 701,
|
|
VPNConnectionID: "vpn-1",
|
|
}
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 702,
|
|
Sequence: 1,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionClientToGateway,
|
|
Packets: [][]byte{[]byte("packet")},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
receiver.frames <- frame
|
|
close(receiver.frames)
|
|
|
|
if err := transport.RunFrameIngress(context.Background()); err != nil {
|
|
t.Fatalf("run frame ingress: %v", err)
|
|
}
|
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), 10*time.Millisecond)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 0 {
|
|
t.Fatalf("packets = %#v, want none", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxReceivesMatchingDirection(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
envelope, err := mesh.NewProductionVPNPacketBatchEnvelope(mesh.ProductionVPNPacketEnvelopeInput{
|
|
RouteID: "route-1",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
CurrentHopNodeID: "exit-1",
|
|
NextHopNodeID: "exit-1",
|
|
RoutePath: []string{"entry-1", "exit-1"},
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionClientToGateway,
|
|
Packets: [][]byte{[]byte("packet")},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new envelope: %v", err)
|
|
}
|
|
if err := inbox.DeliverProductionEnvelope(context.Background(), envelope); err != nil {
|
|
t.Fatalf("deliver envelope: %v", err)
|
|
}
|
|
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "packet" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxKeepsDirectionsIsolated(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil {
|
|
t.Fatalf("deliver gateway packet: %v", err)
|
|
}
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil {
|
|
t.Fatalf("deliver client packet: %v", err)
|
|
}
|
|
|
|
clientPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive client direction: %v", err)
|
|
}
|
|
gatewayPackets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway direction: %v", err)
|
|
}
|
|
if len(clientPackets) != 1 || string(clientPackets[0]) != "request" {
|
|
t.Fatalf("client packets = %#v", clientPackets)
|
|
}
|
|
if len(gatewayPackets) != 1 || string(gatewayPackets[0]) != "reply" {
|
|
t.Fatalf("gateway packets = %#v", gatewayPackets)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxDropsEmptyPackets(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{nil, []byte{}, []byte("reply")}); err != nil {
|
|
t.Fatalf("deliver gateway packet: %v", err)
|
|
}
|
|
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "reply" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricVPNPacketDataFrameRoundTrip(t *testing.T) {
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 57032, 3389)
|
|
packet[33] = 0x18
|
|
now := time.Unix(1700000000, 123).UTC()
|
|
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 55,
|
|
Sequence: 9,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionClientToGateway,
|
|
TrafficClass: FabricTrafficClassInteractive,
|
|
Packets: [][]byte{packet},
|
|
Now: now,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
if frame.Type != fabricproto.FrameData || frame.StreamID != 55 || frame.Sequence != 9 || frame.TrafficClass != fabricproto.TrafficClassInteractive {
|
|
t.Fatalf("frame = %+v", frame)
|
|
}
|
|
payload, err := DecodeFabricVPNPacketDataFrame(frame)
|
|
if err != nil {
|
|
t.Fatalf("decode fabric vpn frame: %v", err)
|
|
}
|
|
if payload.SchemaVersion != "rap.vpn_packet_batch.fabric.v1" ||
|
|
payload.VPNConnectionID != "vpn-1" ||
|
|
payload.Direction != FabricDirectionClientToGateway ||
|
|
!payload.SentAt.Equal(now) ||
|
|
len(payload.Packets) != 1 ||
|
|
string(payload.Packets[0]) != string(packet) {
|
|
t.Fatalf("payload = %+v", payload)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxReceivesFabricSessionFrame(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 88,
|
|
Sequence: 1,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionGatewayToClient,
|
|
Packets: [][]byte{[]byte("reply")},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
if err := inbox.DeliverFabricSessionFrame(context.Background(), frame); err != nil {
|
|
t.Fatalf("deliver fabric session frame: %v", err)
|
|
}
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive fabric session packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "reply" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricVPNPacketDataFrameInfersInteractiveTCPControl(t *testing.T) {
|
|
packet := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032)
|
|
packet[33] = 0x12
|
|
frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{
|
|
StreamID: 91,
|
|
Sequence: 1,
|
|
VPNConnectionID: "vpn-1",
|
|
Direction: FabricDirectionGatewayToClient,
|
|
TrafficClass: FabricTrafficClassBulk,
|
|
Packets: [][]byte{packet},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("new fabric vpn frame: %v", err)
|
|
}
|
|
if frame.TrafficClass != fabricproto.TrafficClassInteractive {
|
|
t.Fatalf("traffic class = %v, want interactive", frame.TrafficClass)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxPrioritizesGatewayTCPControlPackets(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000)
|
|
priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032)
|
|
priority[33] = 0x12
|
|
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil {
|
|
t.Fatalf("deliver normal gateway packet: %v", err)
|
|
}
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}); err != nil {
|
|
t.Fatalf("deliver priority gateway packet: %v", err)
|
|
}
|
|
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != string(priority) {
|
|
t.Fatalf("first packets = %#v, want priority tcp control packet", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricPacketInboxWaitsBrieflyForGatewayTCPControlPackets(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000)
|
|
priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032)
|
|
priority[33] = 0x12
|
|
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil {
|
|
t.Fatalf("deliver normal gateway packet: %v", err)
|
|
}
|
|
go func() {
|
|
time.Sleep(time.Millisecond)
|
|
_ = inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority})
|
|
}()
|
|
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != string(priority) {
|
|
t.Fatalf("first packets = %#v, want delayed priority tcp control packet", packets)
|
|
}
|
|
}
|
|
|
|
func TestLocalPacketTransportUsesFabricInboxDirections(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
transport := &LocalPacketTransport{Inbox: inbox, VPNConnectionID: "vpn-1"}
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionClientToGateway, [][]byte{[]byte("request")}); err != nil {
|
|
t.Fatalf("deliver client packet: %v", err)
|
|
}
|
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "request" {
|
|
t.Fatalf("received packets = %#v", packets)
|
|
}
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil {
|
|
t.Fatalf("send gateway packet: %v", err)
|
|
}
|
|
replies, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive client reply: %v", err)
|
|
}
|
|
if len(replies) != 1 || string(replies[0]) != "reply" {
|
|
t.Fatalf("reply packets = %#v", replies)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerKeepsReverseFiveTupleTogether(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
forward := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
reverse := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000)
|
|
|
|
batches := scheduler.ScheduleClientPackets([][]byte{forward, reverse})
|
|
if len(batches) != 1 {
|
|
t.Fatalf("batches = %#v, want one logical flow channel", batches)
|
|
}
|
|
if len(batches[0].Packets) != 2 {
|
|
t.Fatalf("batch packets = %d", len(batches[0].Packets))
|
|
}
|
|
snapshot := scheduler.Snapshot()
|
|
if snapshot.Enqueued != 2 || snapshot.ChannelCount != 1 || snapshot.QueueDepths[batches[0].ChannelID] != 2 {
|
|
t.Fatalf("snapshot before complete = %+v", snapshot)
|
|
}
|
|
scheduler.Complete(batches[0])
|
|
snapshot = scheduler.Snapshot()
|
|
if snapshot.Dequeued != 2 || snapshot.QueueDepths[batches[0].ChannelID] != 0 {
|
|
t.Fatalf("snapshot after complete = %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerPrioritizesExplicitTrafficClass(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
bulk := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, [][]byte{packet})
|
|
interactive := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{packet})
|
|
control := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassControl, [][]byte{packet})
|
|
combined := append(append(bulk, interactive...), control...)
|
|
|
|
scheduler.sortScheduledBatches(combined)
|
|
if len(combined) != 3 {
|
|
t.Fatalf("scheduled count = %d, want 3", len(combined))
|
|
}
|
|
if combined[0].TrafficClass != FabricTrafficClassControl || combined[1].TrafficClass != FabricTrafficClassInteractive || combined[2].TrafficClass != FabricTrafficClassBulk {
|
|
t.Fatalf("scheduled classes = %s, %s, %s", combined[0].TrafficClass, combined[1].TrafficClass, combined[2].TrafficClass)
|
|
}
|
|
if !strings.Contains(combined[0].ChannelID, ":control:") || !strings.Contains(combined[1].ChannelID, ":interactive:") {
|
|
t.Fatalf("priority channel ids = %q %q", combined[0].ChannelID, combined[1].ChannelID)
|
|
}
|
|
snapshot := scheduler.Snapshot()
|
|
if snapshot.ChannelStats[combined[0].ChannelID].TrafficClass != FabricTrafficClassControl {
|
|
t.Fatalf("control stat = %+v", snapshot.ChannelStats[combined[0].ChannelID])
|
|
}
|
|
if snapshot.ChannelStats[combined[2].ChannelID].TrafficClass != FabricTrafficClassBulk {
|
|
t.Fatalf("bulk stat = %+v", snapshot.ChannelStats[combined[2].ChannelID])
|
|
}
|
|
if snapshot.TrafficClassCounts[FabricTrafficClassControl] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 || snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 1 {
|
|
t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerDropsWhenChannelQueueIsFull(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(1, 1)
|
|
packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
packetB := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
|
|
batches, dropped := scheduler.scheduleClientPackets("", "", [][]byte{packetA, packetB})
|
|
if len(batches) != 1 || len(batches[0].Packets) != 1 {
|
|
t.Fatalf("batches = %#v, want one accepted packet", batches)
|
|
}
|
|
if dropped != 1 {
|
|
t.Fatalf("dropped = %d, want per-call drop count 1", dropped)
|
|
}
|
|
snapshot := scheduler.Snapshot()
|
|
if snapshot.Dropped != 1 || !snapshot.BackpressureActive {
|
|
t.Fatalf("snapshot = %+v, want one dropped packet and active backpressure", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerRoundsSubMillisecondSendDuration(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
channelID := "flow-01"
|
|
|
|
scheduler.RecordRouteSuccess(channelID, "route-fast", "node-b", 200*time.Microsecond)
|
|
snapshot := scheduler.Snapshot()
|
|
stat := snapshot.ChannelStats[channelID]
|
|
if stat.LastSendDurationMillis != 1 {
|
|
t.Fatalf("success last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis)
|
|
}
|
|
|
|
scheduler.RecordRouteFailure(channelID, "route-fast", "node-b", mesh.ErrForwardPeerUnavailable, 300*time.Microsecond)
|
|
snapshot = scheduler.Snapshot()
|
|
stat = snapshot.ChannelStats[channelID]
|
|
if stat.LastSendDurationMillis != 1 {
|
|
t.Fatalf("failure last send duration = %d, want 1ms minimum for positive samples", stat.LastSendDurationMillis)
|
|
}
|
|
}
|
|
|
|
func TestAdaptivePacketTransportReturnsRepliesToLastReceiveSide(t *testing.T) {
|
|
primary := &memoryPacketTransport{}
|
|
fallback := &memoryPacketTransport{recv: [][]byte{[]byte("request")}}
|
|
transport := &AdaptivePacketTransport{
|
|
Primary: primary,
|
|
Fallback: fallback,
|
|
PrimaryTimeout: time.Millisecond,
|
|
}
|
|
packets, err := transport.ReceiveGatewayPacketBatch(context.Background(), time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive adaptive packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "request" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil {
|
|
t.Fatalf("send adaptive reply: %v", err)
|
|
}
|
|
if len(primary.sent) != 0 {
|
|
t.Fatalf("primary unexpectedly received reply: %#v", primary.sent)
|
|
}
|
|
if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" {
|
|
t.Fatalf("fallback reply = %#v", fallback.sent)
|
|
}
|
|
}
|
|
|
|
func TestAdaptivePacketTransportFallsBackWhenPreferredSendFails(t *testing.T) {
|
|
sendErr := errors.New("send failed")
|
|
primary := &memoryPacketTransport{sendErr: sendErr}
|
|
fallback := &memoryPacketTransport{}
|
|
transport := &AdaptivePacketTransport{Primary: primary, Fallback: fallback}
|
|
if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{[]byte("reply")}); err != nil {
|
|
t.Fatalf("send adaptive reply: %v", err)
|
|
}
|
|
if len(fallback.sent) != 1 || string(fallback.sent[0]) != "reply" {
|
|
t.Fatalf("fallback sent = %#v", fallback.sent)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressSendsClientPacketsOverSelectedRoute(t *testing.T) {
|
|
capture := &captureProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: capture,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-entry-exit",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if capture.nextNodeID != "relay-1" {
|
|
t.Fatalf("next node = %q", capture.nextNodeID)
|
|
}
|
|
if capture.envelope.CurrentHopNodeID != "relay-1" || capture.envelope.NextHopNodeID != "exit-1" {
|
|
t.Fatalf("envelope hop = %s -> %s, want relay-1 -> exit-1", capture.envelope.CurrentHopNodeID, capture.envelope.NextHopNodeID)
|
|
}
|
|
payload, err := mesh.DecodeProductionVPNPacketBatch(capture.envelope)
|
|
if err != nil {
|
|
t.Fatalf("decode envelope payload: %v", err)
|
|
}
|
|
if payload.Direction != FabricDirectionClientToGateway || payload.VPNConnectionID != "vpn-1" {
|
|
t.Fatalf("payload = %+v", payload)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressUsesLeasePreferredRouteBeforeConfigOrder(t *testing.T) {
|
|
capture := &captureProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: capture,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-alternate-exit",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-b",
|
|
Hops: []string{"entry-1", "exit-b"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-primary-exit",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-a",
|
|
Hops: []string{"entry-1", "exit-a"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.PreferClientRoute("route-primary-exit")
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if capture.envelope.RouteID != "route-primary-exit" || capture.nextNodeID != "exit-a" {
|
|
t.Fatalf("selected route = %s next=%s, want route-primary-exit via exit-a", capture.envelope.RouteID, capture.nextNodeID)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressTriesAlternateRouteBeforeBackendFallback(t *testing.T) {
|
|
transport := &failoverProductionTransport{failNextHop: "relay-bad"}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-good",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-good", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.calls) != 2 || transport.calls[0] != "relay-bad" || transport.calls[1] != "relay-good" {
|
|
t.Fatalf("route attempts = %#v, want relay-bad then relay-good", transport.calls)
|
|
}
|
|
if transport.envelope.RouteID != "route-good" || transport.envelope.CurrentHopNodeID != "relay-good" {
|
|
t.Fatalf("selected envelope = %+v", transport.envelope)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.SendRouteAttempts != 2 || snapshot.SendRouteFailures != 1 || snapshot.LastSelectedRouteID != "route-good" {
|
|
t.Fatalf("snapshot = %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressDoesNotFailOverPreferredRouteToDifferentDestination(t *testing.T) {
|
|
transport := &failoverProductionTransport{failNextHop: "relay-home"}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-other",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "ifcm-1",
|
|
Hops: []string{"entry-1", "relay-ifcm", "ifcm-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-home",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "home-1",
|
|
Hops: []string{"entry-1", "relay-home", "home-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.PreferClientRoute("route-home")
|
|
|
|
err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")})
|
|
if err == nil {
|
|
t.Fatal("send client packet batch succeeded after preferred route failure; want failure without cross-destination fallback")
|
|
}
|
|
if len(transport.calls) != 1 || transport.calls[0] != "relay-home" {
|
|
t.Fatalf("route attempts = %#v, want only relay-home", transport.calls)
|
|
}
|
|
if transport.envelope.RouteID == "route-other" {
|
|
t.Fatalf("cross-destination route was used: %+v", transport.envelope)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressAvoidsChannelFailedRouteOnNextSend(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channelID := fabricFlowChannelID("vpn-1", shard)
|
|
scheduler.RecordRouteFailure(channelID, "route-bad", "relay-bad", mesh.ErrForwardPeerUnavailable, time.Millisecond)
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-good",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-good", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.calls) != 1 || transport.calls[0] != "relay-good" {
|
|
t.Fatalf("route calls = %#v, want relay-good first without retrying stale route", transport.calls)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
stat := snapshot.FlowScheduler.ChannelStats[channelID]
|
|
if stat.LastRouteID != "route-good" || stat.ConsecutiveFailures != 0 || stat.RouteRebuildRecommended {
|
|
t.Fatalf("channel stat = %+v", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressWithdrawsRouteAfterRebuildDecision(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-good",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-good", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-bad",
|
|
ReplacementRouteID: "route-good",
|
|
RebuildRequestID: "route-bad-rebuild",
|
|
RebuildStatus: "applied",
|
|
RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate",
|
|
RebuildAttempt: 3,
|
|
DecisionSource: "service_channel_feedback_replacement",
|
|
Generation: "config-v2",
|
|
}}, "config-v2", time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.calls) != 1 || transport.calls[0] != "relay-good" {
|
|
t.Fatalf("route calls = %#v, want only rebuild replacement route", transport.calls)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteCandidateCount != 1 ||
|
|
snapshot.RouteManager.RebuildRequestCount != 1 ||
|
|
snapshot.RouteManager.RebuildAppliedCount != 1 ||
|
|
snapshot.RouteManager.WithdrawnRouteCount != 1 {
|
|
t.Fatalf("snapshot route manager = %+v", snapshot.RouteManager)
|
|
}
|
|
if snapshot.LastSelectedRouteID != "route-good" {
|
|
t.Fatalf("selected route = %q, want route-good", snapshot.LastSelectedRouteID)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressPrefersControlPlaneReplacementOverConfigOrder(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-slow", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-standby",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-standby", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-fast", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send initial packet batch: %v", err)
|
|
}
|
|
if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" {
|
|
t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID)
|
|
}
|
|
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-bad",
|
|
ReplacementRouteID: "route-fast",
|
|
RebuildRequestID: "route-bad-rebuild",
|
|
RebuildStatus: "applied",
|
|
RebuildReason: "service_channel_feedback_rebuild_applied_to_fastest_pool_route",
|
|
DecisionSource: "service_channel_feedback_replacement",
|
|
Generation: "config-v2",
|
|
}}, "config-v2", time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet batch after rebuild: %v", err)
|
|
}
|
|
if got := transport.envelopes[len(transport.envelopes)-1].RouteID; got != "route-fast" {
|
|
t.Fatalf("post-rebuild selected route = %q, want Control Plane replacement route-fast; calls=%v", got, transport.calls)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.LastSelectedRouteID != "route-fast" || snapshot.RouteCandidateCount != 2 {
|
|
t.Fatalf("post-rebuild snapshot = %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressRestoresWithdrawnRouteAfterFreshConfig(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-good",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-good", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send initial packet batch: %v", err)
|
|
}
|
|
if ingress.Snapshot("cluster-1").LastSelectedRouteID != "route-bad" {
|
|
t.Fatalf("initial selected route = %q, want route-bad", ingress.Snapshot("cluster-1").LastSelectedRouteID)
|
|
}
|
|
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-bad",
|
|
ReplacementRouteID: "route-good",
|
|
RebuildRequestID: "route-bad-rebuild",
|
|
RebuildStatus: "applied",
|
|
RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate",
|
|
RebuildAttempt: 4,
|
|
DecisionSource: "service_channel_feedback_replacement",
|
|
Generation: "config-v2",
|
|
}}, "config-v2", time.Now().UTC())
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteManagerTransition.Status != "applied_rebuild" ||
|
|
snapshot.RouteManagerTransition.ClearedSelectedRouteID != "route-bad" ||
|
|
snapshot.RouteManagerTransition.RebuildAppliedCount != 1 {
|
|
t.Fatalf("apply transition = %+v", snapshot.RouteManagerTransition)
|
|
}
|
|
if snapshot.RouteCandidateCount != 1 {
|
|
t.Fatalf("route candidate count after rebuild = %d, want 1", snapshot.RouteCandidateCount)
|
|
}
|
|
|
|
ingress.UpdateRouteManager(nil, "config-v3", time.Now().UTC())
|
|
snapshot = ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteManagerTransition.Status != "restored_by_new_config" ||
|
|
snapshot.RouteManagerTransition.RestoredRouteCount != 1 ||
|
|
snapshot.RouteManager.WithdrawnRouteCount != 0 ||
|
|
snapshot.RouteCandidateCount != 2 {
|
|
t.Fatalf("restore transition/snapshot = %+v / %+v", snapshot.RouteManagerTransition, snapshot.RouteManager)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressPendingDegradedFallbackWithdrawsRouteWithoutAlternate(t *testing.T) {
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: &captureManyProductionTransport{},
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-bad",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-bad",
|
|
RebuildRequestID: "route-bad-rebuild",
|
|
RebuildStatus: "pending_degraded_fallback",
|
|
RebuildReason: "service_channel_feedback_rebuild_pending_backend_relay",
|
|
RebuildAttempt: 2,
|
|
DecisionSource: "service_channel_feedback_no_alternate",
|
|
Generation: "config-v2",
|
|
}}, "config-v2", time.Now().UTC())
|
|
|
|
err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")})
|
|
if !errors.Is(err, mesh.ErrRouteNotFound) {
|
|
t.Fatalf("send error = %v, want route not found after pending degraded withdrawal", err)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteCandidateCount != 0 ||
|
|
snapshot.RouteManager.PendingFallbackCount != 1 ||
|
|
snapshot.RouteManager.WithdrawnRouteCount != 1 ||
|
|
snapshot.RouteManagerTransition.Status != "pending_degraded_fallback" {
|
|
t.Fatalf("pending fallback snapshot = %+v transition=%+v", snapshot.RouteManager, snapshot.RouteManagerTransition)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressKeepsLastRouteWhenWithdrawalPreventionEnabled(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
PreventLastRouteWithdrawal: true,
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-only",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-only",
|
|
RebuildStatus: "pending_degraded_fallback",
|
|
DecisionSource: "service_channel_feedback_no_alternate",
|
|
}}, "config-v2", time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-only" {
|
|
t.Fatalf("envelopes = %+v, want preserved last route", transport.envelopes)
|
|
}
|
|
if snapshot := ingress.Snapshot("cluster-1"); snapshot.RouteCandidateCount != 1 {
|
|
t.Fatalf("route candidate count = %d, want last withdrawn route preserved", snapshot.RouteCandidateCount)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressMarksChannelForRebuildAfterRepeatedRouteFailures(t *testing.T) {
|
|
transport := &failoverProductionTransport{failNextHop: "relay-bad"}
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channelID := fabricFlowChannelID("vpn-1", shard)
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-bad-1",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-bad-2",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-bad", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err == nil {
|
|
t.Fatalf("send unexpectedly succeeded")
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
stat := snapshot.FlowScheduler.ChannelStats[channelID]
|
|
if !stat.RouteRebuildRecommended || !stat.DegradedFallbackRecommended || stat.ConsecutiveFailures < 2 {
|
|
t.Fatalf("channel stat = %+v, want rebuild and degraded fallback recommendation", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressSplitsIndependentFlowsIntoLogicalChannels(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
packetB := packetWithDifferentShard(packetA, 8)
|
|
expectedChannels := expectedScheduledChannelCount(scheduler, [][]byte{packetA, packetB})
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-entry-exit",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.envelopes) != expectedChannels {
|
|
t.Fatalf("envelopes = %d, want %d", len(transport.envelopes), expectedChannels)
|
|
}
|
|
for _, envelope := range transport.envelopes {
|
|
payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
|
|
if err != nil {
|
|
t.Fatalf("decode envelope: %v", err)
|
|
}
|
|
if len(payload.Packets) == 0 {
|
|
t.Fatalf("empty logical channel envelope")
|
|
}
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.SendFlowBatches != uint64(expectedChannels) || snapshot.FlowScheduler.Enqueued != 2 || snapshot.FlowScheduler.Dequeued != 2 {
|
|
t.Fatalf("snapshot = %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressIsolatesRouteFailoverPerLogicalChannel(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
packetB := packetWithDifferentShard(packetA, 8)
|
|
_, shardA := classifyPacketFlow(packetA, scheduler.shardCountValue())
|
|
_, shardB := classifyPacketFlow(packetB, scheduler.shardCountValue())
|
|
channelA := fabricFlowChannelID("vpn-1", shardA)
|
|
channelB := fabricFlowChannelID("vpn-1", shardB)
|
|
if channelA == channelB {
|
|
t.Fatalf("test packets mapped to same channel %s", channelA)
|
|
}
|
|
scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond)
|
|
scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond)
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-alternate",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-alternate", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 2 {
|
|
t.Fatalf("envelopes = %d, want 2 independent logical channel sends", len(transport.envelopes))
|
|
}
|
|
routeByPacket := map[string]string{}
|
|
for _, envelope := range transport.envelopes {
|
|
payload, err := mesh.DecodeProductionVPNPacketBatch(envelope)
|
|
if err != nil {
|
|
t.Fatalf("decode envelope: %v", err)
|
|
}
|
|
if len(payload.Packets) != 1 {
|
|
t.Fatalf("payload packets = %d, want one packet per logical channel", len(payload.Packets))
|
|
}
|
|
routeByPacket[string(payload.Packets[0])] = envelope.RouteID
|
|
}
|
|
if routeByPacket[string(packetA)] != "route-alternate" {
|
|
t.Fatalf("channel A route = %q, want alternate after isolated failure", routeByPacket[string(packetA)])
|
|
}
|
|
if routeByPacket[string(packetB)] != "route-primary" {
|
|
t.Fatalf("channel B route = %q, want primary route memory preserved", routeByPacket[string(packetB)])
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
statA := snapshot.FlowScheduler.ChannelStats[channelA]
|
|
statB := snapshot.FlowScheduler.ChannelStats[channelB]
|
|
if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" || statA.ConsecutiveFailures != 0 {
|
|
t.Fatalf("channel A stat = %+v, want recovered on alternate route", statA)
|
|
}
|
|
if statA.LastRecoveredFromRouteID != "route-primary" ||
|
|
statA.LastRecoveredNextHop != "relay-primary" ||
|
|
statA.RouteSwitchCount != 1 ||
|
|
statA.LastRouteSwitchAt == "" ||
|
|
snapshot.FlowScheduler.RouteRecoveredChannelCount != 1 ||
|
|
snapshot.FlowScheduler.RouteSwitchCount != 1 {
|
|
t.Fatalf("route recovery telemetry = stat:%+v scheduler:%+v", statA, snapshot.FlowScheduler)
|
|
}
|
|
if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" || statB.ConsecutiveFailures != 0 {
|
|
t.Fatalf("channel B stat = %+v, want primary route memory preserved", statB)
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 {
|
|
t.Fatalf("bounded scheduler telemetry = %+v send_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressIsolatesRouteMemoryPerVPNConnection(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
channelA := testFlowChannelID("vpn-a", packet, scheduler.shardCountValue())
|
|
channelB := testFlowChannelID("vpn-b", packet, scheduler.shardCountValue())
|
|
if channelA == channelB {
|
|
t.Fatalf("session-scoped channels collapsed to %s", channelA)
|
|
}
|
|
scheduler.RecordRouteFailure(channelA, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, time.Millisecond)
|
|
scheduler.RecordRouteSuccess(channelB, "route-primary", "relay-primary", time.Millisecond)
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-alternate",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-alternate", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-a", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send vpn-a packet: %v", err)
|
|
}
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-b", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send vpn-b packet: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 2 {
|
|
t.Fatalf("envelopes = %d, want one send per vpn session", len(transport.envelopes))
|
|
}
|
|
if transport.envelopes[0].RouteID != "route-alternate" {
|
|
t.Fatalf("vpn-a route = %s, want alternate after session-local failure", transport.envelopes[0].RouteID)
|
|
}
|
|
if transport.envelopes[1].RouteID != "route-primary" {
|
|
t.Fatalf("vpn-b route = %s, want primary preserved from separate session memory", transport.envelopes[1].RouteID)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
statA := snapshot.FlowScheduler.ChannelStats[channelA]
|
|
statB := snapshot.FlowScheduler.ChannelStats[channelB]
|
|
if statA.LastRouteID != "route-alternate" || statA.LastFailedRouteID != "" {
|
|
t.Fatalf("vpn-a stat = %+v, want recovered alternate route without leaking failure", statA)
|
|
}
|
|
if statB.LastRouteID != "route-primary" || statB.LastFailedRouteID != "" {
|
|
t.Fatalf("vpn-b stat = %+v, want primary route preserved", statB)
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
|
|
t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressRouteSelectionUsesUpdatedRuntimeIdentity(t *testing.T) {
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-entry-1",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
ingress.UpdateRuntime(
|
|
transport,
|
|
NewFabricPacketInbox(8),
|
|
"cluster-1",
|
|
"entry-2",
|
|
nil,
|
|
func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-entry-2",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-2",
|
|
DestinationNodeID: "exit-2",
|
|
Hops: []string{"entry-2", "relay-2", "exit-2"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
"policy-updated",
|
|
)
|
|
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443)
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send after runtime update: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 1 {
|
|
t.Fatalf("envelopes = %d, want one send", len(transport.envelopes))
|
|
}
|
|
envelope := transport.envelopes[0]
|
|
if envelope.RouteID != "route-entry-2" || envelope.SourceNodeID != "entry-2" || transport.calls[0] != "relay-2" {
|
|
t.Fatalf("envelope route/source/next-hop = %s/%s/%s, want updated entry-2 route", envelope.RouteID, envelope.SourceNodeID, transport.calls[0])
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressParallelFlowWindowDoesNotBlockIndependentChannel(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
slowPacket, fastPacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue())
|
|
transport := &blockingProductionTransport{
|
|
slowPort: packetSourcePort(slowPacket),
|
|
slowStarted: make(chan struct{}),
|
|
releaseSlow: make(chan struct{}),
|
|
fastDone: make(chan struct{}),
|
|
}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
MaxParallelFlowSends: 2,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{slowPacket, fastPacket})
|
|
}()
|
|
select {
|
|
case <-transport.slowStarted:
|
|
case err := <-done:
|
|
t.Fatalf("send completed before slow channel started: %v", err)
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("timed out waiting for slow channel to start")
|
|
}
|
|
select {
|
|
case <-transport.fastDone:
|
|
case err := <-done:
|
|
t.Fatalf("send completed before independent fast channel completed: %v", err)
|
|
case <-time.After(200 * time.Millisecond):
|
|
t.Fatalf("fast independent channel was blocked behind slow channel")
|
|
}
|
|
close(transport.releaseSlow)
|
|
if err := <-done; err != nil {
|
|
t.Fatalf("parallel send returned error: %v", err)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.SendFlowParallel != 1 || snapshot.MaxParallelFlowSends != 2 {
|
|
t.Fatalf("parallel telemetry = batches:%d max:%d", snapshot.SendFlowParallel, snapshot.MaxParallelFlowSends)
|
|
}
|
|
if snapshot.RecommendedParallelFlowSends != 2 || snapshot.FlowScheduler.MaxInFlight != 2 || snapshot.FlowScheduler.InFlight != 0 {
|
|
t.Fatalf("window/in-flight telemetry = recommended:%d in_flight:%d max_in_flight:%d", snapshot.RecommendedParallelFlowSends, snapshot.FlowScheduler.InFlight, snapshot.FlowScheduler.MaxInFlight)
|
|
}
|
|
if snapshot.SendFlowBatches != 2 || snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
|
|
t.Fatalf("flow telemetry = %+v send_flow_batches=%d send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowBatches, snapshot.SendFlowDropped)
|
|
}
|
|
for channelID, stat := range snapshot.FlowScheduler.ChannelStats {
|
|
if stat.SendAttempts != 1 || stat.SendSuccesses != 1 || stat.SendFailures != 0 || stat.MaxInFlight != 1 {
|
|
t.Fatalf("channel %s stat = %+v, want one successful in-flight send", channelID, stat)
|
|
}
|
|
if stat.LatencyLe10Millis+stat.LatencyLe100Millis+stat.LatencyLe1000Millis+stat.LatencyGt1000Millis != 1 {
|
|
t.Fatalf("channel %s latency buckets = %+v, want one sample", channelID, stat)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressInteractiveClassCompletesWhileBulkInFlight(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 16)
|
|
bulkPacket, interactivePacket := packetsForOrderedDistinctChannels(scheduler.shardCountValue())
|
|
transport := &blockingProductionTransport{
|
|
slowPort: packetSourcePort(bulkPacket),
|
|
slowStarted: make(chan struct{}),
|
|
releaseSlow: make(chan struct{}),
|
|
fastDone: make(chan struct{}),
|
|
}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
MaxParallelFlowSends: 1,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
bulkDone := make(chan error, 1)
|
|
go func() {
|
|
bulkDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassBulk, [][]byte{bulkPacket})
|
|
}()
|
|
select {
|
|
case <-transport.slowStarted:
|
|
case err := <-bulkDone:
|
|
t.Fatalf("bulk send completed before blocking: %v", err)
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timed out waiting for bulk send to block")
|
|
}
|
|
|
|
interactiveDone := make(chan error, 1)
|
|
go func() {
|
|
interactiveDone <- ingress.SendClientPacketBatchWithTrafficClass(context.Background(), "cluster-1", "vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket})
|
|
}()
|
|
select {
|
|
case err := <-interactiveDone:
|
|
if err != nil {
|
|
t.Fatalf("interactive send returned error while bulk in flight: %v", err)
|
|
}
|
|
case <-time.After(200 * time.Millisecond):
|
|
t.Fatal("interactive traffic-class send was blocked behind in-flight bulk")
|
|
}
|
|
select {
|
|
case <-transport.fastDone:
|
|
default:
|
|
t.Fatal("interactive transport send did not complete")
|
|
}
|
|
close(transport.releaseSlow)
|
|
if err := <-bulkDone; err != nil {
|
|
t.Fatalf("bulk send returned error after release: %v", err)
|
|
}
|
|
|
|
bulkChannel := testFlowChannelID("vpn-1", bulkPacket, scheduler.shardCountValue())
|
|
interactiveChannel := fabricFlowChannelIDForClass("vpn-1", FabricTrafficClassInteractive, packetShard(interactivePacket, scheduler.shardCountValue()))
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.FlowScheduler.MaxInFlight < 2 {
|
|
t.Fatalf("max in-flight = %d, want concurrent bulk+interactive", snapshot.FlowScheduler.MaxInFlight)
|
|
}
|
|
bulkStat := snapshot.FlowScheduler.ChannelStats[bulkChannel]
|
|
interactiveStat := snapshot.FlowScheduler.ChannelStats[interactiveChannel]
|
|
if bulkStat.TrafficClass != FabricTrafficClassBulk || bulkStat.SendSuccesses != 1 || bulkStat.SendFailures != 0 {
|
|
t.Fatalf("bulk stat = %+v", bulkStat)
|
|
}
|
|
if interactiveStat.TrafficClass != FabricTrafficClassInteractive || interactiveStat.SendSuccesses != 1 || interactiveStat.SendFailures != 0 {
|
|
t.Fatalf("interactive stat = %+v", interactiveStat)
|
|
}
|
|
if snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassBulk] != 1 || snapshot.FlowScheduler.TrafficClassCounts[FabricTrafficClassInteractive] != 1 {
|
|
t.Fatalf("traffic class counts = %+v", snapshot.FlowScheduler.TrafficClassCounts)
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
|
|
t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerRecommendsSmallerWindowUnderPressure(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 1)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
if got := scheduler.RecommendedParallelSendWindow(4); got != 4 {
|
|
t.Fatalf("clean recommended window = %d, want 4", got)
|
|
}
|
|
scheduled := scheduler.ScheduleClientPacketsForConnection("vpn-1", [][]byte{packet, packetWithSameShard(packet, scheduler.shardCountValue())})
|
|
if len(scheduled) != 1 {
|
|
t.Fatalf("scheduled = %d, want one accepted channel after bounded drop", len(scheduled))
|
|
}
|
|
if got := scheduler.RecommendedParallelSendWindow(4); got != 1 {
|
|
t.Fatalf("drop-pressure recommended window = %d, want 1", got)
|
|
}
|
|
channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue())
|
|
scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 2500*time.Millisecond)
|
|
snapshot := scheduler.Snapshot()
|
|
stat := snapshot.ChannelStats[channelID]
|
|
if snapshot.FailingChannelCount != 1 || snapshot.SlowChannelCount != 1 {
|
|
t.Fatalf("pressure counts = failing:%d slow:%d stat=%+v", snapshot.FailingChannelCount, snapshot.SlowChannelCount, stat)
|
|
}
|
|
if stat.SendFailures != 1 || stat.LatencyGt1000Millis != 1 || !stat.RouteRebuildRecommended {
|
|
t.Fatalf("failure/latency stat = %+v, want retry-window telemetry", stat)
|
|
}
|
|
if stat.QualityWindowDropCount != 1 || stat.QualityWindowFailureCount != 1 || stat.QualityWindowSlowCount != 1 {
|
|
t.Fatalf("rolling quality stat = %+v, want fresh drop+failure+slow sample", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(32, 16)
|
|
bulkPackets := packetsForDistinctShards(16, scheduler.shardCountValue())
|
|
if len(bulkPackets) != 16 {
|
|
t.Fatalf("bulk packet count = %d, want 16 distinct shards", len(bulkPackets))
|
|
}
|
|
interactivePacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 61000, 3389)
|
|
if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassBulk, bulkPackets); len(scheduled) != 16 {
|
|
t.Fatalf("bulk scheduled = %d, want 16", len(scheduled))
|
|
}
|
|
if scheduled := scheduler.ScheduleClientPacketsForConnectionClass("vpn-1", FabricTrafficClassInteractive, [][]byte{interactivePacket}); len(scheduled) != 1 {
|
|
t.Fatalf("interactive scheduled = %d, want 1", len(scheduled))
|
|
}
|
|
if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, 4); got != 1 {
|
|
t.Fatalf("bulk adaptive window = %d, want 1", got)
|
|
}
|
|
if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassInteractive, 8); got != 8 {
|
|
t.Fatalf("interactive adaptive window = %d, want 8", got)
|
|
}
|
|
snapshot := scheduler.Snapshot()
|
|
if !snapshot.AdaptiveBackpressureActive || snapshot.AdaptiveBackpressureReason != "bulk_window_reduced_to_protect_interactive" {
|
|
t.Fatalf("adaptive snapshot = %+v", snapshot)
|
|
}
|
|
if snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] != 1 || snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] != 8 {
|
|
t.Fatalf("recommended class windows = %+v", snapshot.RecommendedParallelWindows)
|
|
}
|
|
if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 {
|
|
t.Fatalf("traffic class counts = %+v", snapshot.TrafficClassCounts)
|
|
}
|
|
if !snapshot.BulkPressureActive || snapshot.BulkPressureChannelCount != 16 || snapshot.InteractiveOrControlCount != 1 || !snapshot.BackpressureActive {
|
|
t.Fatalf("bulk pressure telemetry = %+v", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricFlowSchedulerRollingQualityWindowForgetsOldPressure(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
channelID := fabricFlowChannelID("vpn-1", 0)
|
|
|
|
scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond)
|
|
scheduler.RecordRouteFailure(channelID, "route-primary", "relay-primary", mesh.ErrForwardPeerUnavailable, 1500*time.Millisecond)
|
|
if got := scheduler.RecommendedParallelSendWindow(4); got >= 4 {
|
|
t.Fatalf("pressure recommended window = %d, want reduced", got)
|
|
}
|
|
|
|
for i := 0; i < defaultFabricFlowQualityWindowCapacity; i++ {
|
|
scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond)
|
|
}
|
|
if got := scheduler.RecommendedParallelSendWindow(4); got != 4 {
|
|
t.Fatalf("fresh-success recommended window = %d, want 4", got)
|
|
}
|
|
snapshot := scheduler.Snapshot()
|
|
stat := snapshot.ChannelStats[channelID]
|
|
if stat.SendFailures != 2 || stat.SendSuccesses != defaultFabricFlowQualityWindowCapacity {
|
|
t.Fatalf("lifetime counters = failures:%d successes:%d", stat.SendFailures, stat.SendSuccesses)
|
|
}
|
|
if stat.QualityWindowSampleCount != defaultFabricFlowQualityWindowCapacity || stat.QualityWindowFailureCount != 0 || stat.QualityWindowSuccessCount != defaultFabricFlowQualityWindowCapacity {
|
|
t.Fatalf("rolling quality stat = %+v, want old failures rolled out", stat)
|
|
}
|
|
if snapshot.FailingChannelCount != 0 || snapshot.SlowChannelCount != 0 || snapshot.BackpressureActive {
|
|
t.Fatalf("rolling pressure snapshot = %+v, want clean fresh window", snapshot)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressReportsBoundedBackpressurePerLogicalChannel(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 1)
|
|
packetA := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
packetB := packetWithSameShard(packetA, 8)
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(4),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packetA, packetB}); err != nil {
|
|
t.Fatalf("send client packet batch: %v", err)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.FlowScheduler.QueueCapacity != 1 || snapshot.FlowScheduler.Dropped != 1 || snapshot.SendFlowDropped != 1 {
|
|
t.Fatalf("drop telemetry = %+v send_dropped=%d, want one bounded drop", snapshot.FlowScheduler, snapshot.SendFlowDropped)
|
|
}
|
|
if snapshot.FlowScheduler.HighWatermark != 1 || len(transport.envelopes) != 1 {
|
|
t.Fatalf("high watermark/envelopes = %d/%d, want bounded single queued send", snapshot.FlowScheduler.HighWatermark, len(transport.envelopes))
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressBoundedLoadRebuildsAwayFromWithdrawnRoute(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(16, 8)
|
|
packets := packetsForDistinctShards(12, scheduler.shardCountValue())
|
|
if len(packets) != 12 {
|
|
t.Fatalf("distinct packet count = %d", len(packets))
|
|
}
|
|
channels := map[string]struct{}{}
|
|
for _, packet := range packets {
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channelID := fabricFlowChannelID("vpn-1", shard)
|
|
channels[channelID] = struct{}{}
|
|
scheduler.RecordRouteSuccess(channelID, "route-primary", "relay-primary", time.Millisecond)
|
|
}
|
|
transport := &routeFailingProductionTransport{failRouteID: "route-primary"}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(32),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-alternate",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-alternate", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil {
|
|
t.Fatalf("send first load batch: %v", err)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.SendFlowBatches != uint64(len(channels)) || snapshot.FlowScheduler.Dropped != 0 || snapshot.FlowScheduler.HighWatermark != 1 {
|
|
t.Fatalf("first load scheduler snapshot = %+v", snapshot.FlowScheduler)
|
|
}
|
|
if snapshot.SendRouteFailures != uint64(len(channels)) {
|
|
t.Fatalf("route failures = %d, want %d", snapshot.SendRouteFailures, len(channels))
|
|
}
|
|
if transport.callsByRoute["route-primary"] != len(channels) || transport.callsByRoute["route-alternate"] != len(channels) {
|
|
t.Fatalf("route calls = %+v, want primary and alternate per channel", transport.callsByRoute)
|
|
}
|
|
for channelID, stat := range snapshot.FlowScheduler.ChannelStats {
|
|
if _, ok := channels[channelID]; !ok {
|
|
continue
|
|
}
|
|
if stat.LastRouteID != "route-alternate" || stat.LastFailedRouteID != "" || stat.ConsecutiveFailures != 0 {
|
|
t.Fatalf("channel %s stat after first load = %+v", channelID, stat)
|
|
}
|
|
}
|
|
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-primary",
|
|
ReplacementRouteID: "route-alternate",
|
|
RebuildRequestID: "rebuild-load-1",
|
|
RebuildStatus: "applied",
|
|
RebuildReason: "service_channel_feedback_rebuild_applied_to_alternate",
|
|
DecisionSource: "service_channel_feedback_replacement",
|
|
Generation: "c18z-load",
|
|
EffectiveHops: []string{"entry-1", "relay-alternate", "exit-1"},
|
|
}}, "c18z-load", time.Now().UTC())
|
|
|
|
primaryCallsBefore := transport.callsByRoute["route-primary"]
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil {
|
|
t.Fatalf("send second load batch: %v", err)
|
|
}
|
|
snapshot = ingress.Snapshot("cluster-1")
|
|
if transport.callsByRoute["route-primary"] != primaryCallsBefore {
|
|
t.Fatalf("primary route was retried after withdrawal: before=%d after=%d calls=%+v", primaryCallsBefore, transport.callsByRoute["route-primary"], transport.callsByRoute)
|
|
}
|
|
if snapshot.RouteCandidateCount != 1 || snapshot.RouteManager.WithdrawnRouteCount != 1 || snapshot.RouteManagerTransition.Status != "applied_rebuild" {
|
|
t.Fatalf("route manager snapshot = manager:%+v transition:%+v candidates=%d", snapshot.RouteManager, snapshot.RouteManagerTransition, snapshot.RouteCandidateCount)
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 || snapshot.FlowScheduler.HighWatermark > 1 {
|
|
t.Fatalf("second load scheduler snapshot = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressQualityPreferenceOverridesStickyRoute(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 3389)
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channelID := fabricFlowChannelID("vpn-1", shard)
|
|
scheduler.RecordRouteSuccess(channelID, "route-slow", "relay-slow", time.Millisecond)
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-slow",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-slow", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 90,
|
|
Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"},
|
|
LastSendDurationMs: 1,
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-fast" {
|
|
t.Fatalf("route = %+v, want quality-preferred fast route over sticky slow route", transport.envelopes)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteQualityPreferenceCount != 1 {
|
|
t.Fatalf("quality preference count = %d, want 1", snapshot.RouteQualityPreferenceCount)
|
|
}
|
|
stat := snapshot.FlowScheduler.ChannelStats[channelID]
|
|
if stat.LastRouteID != "route-fast" || stat.LastFailedRouteID != "" {
|
|
t.Fatalf("flow stat = %+v, want moved to fast healthy route", stat)
|
|
}
|
|
if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 || !containsString(stat.QualityPreferenceReasons, "service_channel_quality_latency_le_10ms") {
|
|
t.Fatalf("quality preference stat = %+v, want applied fast route preference", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressQualityPreferenceUsesEffectiveScore(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 78, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389)
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channelID := fabricFlowChannelID("vpn-1", shard)
|
|
scheduler.RecordRouteSuccess(channelID, "route-sticky", "relay-sticky", time.Millisecond)
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-sticky",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-sticky", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-decayed-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-decayed-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 20,
|
|
RawScoreAdjustment: 90,
|
|
Reasons: []string{"service_channel_recent_success", "service_channel_feedback_age_decay"},
|
|
LastSendDurationMs: 1,
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 1 || transport.envelopes[0].RouteID != "route-sticky" {
|
|
t.Fatalf("route = %+v, want decayed quality score to preserve sticky route", transport.envelopes)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if len(snapshot.RouteQualityPreferences) != 1 {
|
|
t.Fatalf("quality preferences = %+v, want one visible preference", snapshot.RouteQualityPreferences)
|
|
}
|
|
preference := snapshot.RouteQualityPreferences[0]
|
|
if preference.ScoreAdjustment != 20 || preference.RawScoreAdjustment != 90 || !containsString(preference.Reasons, "service_channel_feedback_age_decay") {
|
|
t.Fatalf("preference snapshot = %+v, want effective/raw score and decay reason", preference)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressQualityPreferencePreservesMultiChannelFairness(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
firstPacket := testIPv4TCPPacket([4]byte{10, 79, 0, 2}, [4]byte{192, 168, 200, 95}, 51002, 3389)
|
|
secondPacket := packetWithDifferentShard(firstPacket, scheduler.shardCountValue())
|
|
firstChannel := testFlowChannelID("vpn-1", firstPacket, scheduler.shardCountValue())
|
|
secondChannel := testFlowChannelID("vpn-1", secondPacket, scheduler.shardCountValue())
|
|
if firstChannel == secondChannel {
|
|
t.Fatalf("test packets unexpectedly map to same channel %s", firstChannel)
|
|
}
|
|
scheduler.RecordRouteSuccess(firstChannel, "route-slow", "relay-slow", time.Millisecond)
|
|
scheduler.RecordRouteSuccess(secondChannel, "route-slow", "relay-slow", time.Millisecond)
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(16),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-slow",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-slow", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 90,
|
|
RawScoreAdjustment: 90,
|
|
Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"},
|
|
LastSendDurationMs: 1,
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{firstPacket, secondPacket}); err != nil {
|
|
t.Fatalf("send packet batch: %v", err)
|
|
}
|
|
if len(transport.envelopes) != 2 {
|
|
t.Fatalf("envelope count = %d, want one envelope per logical channel", len(transport.envelopes))
|
|
}
|
|
for _, envelope := range transport.envelopes {
|
|
if envelope.RouteID != "route-fast" {
|
|
t.Fatalf("envelope route = %s, want quality-preferred fast route", envelope.RouteID)
|
|
}
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
for _, channelID := range []string{firstChannel, secondChannel} {
|
|
stat := snapshot.FlowScheduler.ChannelStats[channelID]
|
|
if stat.LastRouteID != "route-fast" || stat.Served != 1 || stat.Dropped != 0 {
|
|
t.Fatalf("channel %s stat = %+v, want fair fast-route service without drops", channelID, stat)
|
|
}
|
|
if stat.QualityPreferenceRouteID != "route-fast" || stat.QualityPreferenceScore != 90 {
|
|
t.Fatalf("channel %s quality stat = %+v, want applied quality preference", channelID, stat)
|
|
}
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 0 || snapshot.SendFlowDropped != 0 {
|
|
t.Fatalf("drop counters = scheduler:%d ingress:%d", snapshot.FlowScheduler.Dropped, snapshot.SendFlowDropped)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressClearsStaleQualityPreferenceMarkers(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 80, 0, 2}, [4]byte{192, 168, 200, 95}, 51003, 3389)
|
|
channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue())
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-slow",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-slow", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.PreferClientRoute("route-slow")
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 90,
|
|
RawScoreAdjustment: 90,
|
|
Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"},
|
|
LastSendDurationMs: 1,
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet: %v", err)
|
|
}
|
|
stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID]
|
|
if stat.QualityPreferenceRouteID != "route-fast" {
|
|
t.Fatalf("quality preference marker = %+v, want route-fast", stat)
|
|
}
|
|
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 90,
|
|
RawScoreAdjustment: 90,
|
|
ExpiresAt: time.Now().UTC().Add(-time.Second).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.RouteQualityPreferenceCount != 0 {
|
|
t.Fatalf("quality preference count = %d, want expired preference removed", snapshot.RouteQualityPreferenceCount)
|
|
}
|
|
stat = snapshot.FlowScheduler.ChannelStats[channelID]
|
|
if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 || len(stat.QualityPreferenceReasons) != 0 {
|
|
t.Fatalf("stale quality marker = %+v, want cleared", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressClearsWithdrawnQualityPreferenceMarkers(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 81, 0, 2}, [4]byte{192, 168, 200, 95}, 51004, 3389)
|
|
channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue())
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{
|
|
{
|
|
RouteID: "route-slow",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-slow", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
{
|
|
RouteID: "route-fast",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
},
|
|
}
|
|
},
|
|
}
|
|
ingress.PreferClientRoute("route-slow")
|
|
ingress.UpdateRouteQualityPreferences([]FabricServiceChannelRouteQualityPreference{{
|
|
RouteID: "route-fast",
|
|
FeedbackStatus: "healthy",
|
|
ScoreAdjustment: 90,
|
|
RawScoreAdjustment: 90,
|
|
Reasons: []string{"service_channel_recent_success", "service_channel_quality_latency_le_10ms"},
|
|
LastSendDurationMs: 1,
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute).Format(time.RFC3339Nano),
|
|
}}, time.Now().UTC())
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet: %v", err)
|
|
}
|
|
ingress.UpdateRouteManager([]FabricServiceChannelRouteManagerDecision{{
|
|
RouteID: "route-fast",
|
|
ReplacementRouteID: "route-slow",
|
|
RebuildStatus: "applied",
|
|
DecisionSource: "test_withdraw_quality_route",
|
|
}}, "config-withdraw-quality", time.Now().UTC())
|
|
stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID]
|
|
if stat.QualityPreferenceRouteID != "" || stat.QualityPreferenceScore != 0 {
|
|
t.Fatalf("withdrawn quality marker = %+v, want cleared", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressReportsRoutePolicyProvenance(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 91, 0, 2}, [4]byte{192, 168, 200, 95}, 51014, 443)
|
|
channelID := testFlowChannelID("vpn-1", packet, scheduler.shardCountValue())
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(8),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
RecoveryPolicyFingerprint: "policy-fp-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
RouteVersion: "route-v1",
|
|
PolicyVersion: "policy-v1",
|
|
}}
|
|
},
|
|
}
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{packet}); err != nil {
|
|
t.Fatalf("send packet: %v", err)
|
|
}
|
|
stat := ingress.Snapshot("cluster-1").FlowScheduler.ChannelStats[channelID]
|
|
if stat.LastRouteID != "route-primary" ||
|
|
stat.RoutePolicyVersion != "policy-v1" ||
|
|
stat.RouteGeneration != "policy-v1" ||
|
|
stat.RecoveryPolicyFingerprint != "policy-fp-1" {
|
|
t.Fatalf("route provenance stat = %+v", stat)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressBoundedLoadReportsPerChannelDrops(t *testing.T) {
|
|
scheduler := NewFabricFlowScheduler(8, 8)
|
|
packet := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443)
|
|
packets := make([][]byte, 0, 32)
|
|
for i := 0; i < 32; i++ {
|
|
packets = append(packets, packetWithSameShard(packet, scheduler.shardCountValue()))
|
|
}
|
|
transport := &captureManyProductionTransport{}
|
|
ingress := &FabricClientPacketIngress{
|
|
ForwardTransport: transport,
|
|
Inbox: NewFabricPacketInbox(32),
|
|
FlowScheduler: scheduler,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
Routes: func() []mesh.SyntheticRoute {
|
|
return []mesh.SyntheticRoute{{
|
|
RouteID: "route-primary",
|
|
ClusterID: "cluster-1",
|
|
SourceNodeID: "entry-1",
|
|
DestinationNodeID: "exit-1",
|
|
Hops: []string{"entry-1", "relay-primary", "exit-1"},
|
|
AllowedChannels: []string{mesh.ProductionChannelVPNPacket},
|
|
ExpiresAt: time.Now().UTC().Add(time.Minute),
|
|
MaxTTL: 8,
|
|
}}
|
|
},
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", packets); err != nil {
|
|
t.Fatalf("send congested load batch: %v", err)
|
|
}
|
|
snapshot := ingress.Snapshot("cluster-1")
|
|
if snapshot.FlowScheduler.QueueCapacity != 8 || snapshot.FlowScheduler.HighWatermark != 8 {
|
|
t.Fatalf("capacity/high watermark = %d/%d, want 8/8", snapshot.FlowScheduler.QueueCapacity, snapshot.FlowScheduler.HighWatermark)
|
|
}
|
|
if snapshot.FlowScheduler.Dropped != 24 || snapshot.SendFlowDropped != 24 || !snapshot.FlowScheduler.BackpressureActive {
|
|
t.Fatalf("drop telemetry = %+v send_flow_dropped=%d", snapshot.FlowScheduler, snapshot.SendFlowDropped)
|
|
}
|
|
if snapshot.SendFlowPackets != 8 || len(transport.envelopes) != 1 {
|
|
t.Fatalf("sent packets/envelopes = %d/%d, want bounded 8/1", snapshot.SendFlowPackets, len(transport.envelopes))
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressUsesLocalGatewayShortcutWithoutRoute(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
ingress := &FabricClientPacketIngress{
|
|
Inbox: inbox,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
AllowLegacyLocalGatewayFallback: true,
|
|
LocalGateway: func(vpnConnectionID string) bool {
|
|
return vpnConnectionID == "vpn-1"
|
|
},
|
|
Routes: func() []mesh.SyntheticRoute { return nil },
|
|
}
|
|
|
|
if err := ingress.SendClientPacketBatch(context.Background(), "cluster-1", "vpn-1", [][]byte{[]byte("packet")}); err != nil {
|
|
t.Fatalf("send local gateway packet: %v", err)
|
|
}
|
|
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionClientToGateway, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive local gateway packet: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "packet" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
func TestFabricClientPacketIngressReceivesLocalGatewayReplyWithoutRoute(t *testing.T) {
|
|
inbox := NewFabricPacketInbox(4)
|
|
ingress := &FabricClientPacketIngress{
|
|
Inbox: inbox,
|
|
ClusterID: "cluster-1",
|
|
LocalNodeID: "entry-1",
|
|
AllowLegacyLocalGatewayFallback: true,
|
|
LocalGateway: func(vpnConnectionID string) bool {
|
|
return vpnConnectionID == "vpn-1"
|
|
},
|
|
Routes: func() []mesh.SyntheticRoute { return nil },
|
|
}
|
|
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{[]byte("reply")}); err != nil {
|
|
t.Fatalf("deliver local gateway reply: %v", err)
|
|
}
|
|
|
|
packets, err := ingress.ReceiveClientPacketBatch(context.Background(), "cluster-1", "vpn-1", time.Second)
|
|
if err != nil {
|
|
t.Fatalf("receive local gateway reply: %v", err)
|
|
}
|
|
if len(packets) != 1 || string(packets[0]) != "reply" {
|
|
t.Fatalf("packets = %#v", packets)
|
|
}
|
|
}
|
|
|
|
// testIPv4TCPPacket returns a 40-byte buffer laid out as a 20-byte IPv4 header
// followed by a 20-byte TCP header, carrying the given addresses and ports.
// Fields not set here (checksums, sequence numbers, flags, ...) stay zero.
func testIPv4TCPPacket(src [4]byte, dst [4]byte, srcPort uint16, dstPort uint16) []byte {
	const (
		ipHeaderLen  = 20
		tcpHeaderLen = 20
		totalLen     = ipHeaderLen + tcpHeaderLen
	)
	buf := make([]byte, totalLen)

	// IPv4 header.
	ip := buf[:ipHeaderLen]
	ip[0] = 0x45                                // version 4, header length 5 words
	ip[2], ip[3] = totalLen>>8, totalLen&0xff   // total length, big-endian
	ip[8] = 64                                  // TTL
	ip[9] = 6                                   // protocol: TCP
	copy(ip[12:16], src[:])
	copy(ip[16:20], dst[:])

	// TCP header.
	tcp := buf[ipHeaderLen:]
	tcp[0], tcp[1] = byte(srcPort>>8), byte(srcPort)
	tcp[2], tcp[3] = byte(dstPort>>8), byte(dstPort)
	tcp[12] = 0x50 // data offset: 5 words, no options

	return buf
}
|
|
|
|
func packetWithDifferentShard(reference []byte, shardCount int) []byte {
|
|
_, referenceShard := classifyPacketFlow(reference, shardCount)
|
|
for port := uint16(10000); port < 11000; port++ {
|
|
candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443)
|
|
_, shard := classifyPacketFlow(candidate, shardCount)
|
|
if shard != referenceShard {
|
|
return candidate
|
|
}
|
|
}
|
|
return testIPv4TCPPacket([4]byte{10, 77, 0, 3}, [4]byte{192, 168, 200, 96}, 52000, 443)
|
|
}
|
|
|
|
func packetShard(packet []byte, shardCount int) int {
|
|
_, shard := classifyPacketFlow(packet, shardCount)
|
|
return shard
|
|
}
|
|
|
|
// packetSourcePort reads the big-endian 16-bit value at byte offsets 20-21 of
// packet, which is where testIPv4TCPPacket stores the source port. It returns
// 0 when packet is shorter than 22 bytes.
func packetSourcePort(packet []byte) uint16 {
	const portOffset = 20
	if len(packet) >= portOffset+2 {
		hi, lo := uint16(packet[portOffset]), uint16(packet[portOffset+1])
		return hi<<8 | lo
	}
	return 0
}
|
|
|
|
func testFlowChannelID(vpnConnectionID string, packet []byte, shardCount int) string {
|
|
return fabricFlowChannelID(vpnConnectionID, packetShard(packet, shardCount))
|
|
}
|
|
|
|
func packetWithSameShard(reference []byte, shardCount int) []byte {
|
|
_, referenceShard := classifyPacketFlow(reference, shardCount)
|
|
for port := uint16(11000); port < 12000; port++ {
|
|
candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 3389)
|
|
_, shard := classifyPacketFlow(candidate, shardCount)
|
|
if shard == referenceShard {
|
|
return candidate
|
|
}
|
|
}
|
|
return append([]byte{}, reference...)
|
|
}
|
|
|
|
func packetsForDistinctShards(count int, shardCount int) [][]byte {
|
|
packets := make([][]byte, 0, count)
|
|
seen := map[int]struct{}{}
|
|
for port := uint16(10000); port < 65000 && len(packets) < count; port++ {
|
|
candidate := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, port, 443)
|
|
_, shard := classifyPacketFlow(candidate, shardCount)
|
|
if _, ok := seen[shard]; ok {
|
|
continue
|
|
}
|
|
seen[shard] = struct{}{}
|
|
packets = append(packets, candidate)
|
|
}
|
|
return packets
|
|
}
|
|
|
|
func packetsForOrderedDistinctChannels(shardCount int) ([]byte, []byte) {
|
|
packets := packetsForDistinctShards(shardCount, shardCount)
|
|
if len(packets) < 2 {
|
|
panic("not enough distinct channel packets")
|
|
}
|
|
for _, left := range packets {
|
|
leftChannel := testFlowChannelID("vpn-1", left, shardCount)
|
|
for _, right := range packets {
|
|
rightChannel := testFlowChannelID("vpn-1", right, shardCount)
|
|
if leftChannel < rightChannel {
|
|
return left, right
|
|
}
|
|
}
|
|
}
|
|
panic("failed to find ordered distinct channel packets")
|
|
}
|
|
|
|
func expectedScheduledChannelCount(scheduler *FabricFlowScheduler, packets [][]byte) int {
|
|
channels := map[string]struct{}{}
|
|
for _, packet := range packets {
|
|
_, shard := classifyPacketFlow(packet, scheduler.shardCountValue())
|
|
channels[fmt.Sprintf("flow-%02d", shard)] = struct{}{}
|
|
}
|
|
return len(channels)
|
|
}
|
|
|
|
// closeOnce closes ch and swallows the panic that close raises when ch is
// already closed (or nil), so it is safe to call more than once per channel.
func closeOnce(ch chan struct{}) {
	// recover must be called directly by the deferred function to take effect.
	swallowPanic := func() {
		_ = recover()
	}
	defer swallowPanic()

	close(ch)
}
|