diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go
index 34c5e84..c788461 100644
--- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go
+++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go
@@ -17,27 +17,33 @@ import (
 
 const fabricQUICNextProto = "rap-fabric-data-session-v1"
 const defaultQUICFabricConnIdleTTL = 5 * time.Minute
+const defaultQUICFabricMaxStreamsPerConn = 64
 
 type QUICFabricTransport struct {
-	Config  *quic.Config
-	IdleTTL time.Duration
-	mu      sync.Mutex
-	conns   map[string]*quicFabricConnEntry
-	stats   QUICFabricTransportStats
+	Config            *quic.Config
+	IdleTTL           time.Duration
+	MaxStreamsPerConn int
+	mu                sync.Mutex
+	conns             map[string]*quicFabricConnEntry
+	stats             QUICFabricTransportStats
 }
 
 type QUICFabricTransportStats struct {
-	Opens         uint64 `json:"opens"`
-	Reuses        uint64 `json:"reuses"`
-	OpenFailures  uint64 `json:"open_failures"`
-	ClosedEvicted uint64 `json:"closed_evicted"`
-	CloseAllCalls uint64 `json:"close_all_calls"`
-	IdleEvicted   uint64 `json:"idle_evicted"`
+	Opens              uint64 `json:"opens"`
+	Reuses             uint64 `json:"reuses"`
+	OpenFailures       uint64 `json:"open_failures"`
+	ClosedEvicted      uint64 `json:"closed_evicted"`
+	CloseAllCalls      uint64 `json:"close_all_calls"`
+	IdleEvicted        uint64 `json:"idle_evicted"`
+	StreamOpens        uint64 `json:"stream_opens"`
+	StreamCloses       uint64 `json:"stream_closes"`
+	StreamLimitRejects uint64 `json:"stream_limit_rejects"`
 }
 
 type QUICFabricTransportSnapshot struct {
 	SchemaVersion string                   `json:"schema_version"`
 	ActiveCount   int                      `json:"active_count"`
+	ActiveStreams int                      `json:"active_streams"`
 	Stats         QUICFabricTransportStats `json:"stats"`
 }
 
@@ -52,15 +58,18 @@ type quicFabricSession struct {
 	maxPayload int
 	timeout    time.Duration
 	closeConn  bool
+	transport  *QUICFabricTransport
+	connKey    string
 }
 
 type quicFabricConnEntry struct {
-	conn     *quic.Conn
-	lastUsed time.Time
+	conn          *quic.Conn
+	lastUsed      time.Time
+	activeStreams int
 }
 
 func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
-	return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, conns: map[string]*quicFabricConnEntry{}}
+	return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, MaxStreamsPerConn: defaultQUICFabricMaxStreamsPerConn, conns: map[string]*quicFabricConnEntry{}}
 }
 
 func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config {
@@ -105,12 +114,16 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
 			tlsConfig.NextProtos = []string{fabricQUICNextProto}
 		}
 	}
-	conn, closeConn, err := t.connectConn(ctx, target, tlsConfig)
+	conn, connKey, closeConn, err := t.connectConn(ctx, target, tlsConfig)
 	if err != nil {
 		return nil, err
 	}
+	if err := t.reserveStream(connKey, conn); err != nil {
+		return nil, err
+	}
 	stream, err := conn.OpenStreamSync(ctx)
 	if err != nil {
+		t.releaseStream(connKey, conn)
 		t.evictConn(target, conn)
 		if closeConn {
 			_ = conn.CloseWithError(1, "open stream failed")
@@ -138,20 +151,22 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
 		maxPayload: maxPayload,
 		timeout:    target.Timeout,
 		closeConn:  closeConn,
+		transport:  t,
+		connKey:    connKey,
 	}
 	go session.readLoop(context.Background())
 	return session, nil
 }
 
-func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, bool, error) {
+func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, string, bool, error) {
 	if t == nil {
 		conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, nil)
-		return conn, true, err
+		return conn, "", true, err
 	}
 	key := quicFabricConnKey(target)
 	if key == "" {
 		conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
-		return conn, true, err
+		return conn, "", true, err
 	}
 	t.mu.Lock()
 	t.pruneIdleLocked(time.Now())
@@ -164,7 +179,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
 			entry.lastUsed = time.Now()
 			t.stats.Reuses++
 			t.mu.Unlock()
-			return entry.conn, false, nil
+			return entry.conn, key, false, nil
 		}
 	}
 	t.mu.Unlock()
@@ -174,7 +189,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
 		t.mu.Lock()
 		t.stats.OpenFailures++
 		t.mu.Unlock()
-		return nil, false, err
+		return nil, "", false, err
 	}
 	t.mu.Lock()
 	t.pruneIdleLocked(time.Now())
@@ -188,7 +203,7 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
 			t.stats.Reuses++
 			t.mu.Unlock()
 			_ = conn.CloseWithError(0, "duplicate connection")
