diff --git a/agents/rap-node-agent/internal/agent/payload.go b/agents/rap-node-agent/internal/agent/payload.go index 628632e..398eb3a 100644 --- a/agents/rap-node-agent/internal/agent/payload.go +++ b/agents/rap-node-agent/internal/agent/payload.go @@ -7,7 +7,7 @@ import ( "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" ) -const Version = "0.2.273-vpnclass" +const Version = "0.2.277-vpnpribatch20" func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest { return client.EnrollRequest{ diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go index 7c1ae32..2098362 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport.go @@ -1794,17 +1794,22 @@ func (t *FabricPacketTransport) ReceiveGatewayPacketBatch(ctx context.Context, t type FabricPacketInbox struct { capacity int mu sync.Mutex - queues map[string]chan mesh.VPNPacketBatchPayload + queues map[string]*fabricPacketInboxQueue dropped uint64 } +type fabricPacketInboxQueue struct { + normal chan mesh.VPNPacketBatchPayload + priority chan mesh.VPNPacketBatchPayload +} + func NewFabricPacketInbox(capacity int) *FabricPacketInbox { if capacity <= 0 { capacity = 4096 } return &FabricPacketInbox{ capacity: capacity, - queues: map[string]chan mesh.VPNPacketBatchPayload{}, + queues: map[string]*fabricPacketInboxQueue{}, } } @@ -1860,12 +1865,46 @@ func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direct defer timer.Stop() queue := i.queue(vpnConnectionID, direction) for { + select { + case payload := <-queue.priority: + packets := cleanPacketBatch(payload.Packets) + if len(packets) == 0 { + continue + } + return packets, nil + default: + } + if len(queue.normal) > 0 { + priorityTimer := time.NewTimer(2 * time.Millisecond) + select { + case <-ctx.Done(): + priorityTimer.Stop() + return nil, ctx.Err() + case <-timer.C: + priorityTimer.Stop() + return nil, nil + case payload := <-queue.priority: + priorityTimer.Stop() + packets := cleanPacketBatch(payload.Packets) + if len(packets) == 0 { + continue + } + return packets, nil + case <-priorityTimer.C: + } + } select { case <-ctx.Done(): return nil, ctx.Err() case <-timer.C: return nil, nil - case payload := <-queue: + case payload := <-queue.priority: + packets := cleanPacketBatch(payload.Packets) + if len(packets) == 0 { + continue + } + return packets, nil + case payload := <-queue.normal: packets := cleanPacketBatch(payload.Packets) if len(packets) == 0 { continue @@ -1877,8 +1916,12 @@ func (i *FabricPacketInbox) Receive(ctx context.Context, vpnConnectionID, direct func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error { queue := i.queue(payload.VPNConnectionID, payload.Direction) + target := queue.normal + if payload.Direction == FabricDirectionGatewayToClient && batchHasTCPControlPacket(payload.Packets) { + target = queue.priority + } select { - case queue <- payload: + case target <- payload: default: i.mu.Lock() i.dropped++ @@ -1887,21 +1930,41 @@ func (i *FabricPacketInbox) enqueue(payload mesh.VPNPacketBatchPayload) error { return nil } -func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) chan mesh.VPNPacketBatchPayload { +func (i *FabricPacketInbox) queue(vpnConnectionID, direction string) *fabricPacketInboxQueue { key := vpnConnectionID + "\x00" + direction i.mu.Lock() defer i.mu.Unlock() if i.queues == nil { - i.queues = map[string]chan mesh.VPNPacketBatchPayload{} + i.queues = map[string]*fabricPacketInboxQueue{} } queue, ok := i.queues[key] if !ok { - queue = make(chan mesh.VPNPacketBatchPayload, i.capacity) + priorityCapacity := maxInt(1, i.capacity/4) + queue = &fabricPacketInboxQueue{ + normal: make(chan mesh.VPNPacketBatchPayload, i.capacity), + priority: make(chan mesh.VPNPacketBatchPayload, priorityCapacity), + } i.queues[key] = queue } return queue } +func batchHasTCPControlPacket(packets [][]byte) bool { + for _, packet := range packets { + if isTCPControlPacket(packet) { + return true + } + } + return false +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} + func (i *FabricPacketInbox) Dropped() uint64 { if i == nil { return 0 @@ -1933,7 +1996,7 @@ func (i *FabricPacketInbox) Snapshot() FabricPacketInboxSnapshot { snapshot.Dropped = i.dropped snapshot.QueueCount = len(i.queues) for key, queue := range i.queues { - snapshot.QueueDepths[strings.ReplaceAll(key, "\x00", ":")] = len(queue) + snapshot.QueueDepths[strings.ReplaceAll(key, "\x00", ":")] = len(queue.normal) + len(queue.priority) } return snapshot } diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 42aefd2..dbb3b78 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -241,6 +241,51 @@ func TestFabricPacketInboxDropsEmptyPackets(t *testing.T) { } } +func TestFabricPacketInboxPrioritizesGatewayTCPControlPackets(t *testing.T) { + inbox := NewFabricPacketInbox(4) + normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000) + priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032) + priority[33] = 0x12 + + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil { + t.Fatalf("deliver normal gateway packet: %v", err) + } + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}); err != nil { + t.Fatalf("deliver priority gateway packet: %v", err) + } + + packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != string(priority) { + t.Fatalf("first packets = %#v, want priority tcp control packet", packets) + } +} + +func TestFabricPacketInboxWaitsBrieflyForGatewayTCPControlPackets(t *testing.T) { + inbox := NewFabricPacketInbox(4) + normal := testIPv4TCPPacket([4]byte{185, 16, 148, 89}, [4]byte{10, 77, 0, 2}, 443, 56000) + priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 57032) + priority[33] = 0x12 + + if err := inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{normal}); err != nil { + t.Fatalf("deliver normal gateway packet: %v", err) + } + go func() { + time.Sleep(time.Millisecond) + _ = inbox.DeliverLocalPacketBatch("vpn-1", FabricDirectionGatewayToClient, [][]byte{priority}) + }() + + packets, err := inbox.Receive(context.Background(), "vpn-1", FabricDirectionGatewayToClient, time.Second) + if err != nil { + t.Fatalf("receive gateway packet: %v", err) + } + if len(packets) != 1 || string(packets[0]) != string(priority) { + t.Fatalf("first packets = %#v, want delayed priority tcp control packet", packets) + } +} + func TestLocalPacketTransportUsesFabricInboxDirections(t *testing.T) { inbox := NewFabricPacketInbox(4) transport := &LocalPacketTransport{Inbox: inbox, VPNConnectionID: "vpn-1"} diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway.go b/agents/rap-node-agent/internal/vpnruntime/gateway.go index f5d471d..e86f10e 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway.go @@ -51,6 +51,7 @@ const ( vpnGatewayBatchMaxPackets = 2048 vpnGatewayBatchMaxBytes = 8 * 1024 * 1024 vpnGatewayBatchFlushTimeout = 3 * time.Millisecond + vpnGatewayPriorityBatchWait = 20 * time.Millisecond ) type readWriteCloser interface { @@ -350,9 +351,34 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, priorityPackets <-ch } flushPriority := func(packet []byte) { flush() - if addPacket(packet) { - flush() + if !addPacket(packet) { + return } + deadline := time.Now().Add(vpnGatewayPriorityBatchWait) + for len(batch) < vpnGatewayBatchMaxPackets && batchBytes < vpnGatewayBatchMaxBytes { + wait := time.Until(deadline) + if wait <= 0 { + break + } + timer := time.NewTimer(wait) + select { + case next := <-priorityPackets: + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + if !addPacket(next) { + flush() + _ = addPacket(next) + } + case <-timer.C: + flush() + return + } + } + flush() } for { if len(batch) == 0 && timerActive { diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway_test.go b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go index 428882f..3732a0c 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go @@ -76,6 +76,46 @@ func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) { } } +func TestGatewayUploadMicroBatchesTCPControlPackets(t *testing.T) { + transport := &recordingGatewayTransport{} + gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"} + priorityPackets := make(chan []byte, 2) + packets := make(chan []byte, 1) + + first := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) + first[33] = 0x12 + second := testIPv4TCPPacket([4]byte{188, 40, 167, 82}, [4]byte{10, 77, 0, 2}, 80, 51002) + second[33] = 0x12 + priorityPackets <- first + priorityPackets <- second + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- gateway.uploadGatewayPackets(ctx, priorityPackets, packets) + }() + defer func() { + cancel() + <-done + }() + + deadline := time.After(time.Second) + for { + if batch := transport.firstBatch(); len(batch) == 2 { + if string(batch[0]) != string(first) || string(batch[1]) != string(second) { + t.Fatalf("priority batch = %#v, want both control packets in order", batch) + } + return + } + select { + case <-deadline: + t.Fatal("timed out waiting for priority microbatch") + default: + time.Sleep(time.Millisecond) + } + } +} + func TestIsTCPControlPacket(t *testing.T) { packet := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000) if isTCPControlPacket(packet) { diff --git a/clients/android/app/build.gradle b/clients/android/app/build.gradle index f2caae7..d172af6 100644 --- a/clients/android/app/build.gradle +++ b/clients/android/app/build.gradle @@ -30,8 +30,8 @@ android { applicationId "su.cin.rapvpn" minSdk 26 targetSdk 35 - versionCode 202 - versionName "0.2.202" + versionCode 203 + versionName "0.2.203" buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\"" buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\"" buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\"" diff --git a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java index 5920941..6b2e5cb 100644 --- a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java +++ b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java @@ -26,6 +26,7 @@ final class VpnPacketWebSocketRelay { private static final int MAX_SINGLE_PACKET_BYTES = 65535; private static final long CONNECTING_STALE_MS = 8000; private static final long OPEN_WAIT_MS = 3500; + private static final int PRIORITY_GRACE_MS = 2; private final String baseUrl; private final VpnService vpnService; @@ -158,7 +159,11 @@ final class VpnPacketWebSocketRelay { if (packets != null) { return packets; } - packets = incoming.poll(Math.min(2, waitMs), TimeUnit.MILLISECONDS); + packets = priorityIncoming.poll(Math.min(PRIORITY_GRACE_MS, waitMs), TimeUnit.MILLISECONDS); + if (packets != null) { + return packets; + } + packets = incoming.poll(); if (packets != null) { return packets; } @@ -166,7 +171,7 @@ final class VpnPacketWebSocketRelay { if (packets != null) { return packets; } - packets = incoming.poll(Math.max(1, waitMs - 2), TimeUnit.MILLISECONDS); + packets = incoming.poll(Math.max(1, waitMs - PRIORITY_GRACE_MS), TimeUnit.MILLISECONDS); return packets == null ? new ArrayList<>() : packets; }