From 28c26a510365ef6e78f155b9ac9190e91474c1ec Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:46:36 +0300 Subject: [PATCH] Expose QUIC fabric capacity pressure --- .../internal/mesh/fabric_quic_transport.go | 29 +++++++++++++++---- .../mesh/fabric_quic_transport_test.go | 6 +++- .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 ++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index f798f50..963c040 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -48,10 +48,13 @@ type QUICFabricTransportStats struct { } type QUICFabricTransportSnapshot struct { - SchemaVersion string `json:"schema_version"` - ActiveCount int `json:"active_count"` - ActiveStreams int `json:"active_streams"` - Stats QUICFabricTransportStats `json:"stats"` + SchemaVersion string `json:"schema_version"` + ActiveCount int `json:"active_count"` + ActiveStreams int `json:"active_streams"` + MaxStreamsPerConn int `json:"max_streams_per_conn"` + SaturatedConnections int `json:"saturated_connections"` + CapacityPressurePercent int `json:"capacity_pressure_percent"` + Stats QUICFabricTransportStats `json:"stats"` } type quicFabricSession struct { @@ -334,9 +337,14 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot { t.mu.Lock() defer t.mu.Unlock() t.pruneIdleLocked(time.Now()) + limit := t.MaxStreamsPerConn + if limit <= 0 { + limit = defaultQUICFabricMaxStreamsPerConn + } snapshot := QUICFabricTransportSnapshot{ - SchemaVersion: "rap.quic_fabric_transport.v1", - Stats: t.stats, + SchemaVersion: "rap.quic_fabric_transport.v1", + MaxStreamsPerConn: limit, + Stats: t.stats, } for key, entry := range t.conns { if entry == nil || entry.conn == nil { @@ -350,6 +358,15 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot { default: snapshot.ActiveCount++ snapshot.ActiveStreams += entry.activeStreams + if entry.activeStreams >= limit { + snapshot.SaturatedConnections++ + } + } + } + if snapshot.ActiveCount > 0 && limit > 0 { + capacity := snapshot.ActiveCount * limit + if capacity > 0 { + snapshot.CapacityPressurePercent = (snapshot.ActiveStreams * 100) / capacity } } return snapshot diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go index 02d8131..7237a69 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -267,7 +267,11 @@ func TestQUICFabricTransportLimitsStreamsPerConnection(t *testing.T) { t.Fatal("second connect succeeded past stream limit") } snapshot := transport.Snapshot() - if snapshot.ActiveStreams != 1 || snapshot.Stats.StreamLimitRejects != 1 { + if snapshot.ActiveStreams != 1 || + snapshot.MaxStreamsPerConn != 1 || + snapshot.SaturatedConnections != 1 || + snapshot.CapacityPressurePercent != 100 || + snapshot.Stats.StreamLimitRejects != 1 { t.Fatalf("unexpected stream limit snapshot: %+v", snapshot) } if err := first.Close(); err != nil { diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 1861902..31ad11c 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -416,6 +416,8 @@ gateway status. Receive-side fabric-session packet counters are reported by traffic class and stream id as well, so gateway status can compare TX and RX distribution under browser/RDP load. +QUIC fabric transport snapshots expose the configured stream limit, saturated +connection count, and capacity pressure percentage next to stream limit rejects. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.