Close VPN fabric session stream shards

This commit is contained in:
2026-05-16 12:26:51 +03:00
parent a5b91113bf
commit d170820445
4 changed files with 105 additions and 0 deletions
@@ -21,6 +21,10 @@ type FabricSessionFrameReceiver interface {
Errors() <-chan error Errors() <-chan error
} }
type FabricSessionCloser interface {
Close() error
}
type FabricSessionPacketTransport struct { type FabricSessionPacketTransport struct {
Sender FabricSessionFrameSender Sender FabricSessionFrameSender
Receiver FabricSessionFrameReceiver Receiver FabricSessionFrameReceiver
@@ -42,6 +46,8 @@ type FabricSessionPacketTransport struct {
sendFramesByClass map[string]uint64 sendFramesByClass map[string]uint64
sendPacketsByClass map[string]uint64 sendPacketsByClass map[string]uint64
sendFramesByStream map[uint64]uint64 sendFramesByStream map[uint64]uint64
closeOnce sync.Once
closeErr error
} }
func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error {
@@ -166,6 +172,32 @@ func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) erro
} }
} }
func (t *FabricSessionPacketTransport) Close() error {
if t == nil {
return nil
}
t.closeOnce.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for _, streamID := range t.allStreamIDs() {
if t.Sender != nil {
if err := t.Sender.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameCloseStream,
StreamID: streamID,
}); err != nil && t.closeErr == nil {
t.closeErr = err
}
}
}
if closer, ok := t.Sender.(FabricSessionCloser); ok {
if err := closer.Close(); err != nil && t.closeErr == nil {
t.closeErr = err
}
}
})
return t.closeErr
}
func (t *FabricSessionPacketTransport) selectStreamForPackets(packets [][]byte) (uint64, string) { func (t *FabricSessionPacketTransport) selectStreamForPackets(packets [][]byte) (uint64, string) {
trafficClass := fabricSessionTrafficClassForPackets(t.TrafficClass, packets) trafficClass := fabricSessionTrafficClassForPackets(t.TrafficClass, packets)
if ids := t.streamIDsForTrafficClass(trafficClass); len(ids) > 0 { if ids := t.streamIDsForTrafficClass(trafficClass); len(ids) > 0 {
@@ -185,6 +217,34 @@ func (t *FabricSessionPacketTransport) selectStreamForPackets(packets [][]byte)
return t.StreamID, trafficClass return t.StreamID, trafficClass
} }
func (t *FabricSessionPacketTransport) allStreamIDs() []uint64 {
if t == nil {
return nil
}
seen := map[uint64]struct{}{}
var out []uint64
add := func(streamID uint64) {
if streamID == 0 {
return
}
if _, ok := seen[streamID]; ok {
return
}
seen[streamID] = struct{}{}
out = append(out, streamID)
}
add(t.StreamID)
for _, streamID := range t.StreamIDs {
add(streamID)
}
for _, ids := range t.StreamIDsByTrafficClass {
for _, streamID := range ids {
add(streamID)
}
}
return out
}
func (t *FabricSessionPacketTransport) hasSendStream() bool { func (t *FabricSessionPacketTransport) hasSendStream() bool {
if t == nil { if t == nil {
return false return false
@@ -126,6 +126,7 @@ type memoryPacketTransport struct {
type captureFabricSessionSender struct { type captureFabricSessionSender struct {
err error err error
frames []fabricproto.Frame frames []fabricproto.Frame
closed bool
} }
func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error { func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.Frame) error {
@@ -136,6 +137,11 @@ func (s *captureFabricSessionSender) Send(_ context.Context, frame fabricproto.F
return nil return nil
} }
func (s *captureFabricSessionSender) Close() error {
s.closed = true
return nil
}
type memoryFabricSessionReceiver struct { type memoryFabricSessionReceiver struct {
frames chan fabricproto.Frame frames chan fabricproto.Frame
errors chan error errors chan error
@@ -278,6 +284,35 @@ func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) {
} }
} }
func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) {
sender := &captureFabricSessionSender{}
transport := &FabricSessionPacketTransport{
Sender: sender,
StreamID: 700,
StreamIDsByTrafficClass: map[string][]uint64{
FabricTrafficClassInteractive: []uint64{801, 802},
FabricTrafficClassBulk: []uint64{901, 902},
},
}
if err := transport.Close(); err != nil {
t.Fatalf("close transport: %v", err)
}
if !sender.closed {
t.Fatal("underlying fabric session was not closed")
}
closed := map[uint64]bool{}
for _, frame := range sender.frames {
if frame.Type == fabricproto.FrameCloseStream {
closed[frame.StreamID] = true
}
}
for _, streamID := range []uint64{700, 801, 802, 901, 902} {
if !closed[streamID] {
t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames)
}
}
}
func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) {
inbox := NewFabricPacketInbox(4) inbox := NewFabricPacketInbox(4)
receiver := memoryFabricSessionReceiver{ receiver := memoryFabricSessionReceiver{
@@ -69,6 +69,10 @@ type packetTransportSnapshotter interface {
Snapshot() map[string]any Snapshot() map[string]any
} }
type packetTransportCloser interface {
Close() error
}
type BackendPacketTransport struct { type BackendPacketTransport struct {
API *client.Client API *client.Client
ClusterID string ClusterID string
@@ -259,6 +263,9 @@ func (g *Gateway) normalize() error {
func (g *Gateway) run(ctx context.Context, tun readWriteCloser) error { func (g *Gateway) run(ctx context.Context, tun readWriteCloser) error {
defer tun.Close() defer tun.Close()
if closer, ok := g.Transport.(packetTransportCloser); ok {
defer closer.Close()
}
errCh := make(chan error, 2) errCh := make(chan error, 2)
go func() { errCh <- g.copyGatewayToClient(ctx, tun) }() go func() { errCh <- g.copyGatewayToClient(ctx, tun) }()
@@ -391,6 +391,9 @@ layout and send counters by traffic class/stream id for load-test diagnosis.
Those snapshots also summarize configured stream class/shard counts and active Those snapshots also summarize configured stream class/shard counts and active
send class/stream counts, making sharding health visible without expanding send class/stream counts, making sharding health visible without expanding
per-stream maps. per-stream maps.
Gateway shutdown now closes all VPN fabric-session stream shards and then the
underlying fabric session, preventing stale logical streams from consuming QUIC
carrier capacity after reconnects or rollout restarts.
Endpoint ranking treats `capacity_limited` observations as a soft pressure Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy. marking the carrier unhealthy.