Reduce VPN control packet latency

This commit is contained in:
2026-05-15 18:19:55 +03:00
parent ceda460d09
commit 52dfce316d
7 changed files with 194 additions and 15 deletions
@@ -7,7 +7,7 @@ import (
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
)
const Version = "0.2.273-vpnclass"
const Version = "0.2.277-vpnpribatch20"
func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
return client.EnrollRequest{
@@ -1794,17 +1794,22 @@ func (t *FabricPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, t
type FabricPacketInbox struct {
capacity int
mu sync.Mutex
queues map[string]chan mesh.VPNPacketBatchPayload
queues map[string]*fabricPacketInboxQueue
dropped uint64
}
type fabricPacketInboxQueue struct {
normal chan mesh.VPNPacketBatchPayload
priority chan mesh.VPNPacketBatchPayload
}
func NewFabricPacketInbox(capacity int) *FabricPacketInbox {
if capacity <= 0 {
capacity = 4096
}
return &FabricPacketInbox{
capacity: capacity,
queues: map[string]chan mesh.VPNPacketBatchPayload{},
queues: map[string]*fabricPacketInboxQueue{},
}
}
@@ -1860,12 +1865,46 @@ func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direct
defer timer.Stop()
queue := i.queue(vpnConnectionID, direction)
for {
select {
case payload := <-queue.priority:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
default:
}
if len(queue.normal) > 0 {
priorityTimer := time.NewTimer(2 * time.Millisecond)
select {
case <-ctx.Done():
priorityTimer.Stop()
return nil, ctx.Err()
case <-timer.C:
priorityTimer.Stop()
return nil, nil
case payload := <-queue.priority:
priorityTimer.Stop()
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
case <-priorityTimer.C:
}
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
return nil, nil
case payload := <-queue:
case payload := <-queue.priority:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
}
return packets, nil
case payload := <-queue.normal:
packets := cleanPacketBatch(payload.Packets)
if len(packets) == 0 {
continue
@@ -1877,8 +1916,12 @@ func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direct
func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error {
queue := i.queue(payload.VPNConnectionID, payload.Direction)
target := queue.normal
if payload.Direction == FabricDirectionGatewayToClient && batchHasTCPControlPacket(payload.Packets) {
target = queue.priority
}
select {
case queue <- payload:
case target <- payload:
default:
i.mu.Lock()
i.dropped++
@@ -1887,21 +1930,41 @@ func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error {
return nil
}
func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) chan mesh.VPNPacketBatchPayload {
func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) *fabricPacketInboxQueue {
key := vpnConnectionID + "\x00" + direction
i.mu.Lock()
defer i.mu.Unlock()
if i.queues == nil {
i.queues = map[string]chan mesh.VPNPacketBatchPayload{}
i.queues = map[string]*fabricPacketInboxQueue{}
}
queue, ok := i.queues[key]
if !ok {
queue = make(chan mesh.VPNPacketBatchPayload, i.capacity)
priorityCapacity := maxInt(1, i.capacity/4)
queue = &fabricPacketInboxQueue{
normal: make(chan mesh.VPNPacketBatchPayload, i.capacity),
priority: make(chan mesh.VPNPacketBatchPayload, priorityCapacity),
}
i.queues[key] = queue
}
return queue
}
func batchHasTCPControlPacket(packets [][]byte) bool {
for _, packet := range packets {
if isTCPControlPacket(packet) {
return true
}
}
return false
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
func (i *FabricPacketInbox) Dropped() uint64 {
if i == nil {
return 0
@@ -1933,7 +1996,7 @@ func (i *FabricPacketInbox) Snapshot() FabricPacketInboxSnapshot {
snapshot.Dropped = i.dropped
snapshot.QueueCount = len(i.queues)
for key, queue := range i.queues {
snapshot.QueueDepths[strings.ReplaceAll(key, "\x00", ":")] = len(queue)
snapshot.QueueDepths[strings.ReplaceAll(key, "\x00", ":")] = len(queue.normal) + len(queue.priority)
}
return snapshot
}
@@ -241,6 +241,51 @@ func TestFabricPacketInboxDropsEmptyPackets(t *testing.T) {
}
}
func TestFabricPacketInboxPrioritizesGatewayTCPControlPackets(t *testing.T) {
inbox := NewFabricPacketInbox(4)
normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000)
priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032)
priority[33] = 0x12
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil {
t.Fatalf("deliver normal gateway packet: %v", err)
}
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}); err != nil {
t.Fatalf("deliver priority gateway packet: %v", err)
}
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
if err != nil {
t.Fatalf("receive gateway packet: %v", err)
}
if len(packets) != 1 || string(packets[0]) != string(priority) {
t.Fatalf("first packets = %#v, want priority tcp control packet", packets)
}
}
func TestFabricPacketInboxWaitsBrieflyForGatewayTCPControlPackets(t *testing.T) {
inbox := NewFabricPacketInbox(4)
normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000)
priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032)
priority[33] = 0x12
if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil {
t.Fatalf("deliver normal gateway packet: %v", err)
}
go func() {
time.Sleep(time.Millisecond)
_ = inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority})
}()
packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second)
if err != nil {
t.Fatalf("receive gateway packet: %v", err)
}
if len(packets) != 1 || string(packets[0]) != string(priority) {
t.Fatalf("first packets = %#v, want delayed priority tcp control packet", packets)
}
}
func TestLocalPacketTransportUsesFabricInboxDirections(t *testing.T) {
inbox := NewFabricPacketInbox(4)
transport := &LocalPacketTransport{Inbox: inbox, VPNConnectionID: "vpn-1"}
@@ -51,6 +51,7 @@ const (
vpnGatewayBatchMaxPackets = 2048
vpnGatewayBatchMaxBytes = 8 * 1024 * 1024
vpnGatewayBatchFlushTimeout = 3 * time.Millisecond
vpnGatewayPriorityBatchWait = 20 * time.Millisecond
)
type readWriteCloser interface {
@@ -350,9 +351,34 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, priorityPackets <-ch
}
flushPriority := func(packet []byte) {
flush()
if addPacket(packet) {
flush()
if !addPacket(packet) {
return
}
deadline := time.Now().Add(vpnGatewayPriorityBatchWait)
for len(batch) < vpnGatewayBatchMaxPackets && batchBytes < vpnGatewayBatchMaxBytes {
wait := time.Until(deadline)
if wait <= 0 {
break
}
timer := time.NewTimer(wait)
select {
case next := <-priorityPackets:
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
if !addPacket(next) {
flush()
_ = addPacket(next)
}
case <-timer.C:
flush()
return
}
}
flush()
}
for {
if len(batch) == 0 && timerActive {
@@ -76,6 +76,46 @@ func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) {
}
}
func TestGatewayUploadMicroBatchesTCPControlPackets(t *testing.T) {
transport := &recordingGatewayTransport{}
gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"}
priorityPackets := make(chan []byte, 2)
packets := make(chan []byte, 1)
first := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000)
first[33] = 0x12
second := testIPv4TCPPacket([4]byte{188, 40, 167, 82}, [4]byte{10, 77, 0, 2}, 80, 51002)
second[33] = 0x12
priorityPackets <- first
priorityPackets <- second
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
done <- gateway.uploadGatewayPackets(ctx, priorityPackets, packets)
}()
defer func() {
cancel()
<-done
}()
deadline := time.After(time.Second)
for {
if batch := transport.firstBatch(); len(batch) == 2 {
if string(batch[0]) != string(first) || string(batch[1]) != string(second) {
t.Fatalf("priority batch = %#v, want both control packets in order", batch)
}
return
}
select {
case <-deadline:
t.Fatal("timed out waiting for priority microbatch")
default:
time.Sleep(time.Millisecond)
}
}
}
func TestIsTCPControlPacket(t *testing.T) {
packet := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000)
if isTCPControlPacket(packet) {