Cancel VPN gateway loops together

This commit is contained in:
2026-05-16 12:29:12 +03:00
parent d170820445
commit da59de7042
3 changed files with 69 additions and 4 deletions
@@ -266,15 +266,18 @@ func (g *Gateway) run(ctx context.Context, tun readWriteCloser) error {
if closer, ok := g.Transport.(packetTransportCloser); ok { if closer, ok := g.Transport.(packetTransportCloser); ok {
defer closer.Close() defer closer.Close()
} }
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := make(chan error, 2) errCh := make(chan error, 2)
go func() { errCh <- g.copyGatewayToClient(ctx, tun) }() go func() { errCh <- g.copyGatewayToClient(runCtx, tun) }()
go func() { errCh <- g.copyClientToGateway(ctx, tun) }() go func() { errCh <- g.copyClientToGateway(runCtx, tun) }()
select { select {
case <-ctx.Done(): case <-runCtx.Done():
return ctx.Err() return runCtx.Err()
case err := <-errCh: case err := <-errCh:
cancel()
return err return err
} }
} }
@@ -2,6 +2,8 @@ package vpnruntime
import ( import (
"context" "context"
"errors"
"io"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -12,6 +14,45 @@ type recordingGatewayTransport struct {
batches [][][]byte batches [][][]byte
} }
type closingGatewayTransport struct {
closed chan struct{}
}
func (t *closingGatewayTransport) SendGatewayPacketBatch(context.Context, [][]byte) error {
return nil
}
func (t *closingGatewayTransport) ReceiveGatewayPacketBatch(context.Context, time.Duration) ([][]byte, error) {
return [][]byte{[]byte("packet")}, nil
}
func (t *closingGatewayTransport) Close() error {
close(t.closed)
return nil
}
type failingWriteTun struct {
closed chan struct{}
}
func (t failingWriteTun) Read([]byte) (int, error) {
<-t.closed
return 0, io.EOF
}
func (t failingWriteTun) Write([]byte) (int, error) {
return 0, errors.New("write failed")
}
func (t failingWriteTun) Close() error {
select {
case <-t.closed:
default:
close(t.closed)
}
return nil
}
func (t *recordingGatewayTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { func (t *recordingGatewayTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
copied := make([][]byte, len(packets)) copied := make([][]byte, len(packets))
for i, packet := range packets { for i, packet := range packets {
@@ -36,6 +77,24 @@ func (t *recordingGatewayTransport) firstBatch() [][]byte {
return t.batches[0] return t.batches[0]
} }
func TestGatewayRunClosesPacketTransportOnRuntimeError(t *testing.T) {
transport := &closingGatewayTransport{closed: make(chan struct{})}
gateway := &Gateway{
Transport: transport,
VPNConnectionID: "vpn-1",
PollTimeout: time.Millisecond,
}
err := gateway.run(context.Background(), failingWriteTun{closed: make(chan struct{})})
if err == nil || err.Error() != "write failed" {
t.Fatalf("run error = %v, want write failed", err)
}
select {
case <-transport.closed:
case <-time.After(time.Second):
t.Fatal("packet transport was not closed")
}
}
func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) { func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) {
transport := &recordingGatewayTransport{} transport := &recordingGatewayTransport{}
gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"} gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"}
@@ -394,6 +394,9 @@ per-stream maps.
Gateway shutdown now closes all VPN fabric-session stream shards and then the Gateway shutdown now closes all VPN fabric-session stream shards and then the
underlying fabric session, preventing stale logical streams from consuming QUIC underlying fabric session, preventing stale logical streams from consuming QUIC
carrier capacity after reconnects or rollout restarts. carrier capacity after reconnects or rollout restarts.
Gateway runtime cancellation now fans out to both upload and download loops
when either direction exits, so transport cleanup runs promptly on one-sided
TUN or carrier failures.
Endpoint ranking treats `capacity_limited` observations as a soft pressure Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy. marking the carrier unhealthy.