Recover VPN fabric lease expiry

This commit is contained in:
2026-05-15 18:52:37 +03:00
parent 52dfce316d
commit be25ff5725
5 changed files with 115 additions and 5 deletions
@@ -7,7 +7,7 @@ import (
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
)
const Version = "0.2.277-vpnpribatch20"
const Version = "0.2.278-vpnpreempt"
func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
return client.EnrollRequest{
@@ -350,8 +350,13 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, priorityPackets <-ch
return true
}
flushPriority := func(packet []byte) {
flush()
pendingBatch := batch
pendingBatchBytes := batchBytes
batch = make([][]byte, 0, vpnGatewayBatchMaxPackets)
batchBytes = 0
if !addPacket(packet) {
batch = pendingBatch
batchBytes = pendingBatchBytes
return
}
deadline := time.Now().Add(vpnGatewayPriorityBatchWait)
@@ -379,6 +384,14 @@ func (g *Gateway) uploadGatewayPackets(ctx context.Context, priorityPackets <-ch
}
}
flush()
if len(pendingBatch) > 0 {
batch = pendingBatch
batchBytes = pendingBatchBytes
if !timerActive {
timer.Reset(vpnGatewayBatchFlushTimeout)
timerActive = true
}
}
}
for {
if len(batch) == 0 && timerActive {
@@ -76,6 +76,47 @@ func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) {
}
}
func TestGatewayUploadPreemptsPendingNormalBatchForTCPControlPackets(t *testing.T) {
transport := &recordingGatewayTransport{}
gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"}
priorityPackets := make(chan []byte, 1)
packets := make(chan []byte, 1)
normal := testIPv4TCPPacket([4]byte{101, 32, 118, 25}, [4]byte{10, 77, 0, 2}, 443, 37566)
priority := testIPv4TCPPacket([4]byte{192, 168, 200, 95}, [4]byte{10, 77, 0, 2}, 3389, 51000)
priority[33] = 0x12
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
done <- gateway.uploadGatewayPackets(ctx, priorityPackets, packets)
}()
defer func() {
cancel()
<-done
}()
packets <- normal
time.Sleep(time.Millisecond)
priorityPackets <- priority
deadline := time.After(time.Second)
for {
if batch := transport.firstBatch(); len(batch) == 1 {
if string(batch[0]) != string(priority) {
t.Fatalf("first uploaded packet = %#v, want priority packet before pending normal batch", batch[0])
}
return
}
select {
case <-deadline:
t.Fatal("timed out waiting for preempted priority upload batch")
default:
time.Sleep(time.Millisecond)
}
}
}
func TestGatewayUploadMicroBatchesTCPControlPackets(t *testing.T) {
transport := &recordingGatewayTransport{}
gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"}