diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway.go b/agents/rap-node-agent/internal/vpnruntime/gateway.go index 35efbc5..36eb217 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway.go @@ -266,15 +266,18 @@ func (g *Gateway) run(ctx context.Context, tun readWriteCloser) error { if closer, ok := g.Transport.(packetTransportCloser); ok { defer closer.Close() } + runCtx, cancel := context.WithCancel(ctx) + defer cancel() errCh := make(chan error, 2) - go func() { errCh <- g.copyGatewayToClient(ctx, tun) }() - go func() { errCh <- g.copyClientToGateway(ctx, tun) }() + go func() { errCh <- g.copyGatewayToClient(runCtx, tun) }() + go func() { errCh <- g.copyClientToGateway(runCtx, tun) }() select { - case <-ctx.Done(): - return ctx.Err() + case <-runCtx.Done(): + return runCtx.Err() case err := <-errCh: + cancel() return err } } diff --git a/agents/rap-node-agent/internal/vpnruntime/gateway_test.go b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go index 9a03657..36c6e00 100644 --- a/agents/rap-node-agent/internal/vpnruntime/gateway_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/gateway_test.go @@ -2,6 +2,8 @@ package vpnruntime import ( "context" + "errors" + "io" "sync" "testing" "time" @@ -12,6 +14,45 @@ type recordingGatewayTransport struct { batches [][][]byte } +type closingGatewayTransport struct { + closed chan struct{} +} + +func (t *closingGatewayTransport) SendGatewayPacketBatch(context.Context, [][]byte) error { + return nil +} + +func (t *closingGatewayTransport) ReceiveGatewayPacketBatch(context.Context, time.Duration) ([][]byte, error) { + return [][]byte{[]byte("packet")}, nil +} + +func (t *closingGatewayTransport) Close() error { + close(t.closed) + return nil +} + +type failingWriteTun struct { + closed chan struct{} +} + +func (t failingWriteTun) Read([]byte) (int, error) { + <-t.closed + return 0, io.EOF +} + +func (t failingWriteTun) Write([]byte) (int, error) { + return 0, errors.New("write failed") +} + +func (t failingWriteTun) Close() error { + select { + case <-t.closed: + default: + close(t.closed) + } + return nil +} + func (t *recordingGatewayTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { copied := make([][]byte, len(packets)) for i, packet := range packets { @@ -36,6 +77,24 @@ func (t *recordingGatewayTransport) firstBatch() [][]byte { return t.batches[0] } +func TestGatewayRunClosesPacketTransportOnRuntimeError(t *testing.T) { + transport := &closingGatewayTransport{closed: make(chan struct{})} + gateway := &Gateway{ + Transport: transport, + VPNConnectionID: "vpn-1", + PollTimeout: time.Millisecond, + } + err := gateway.run(context.Background(), failingWriteTun{closed: make(chan struct{})}) + if err == nil || err.Error() != "write failed" { + t.Fatalf("run error = %v, want write failed", err) + } + select { + case <-transport.closed: + case <-time.After(time.Second): + t.Fatal("packet transport was not closed") + } +} + func TestGatewayUploadPrioritizesTCPControlPackets(t *testing.T) { transport := &recordingGatewayTransport{} gateway := &Gateway{Transport: transport, VPNConnectionID: "vpn-1"} diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 578ee5c..f36a904 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -394,6 +394,9 @@ per-stream maps. Gateway shutdown now closes all VPN fabric-session stream shards and then the underlying fabric session, preventing stale logical streams from consuming QUIC carrier capacity after reconnects or rollout restarts. +Gateway runtime cancellation now fans out to both upload and download loops +when either direction exits, so transport cleanup runs promptly on one-sided +TUN or carrier failures. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.