-			return existing.conn, false, nil
+			return existing.conn, key, false, nil
 		}
 	}
 	if t.conns == nil {
@@ -197,7 +212,54 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
 	t.conns[key] = &quicFabricConnEntry{conn: conn, lastUsed: time.Now()}
 	t.stats.Opens++
 	t.mu.Unlock()
-	return conn, false, nil
+	return conn, key, false, nil
+}
+
+// reserveStream counts one more logical stream against the cached connection
+// stored under key. It fails when conn is no longer the entry cached for key
+// or when that entry already holds MaxStreamsPerConn streams (a non-positive
+// limit falls back to the default). Uncached connections are not tracked.
+func (t *QUICFabricTransport) reserveStream(key string, conn *quic.Conn) error {
+	if t == nil || key == "" {
+		return nil
+	}
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	entry := t.conns[key]
+	if entry == nil || entry.conn != conn {
+		return fmt.Errorf("quic fabric connection is not cached")
+	}
+	limit := t.MaxStreamsPerConn
+	if limit <= 0 {
+		limit = defaultQUICFabricMaxStreamsPerConn
+	}
+	if entry.activeStreams >= limit {
+		t.stats.StreamLimitRejects++
+		return fmt.Errorf("quic fabric stream limit reached")
+	}
+	entry.activeStreams++
+	entry.lastUsed = time.Now()
+	t.stats.StreamOpens++
+	return nil
+}
+
+// releaseStream returns the slot taken by reserveStream. The entry is only
+// updated when it still wraps conn: after an eviction the key may point at a
+// replacement connection whose stream count must not be decremented by
+// sessions that were opened on the old one.
+func (t *QUICFabricTransport) releaseStream(key string, conn *quic.Conn) {
+	if t == nil || key == "" {
+		return
+	}
+	t.mu.Lock()
+	if entry := t.conns[key]; entry != nil && entry.conn == conn {
+		if entry.activeStreams > 0 {
+			entry.activeStreams--
+		}
+		entry.lastUsed = time.Now()
+		t.stats.StreamCloses++
+	}
+	t.mu.Unlock()
 }
 
 func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic.Conn) {
@@ -230,6 +292,9 @@ func (t *QUICFabricTransport) pruneIdleLocked(now time.Time) {
 			continue
 		}
 		if !entry.lastUsed.IsZero() && now.Sub(entry.lastUsed) > ttl {
+			if entry.activeStreams > 0 {
+				continue
+			}
 			_ = entry.conn.CloseWithError(0, "idle")
 			delete(t.conns, key)
 			t.stats.IdleEvicted++
@@ -285,6 +350,7 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot {
 			snapshot.Stats.ClosedEvicted++
 		default:
 			snapshot.ActiveCount++
+			snapshot.ActiveStreams += entry.activeStreams
 		}
 	}
 	return snapshot
@@ -329,6 +395,9 @@ func (s *quicFabricSession) Close() error {
 	if s.stream != nil {
 		err = s.stream.Close()
 	}
+	if s.transport != nil {
+		s.transport.releaseStream(s.connKey, s.conn)
+	}
 	if s.conn != nil {
 		if s.closeConn {
 			_ = s.conn.CloseWithError(0, "closed")
diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go
index d4b17fa..02d8131 100644
--- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go
+++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go
@@ -224,7 +224,9 @@ func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) {
 	if err != nil {
 		t.Fatalf("connect: %v", err)
 	}
-	defer session.Close()
+	if err := session.Close(); err != nil {
+		t.Fatalf("close session: %v", err)
+	}
 	time.Sleep(time.Millisecond)
 
 	snapshot := transport.Snapshot()
@@ -233,6 +235,51 @@ func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) {
 	}
 }
 
+func TestQUICFabricTransportLimitsStreamsPerConnection(t *testing.T) {
+	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
+		ListenAddr: "127.0.0.1:0",
+		TLSConfig:  testQUICTLSConfig(t),
+	})
+	if err != nil {
+		t.Fatalf("start quic fabric server: %v", err)
+	}
+	defer server.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+	transport := NewQUICFabricTransport(nil)
+	transport.MaxStreamsPerConn = 1
+	defer transport.Close()
+	target := FabricTransportTarget{
+		PeerID:   "node-b",
+		Endpoint: server.Addr().String(),
+		TLSConfig: &tls.Config{
+			InsecureSkipVerify: true,
+			NextProtos:         []string{fabricQUICNextProto},
+		},
+		Timeout: time.Second,
+	}
+	first, err := transport.Connect(ctx, target)
+	if err != nil {
+		t.Fatalf("first connect: %v", err)
+	}
+	if _, err := transport.Connect(ctx, target); err == nil {
+		t.Fatal("second connect succeeded past stream limit")
+	}
+	snapshot := transport.Snapshot()
+	if snapshot.ActiveStreams != 1 || snapshot.Stats.StreamLimitRejects != 1 {
+		t.Fatalf("unexpected stream limit snapshot: %+v", snapshot)
+	}
+	if err := first.Close(); err != nil {
+		t.Fatalf("close first stream: %v", err)
+	}
+	second, err := transport.Connect(ctx, target)
+	if err != nil {
+		t.Fatalf("connect after release: %v", err)
+	}
+	defer second.Close()
+}
+
 func TestQUICFabricServerHandlesFabricFrames(t *testing.T) {
 	var events []FabricSessionEventLogEntry
 	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
index ff0ecdb..97f177e 100644
--- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
+++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md
@@ -361,6 +361,9 @@ opens logical fabric-session streams on top, with heartbeat telemetry for QUIC
 connection opens, reuses, evictions, and active count. Cached QUIC connections
 are pruned by idle TTL, preventing long-running agents from holding unused peer
 connections indefinitely.
+QUIC carrier connections now track active logical streams and enforce a
+per-connection stream limit, exposing stream opens/closes and limit rejects in
+transport telemetry.
 
 Deliverables:
 