Report VPN fabric stream cleanup

This commit is contained in:
2026-05-16 12:30:49 +03:00
parent da59de7042
commit bbd9f8c257
3 changed files with 42 additions and 4 deletions
@@ -46,6 +46,8 @@ type FabricSessionPacketTransport struct {
sendFramesByClass map[string]uint64 sendFramesByClass map[string]uint64
sendPacketsByClass map[string]uint64 sendPacketsByClass map[string]uint64
sendFramesByStream map[uint64]uint64 sendFramesByStream map[uint64]uint64
closeStreamFrames uint64
closeErrors uint64
closeOnce sync.Once closeOnce sync.Once
closeErr error closeErr error
} }
@@ -184,16 +186,24 @@ func (t *FabricSessionPacketTransport) Close() error {
if err := t.Sender.Send(ctx, fabricproto.Frame{ if err := t.Sender.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameCloseStream, Type: fabricproto.FrameCloseStream,
StreamID: streamID, StreamID: streamID,
}); err != nil && t.closeErr == nil { }); err != nil {
t.recordCloseError()
if t.closeErr == nil {
t.closeErr = err t.closeErr = err
} }
} else if err == nil {
t.recordCloseStream()
}
} }
} }
if closer, ok := t.Sender.(FabricSessionCloser); ok { if closer, ok := t.Sender.(FabricSessionCloser); ok {
if err := closer.Close(); err != nil && t.closeErr == nil { if err := closer.Close(); err != nil {
t.recordCloseError()
if t.closeErr == nil {
t.closeErr = err t.closeErr = err
} }
} }
}
}) })
return t.closeErr return t.closeErr
} }
@@ -336,6 +346,8 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
t.statsMu.Lock() t.statsMu.Lock()
sendFramesByClass := copyStringUint64Map(t.sendFramesByClass) sendFramesByClass := copyStringUint64Map(t.sendFramesByClass)
sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass) sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass)
closeStreamFrames := t.closeStreamFrames
closeErrors := t.closeErrors
sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream)) sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream))
for streamID, count := range t.sendFramesByStream { for streamID, count := range t.sendFramesByStream {
sendFramesByStream[fmt.Sprintf("%d", streamID)] = count sendFramesByStream[fmt.Sprintf("%d", streamID)] = count
@@ -351,12 +363,32 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
"send_class_count": countNonZeroStringUint64Values(sendFramesByClass), "send_class_count": countNonZeroStringUint64Values(sendFramesByClass),
"send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream),
"sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1,
"close_stream_frames": closeStreamFrames,
"close_errors": closeErrors,
"send_frames_by_class": sendFramesByClass, "send_frames_by_class": sendFramesByClass,
"send_packets_by_class": sendPacketsByClass, "send_packets_by_class": sendPacketsByClass,
"send_frames_by_stream_id": sendFramesByStream, "send_frames_by_stream_id": sendFramesByStream,
} }
} }
// recordCloseStream adds one to the closeStreamFrames counter while holding
// statsMu. Calling it on a nil receiver is a no-op.
func (t *FabricSessionPacketTransport) recordCloseStream() {
	if t == nil {
		return
	}
	t.statsMu.Lock()
	defer t.statsMu.Unlock()
	t.closeStreamFrames++
}
// recordCloseError adds one to the closeErrors counter while holding statsMu.
// Calling it on a nil receiver is a no-op.
func (t *FabricSessionPacketTransport) recordCloseError() {
	if t == nil {
		return
	}
	t.statsMu.Lock()
	defer t.statsMu.Unlock()
	t.closeErrors++
}
func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string { func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string {
if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk { if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk {
return fallback return fallback
@@ -311,6 +311,10 @@ func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) {
t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames) t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames)
} }
} }
snapshot := transport.Snapshot()
if snapshot["close_stream_frames"] != uint64(5) || snapshot["close_errors"] != uint64(0) {
t.Fatalf("unexpected close counters: %+v", snapshot)
}
} }
func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) {
@@ -397,6 +397,8 @@ carrier capacity after reconnects or rollout restarts.
Gateway runtime cancellation now fans out to both upload and download loops Gateway runtime cancellation now fans out to both upload and download loops
when either direction exits, so transport cleanup runs promptly on one-sided when either direction exits, so transport cleanup runs promptly on one-sided
TUN or carrier failures. TUN or carrier failures.
Fabric-session packet transport snapshots include close-frame and close-error
counters for verifying that stream shard cleanup is actually happening.
Endpoint ranking treats `capacity_limited` observations as a soft pressure Endpoint ranking treats `capacity_limited` observations as a soft pressure
penalty instead of a hard recent failure, enabling load spreading without penalty instead of a hard recent failure, enabling load spreading without
marking the carrier unhealthy. marking the carrier unhealthy.