diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index 95c51a2..d70a08f 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -46,6 +46,8 @@ type FabricSessionPacketTransport struct { sendFramesByClass map[string]uint64 sendPacketsByClass map[string]uint64 sendFramesByStream map[uint64]uint64 + closeStreamFrames uint64 + closeErrors uint64 closeOnce sync.Once closeErr error } @@ -184,14 +186,22 @@ func (t *FabricSessionPacketTransport) Close() error { if err := t.Sender.Send(ctx, fabricproto.Frame{ Type: fabricproto.FrameCloseStream, StreamID: streamID, - }); err != nil && t.closeErr == nil { - t.closeErr = err + }); err != nil { + t.recordCloseError() + if t.closeErr == nil { + t.closeErr = err + } + } else if err == nil { + t.recordCloseStream() } } } if closer, ok := t.Sender.(FabricSessionCloser); ok { - if err := closer.Close(); err != nil && t.closeErr == nil { - t.closeErr = err + if err := closer.Close(); err != nil { + t.recordCloseError() + if t.closeErr == nil { + t.closeErr = err + } } } }) @@ -336,6 +346,8 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { t.statsMu.Lock() sendFramesByClass := copyStringUint64Map(t.sendFramesByClass) sendPacketsByClass := copyStringUint64Map(t.sendPacketsByClass) + closeStreamFrames := t.closeStreamFrames + closeErrors := t.closeErrors sendFramesByStream := make(map[string]uint64, len(t.sendFramesByStream)) for streamID, count := range t.sendFramesByStream { sendFramesByStream[fmt.Sprintf("%d", streamID)] = count @@ -351,12 +363,32 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any { "send_class_count": countNonZeroStringUint64Values(sendFramesByClass), "send_stream_count": countNonZeroStringUint64Values(sendFramesByStream), "sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1, + "close_stream_frames": closeStreamFrames, + "close_errors": closeErrors, "send_frames_by_class": sendFramesByClass, "send_packets_by_class": sendPacketsByClass, "send_frames_by_stream_id": sendFramesByStream, } } +func (t *FabricSessionPacketTransport) recordCloseStream() { + if t == nil { + return + } + t.statsMu.Lock() + t.closeStreamFrames++ + t.statsMu.Unlock() +} + +func (t *FabricSessionPacketTransport) recordCloseError() { + if t == nil { + return + } + t.statsMu.Lock() + t.closeErrors++ + t.statsMu.Unlock() +} + func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string { if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk { return fallback diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index aeaa7f1..38267f0 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -311,6 +311,10 @@ func TestFabricSessionPacketTransportClosesAllStreamShards(t *testing.T) { t.Fatalf("stream %d was not closed; frames=%+v", streamID, sender.frames) } } + snapshot := transport.Snapshot() + if snapshot["close_stream_frames"] != uint64(5) || snapshot["close_errors"] != uint64(0) { + t.Fatalf("unexpected close counters: %+v", snapshot) + } } func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index f36a904..f3f29d6 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -397,6 +397,8 @@ carrier capacity after reconnects or rollout restarts. Gateway runtime cancellation now fans out to both upload and download loops when either direction exits, so transport cleanup runs promptly on one-sided TUN or carrier failures. +Fabric-session packet transport snapshots include close-frame and close-error +counters for verifying that stream shard cleanup is actually happening. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.