From 50db5e7a0d36134a82c5fb5b9cfd743539440c9b Mon Sep 17 00:00:00 2001 From: Mikhail Date: Fri, 15 May 2026 16:19:24 +0300 Subject: [PATCH] Prioritize VPN gateway control packets --- .../rap-node-agent/internal/agent/payload.go | 2 +- .../internal/vpnruntime/gateway.go | 66 ++++++++++--- .../internal/vpnruntime/gateway_test.go | 92 +++++++++++++++++++ 3 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 agents/rap-node-agent/internal/vpnruntime/gateway_test.go diff --git a/agents/rap-node-agent/internal/agent/payload.go b/agents/rap-node-agent/internal/agent/payload.go index babc1d0..2d6864f 100644 --- a/agents/rap-node-agent/internal/agent/payload.go +++ b/agents/rap-node-agent/internal/agent/payload.go @@ -7,7 +7,7 @@ import ( "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" ) -const Version = "0.2.271-vpnwsfarm" +const Version = "0.2.272-vpnprio" func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest { return client.EnrollRequest{ diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway.go b/agents/rap-node-agent/internal/vpnruntime/gateway.go index 7cdf536..f5d471d 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway.go @@ -261,10 +261,11 @@ func (g *Gateway) run(ctx context.Context, tun readWriteCloser) error { } func (g *Gateway) copyGatewayToClient(ctx context.Context, tun io.Reader) error { + priorityPackets := make(chan []byte, 1024) packets := make(chan []byte, 32768) errCh := make(chan error, 1) go func() { - errCh <- g.uploadGatewayPackets(ctx, packets) + errCh <- g.uploadGatewayPackets(ctx, priorityPackets, packets) }() buffer := make([]byte, 65535) @@ -288,6 +289,15 @@ func (g *Gateway) copyGatewayToClient(ctx context.Context, tun io.Reader) error packet := append([]byte(nil), buffer[:n]...) normalizeIPv4PacketChecksums(packet) g.recordTunRead(packet) + if isTCPControlPacket(packet) { + select { + case priorityPackets <- packet: + default: + g.uploadQueueDrops.Add(1) + log.Printf("vpn gateway priority packet upload queue full; dropping packet: vpn_connection_id=%s", g.VPNConnectionID) + } + continue + } select { case packets <- packet: default: @@ -297,7 +307,7 @@ func (g *Gateway) copyGatewayToClient(ctx context.Context, tun io.Reader) error } } -func (g *Gateway) uploadGatewayPackets(ctx context.Context, packets <-chan []byte) error { +func (g *Gateway) uploadGatewayPackets(ctx context.Context, priorityPackets <-chan []byte, packets <-chan []byte) error { batch := make([][]byte, 0, vpnGatewayBatchMaxPackets) batchBytes := 0 timer := time.NewTimer(time.Hour) @@ -323,6 +333,27 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, packets <-chan []byt batch = batch[:0] batchBytes = 0 } + addPacket := func(packet []byte) bool { + packetBytes := len(packet) + if packetBytes <= 0 { + return false + } + packetFrameSize := 4 + packetBytes + if len(batch) > 0 { + if len(batch) >= vpnGatewayBatchMaxPackets || batchBytes+packetFrameSize > vpnGatewayBatchMaxBytes { + flush() + } + } + batch = append(batch, packet) + batchBytes += packetFrameSize + return true + } + flushPriority := func(packet []byte) { + flush() + if addPacket(packet) { + flush() + } + } for { if len(batch) == 0 && timerActive { if !timer.Stop() { @@ -334,22 +365,21 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, packets <-chan []byt timerActive = false } select { + case packet := <-priorityPackets: + flushPriority(packet) + continue + default: + } + select { case <-ctx.Done(): flush() return ctx.Err() + case packet := <-priorityPackets: + flushPriority(packet) case packet := <-packets: - packetBytes := len(packet) - if packetBytes <= 0 { + if !addPacket(packet) { continue } - packetFrameSize := 4 + packetBytes - if len(batch) > 0 { - if len(batch) >= vpnGatewayBatchMaxPackets || batchBytes+packetFrameSize > vpnGatewayBatchMaxBytes { - flush() - } - } - batch = append(batch, packet) - batchBytes += packetFrameSize if len(batch) >= vpnGatewayBatchMaxPackets || batchBytes >= vpnGatewayBatchMaxBytes { flush() continue @@ -365,6 +395,18 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, packets <-chan []byt } } +func isTCPControlPacket(packet []byte) bool { + if len(packet) < 20 || packet[0]>>4 != 4 { + return false + } + ihl := int(packet[0]&0x0f) * 4 + if ihl < 20 || len(packet) < ihl+20 || packet[9] != 6 { + return false + } + flags := packet[ihl+13] + return flags&0x17 != 0 +} + func (g *Gateway) copyClientToGateway(ctx context.Context, tun io.Writer) error { for { packets, err := g.Transport.ReceiveGatewayPacketBatch(ctx, g.PollTimeout) diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway_test.go b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go new file mode 100644 index 0000000..428882f --- /dev/null +++ b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go @@ -0,0 +1,92 @@ +package vpnruntime + +import ( + "context" + "sync" + "testing" + "time" +) + +type recordingGatewayTransport struct { + mu sync.Mutex + batches [][][]byte +} + +func (t *recordingGatewayTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { + copied := make([][]byte, len(packets)) + for i, packet := range packets { + copied[i] = append([]byte(nil), packet...) + } + t.mu.Lock() + t.batches = append(t.batches, copied) + t.mu.Unlock() + return nil +} + +func (t *recordingGatewayTransport) ReceiveGatewayPacketBatch(ctx context.Context, timeout time.Duration) ([][]byte, error) { + return nil, ctx.Err() +} + +func (t *recordingGatewayTransport) firstBatch() [][]byte { + t.mu.Lock() + defer t.mu.Unlock() + if len(t.batches) == 0 { + return nil + } + return t.batches[0] +} + +func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) { + transport := &recordingGatewayTransport{} + gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"} + priorityPackets := make(chan []byte, 1) + packets := make(chan []byte, 1) + + normal := testIPv4TCPPacket([4]byte{101, 32, 118, 25}, [4]byte{10, 77, 0, 2}, 443, 37566) + priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) + priority[33] = 0x12 + + packets <- normal + priorityPackets <- priority + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- gateway.uploadGatewayPackets(ctx, priorityPackets, packets) + }() + defer func() { + cancel() + <-done + }() + + deadline := time.After(time.Second) + for { + if batch := transport.firstBatch(); len(batch) == 1 { + if string(batch[0]) != string(priority) { + t.Fatalf("first uploaded packet = %#v, want priority packet", batch[0]) + } + return + } + select { + case <-deadline: + t.Fatal("timed out waiting for first gateway upload batch") + default: + time.Sleep(time.Millisecond) + } + } +} + +func TestIsTCPControlPacket(t *testing.T) { + packet := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) + if isTCPControlPacket(packet) { + t.Fatal("packet without control flags was classified as control") + } + packet[33] = 0x12 + if !isTCPControlPacket(packet) { + t.Fatal("tcp syn-ack was not classified as control") + } + packet[9] = 17 + if isTCPControlPacket(packet) { + t.Fatal("udp packet was classified as tcp control") + } +}