Track QUIC fabric stream capacity

This commit is contained in:
2026-05-16 11:38:09 +03:00
parent 650b309686
commit ef458330aa
3 changed files with 134 additions and 23 deletions
@@ -17,10 +17,12 @@ import (
const fabricQUICNextProto = "rap-fabric-data-session-v1"
const defaultQUICFabricConnIdleTTL = 5 * time.Minute
const defaultQUICFabricMaxStreamsPerConn = 64
type QUICFabricTransport struct {
Config *quic.Config
IdleTTL time.Duration
MaxStreamsPerConn int
mu sync.Mutex
conns map[string]*quicFabricConnEntry
stats QUICFabricTransportStats
@@ -33,11 +35,15 @@ type QUICFabricTransportStats struct {
ClosedEvicted uint64 `json:"closed_evicted"`
CloseAllCalls uint64 `json:"close_all_calls"`
IdleEvicted uint64 `json:"idle_evicted"`
StreamOpens uint64 `json:"stream_opens"`
StreamCloses uint64 `json:"stream_closes"`
StreamLimitRejects uint64 `json:"stream_limit_rejects"`
}
type QUICFabricTransportSnapshot struct {
SchemaVersion string `json:"schema_version"`
ActiveCount int `json:"active_count"`
ActiveStreams int `json:"active_streams"`
Stats QUICFabricTransportStats `json:"stats"`
}
@@ -52,15 +58,18 @@ type quicFabricSession struct {
maxPayload int
timeout time.Duration
closeConn bool
transport *QUICFabricTransport
connKey string
}
type quicFabricConnEntry struct {
conn *quic.Conn
lastUsed time.Time
activeStreams int
}
func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, conns: map[string]*quicFabricConnEntry{}}
return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, MaxStreamsPerConn: defaultQUICFabricMaxStreamsPerConn, conns: map[string]*quicFabricConnEntry{}}
}
func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config {
@@ -105,12 +114,16 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
tlsConfig.NextProtos = []string{fabricQUICNextProto}
}
}
conn, closeConn, err := t.connectConn(ctx, target, tlsConfig)
conn, connKey, closeConn, err := t.connectConn(ctx, target, tlsConfig)
if err != nil {
return nil, err
}
if err := t.reserveStream(connKey, conn); err != nil {
return nil, err
}
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
t.releaseStream(connKey)
t.evictConn(target, conn)
if closeConn {
_ = conn.CloseWithError(1, "open stream failed")
@@ -138,20 +151,22 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
maxPayload: maxPayload,
timeout: target.Timeout,
closeConn: closeConn,
transport: t,
connKey: connKey,
}
go session.readLoop(context.Background())
return session, nil
}
func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, bool, error) {
func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, string, bool, error) {
if t == nil {
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, nil)
return conn, true, err
return conn, "", true, err
}
key := quicFabricConnKey(target)
if key == "" {
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
return conn, true, err
return conn, "", true, err
}
t.mu.Lock()
t.pruneIdleLocked(time.Now())
@@ -164,7 +179,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
entry.lastUsed = time.Now()
t.stats.Reuses++
t.mu.Unlock()
return entry.conn, false, nil
return entry.conn, key, false, nil
}
}
t.mu.Unlock()
@@ -174,7 +189,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
t.mu.Lock()
t.stats.OpenFailures++
t.mu.Unlock()
return nil, false, err
return nil, "", false, err
}
t.mu.Lock()
t.pruneIdleLocked(time.Now())
@@ -188,7 +203,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
t.stats.Reuses++
t.mu.Unlock()
_ = conn.CloseWithError(0, "duplicate connection")
return existing.conn, false, nil
return existing.conn, key, false, nil
}
}
if t.conns == nil {
@@ -197,7 +212,46 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
t.conns[key] = &quicFabricConnEntry{conn: conn, lastUsed: time.Now()}
t.stats.Opens++
t.mu.Unlock()
return conn, false, nil
return conn, key, false, nil
}
func (t *QUICFabricTransport) reserveStream(key string, conn *quic.Conn) error {
if t == nil || key == "" {
return nil
}
t.mu.Lock()
defer t.mu.Unlock()
entry := t.conns[key]
if entry == nil || entry.conn != conn {
return fmt.Errorf("quic fabric connection is not cached")
}
limit := t.MaxStreamsPerConn
if limit <= 0 {
limit = defaultQUICFabricMaxStreamsPerConn
}
if entry.activeStreams >= limit {
t.stats.StreamLimitRejects++
return fmt.Errorf("quic fabric stream limit reached")
}
entry.activeStreams++
entry.lastUsed = time.Now()
t.stats.StreamOpens++
return nil
}
func (t *QUICFabricTransport) releaseStream(key string) {
if t == nil || key == "" {
return
}
t.mu.Lock()
if entry := t.conns[key]; entry != nil {
if entry.activeStreams > 0 {
entry.activeStreams--
}
entry.lastUsed = time.Now()
t.stats.StreamCloses++
}
t.mu.Unlock()
}
func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic.Conn) {
@@ -230,6 +284,9 @@ func (t *QUICFabricTransport) pruneIdleLocked(now time.Time) {
continue
}
if !entry.lastUsed.IsZero() && now.Sub(entry.lastUsed) > ttl {
if entry.activeStreams > 0 {
continue
}
_ = entry.conn.CloseWithError(0, "idle")
delete(t.conns, key)
t.stats.IdleEvicted++
@@ -285,6 +342,7 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot {
snapshot.Stats.ClosedEvicted++
default:
snapshot.ActiveCount++
snapshot.ActiveStreams += entry.activeStreams
}
}
return snapshot
@@ -329,6 +387,9 @@ func (s *quicFabricSession) Close() error {
if s.stream != nil {
err = s.stream.Close()
}
if s.transport != nil {
s.transport.releaseStream(s.connKey)
}
if s.conn != nil {
if s.closeConn {
_ = s.conn.CloseWithError(0, "closed")
@@ -224,7 +224,9 @@ func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) {
if err != nil {
t.Fatalf("connect: %v", err)
}
defer session.Close()
if err := session.Close(); err != nil {
t.Fatalf("close session: %v", err)
}
time.Sleep(time.Millisecond)
snapshot := transport.Snapshot()
@@ -233,6 +235,51 @@ func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) {
}
}
func TestQUICFabricTransportLimitsStreamsPerConnection(t *testing.T) {
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: testQUICTLSConfig(t),
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
transport := NewQUICFabricTransport(nil)
transport.MaxStreamsPerConn = 1
defer transport.Close()
target := FabricTransportTarget{
PeerID: "node-b",
Endpoint: server.Addr().String(),
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{fabricQUICNextProto},
},
Timeout: time.Second,
}
first, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("first connect: %v", err)
}
if _, err := transport.Connect(ctx, target); err == nil {
t.Fatal("second connect succeeded past stream limit")
}
snapshot := transport.Snapshot()
if snapshot.ActiveStreams != 1 || snapshot.Stats.StreamLimitRejects != 1 {
t.Fatalf("unexpected stream limit snapshot: %+v", snapshot)
}
if err := first.Close(); err != nil {
t.Fatalf("close first stream: %v", err)
}
second, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("connect after release: %v", err)
}
defer second.Close()
}
func TestQUICFabricServerHandlesFabricFrames(t *testing.T) {
var events []FabricSessionEventLogEntry
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
@@ -361,6 +361,9 @@ opens logical fabric-session streams on top, with heartbeat telemetry for QUIC
connection opens, reuses, evictions, and active count.
Cached QUIC connections are pruned by idle TTL, preventing long-running agents
from holding unused peer connections indefinitely.
QUIC carrier connections now track active logical streams and enforce a
per-connection stream limit, exposing stream opens/closes and limit rejects in
transport telemetry.
Deliverables: