diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index e45e329..34c5e84 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -16,12 +16,14 @@ import ( ) const fabricQUICNextProto = "rap-fabric-data-session-v1" +const defaultQUICFabricConnIdleTTL = 5 * time.Minute type QUICFabricTransport struct { - Config *quic.Config - mu sync.Mutex - conns map[string]*quic.Conn - stats QUICFabricTransportStats + Config *quic.Config + IdleTTL time.Duration + mu sync.Mutex + conns map[string]*quicFabricConnEntry + stats QUICFabricTransportStats } type QUICFabricTransportStats struct { @@ -30,6 +32,7 @@ type QUICFabricTransportStats struct { OpenFailures uint64 `json:"open_failures"` ClosedEvicted uint64 `json:"closed_evicted"` CloseAllCalls uint64 `json:"close_all_calls"` + IdleEvicted uint64 `json:"idle_evicted"` } type QUICFabricTransportSnapshot struct { @@ -51,8 +54,13 @@ type quicFabricSession struct { closeConn bool } +type quicFabricConnEntry struct { + conn *quic.Conn + lastUsed time.Time +} + func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport { - return &QUICFabricTransport{Config: config, conns: map[string]*quic.Conn{}} + return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, conns: map[string]*quicFabricConnEntry{}} } func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config { @@ -146,15 +154,17 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran return conn, true, err } t.mu.Lock() - if conn := t.conns[key]; conn != nil { + t.pruneIdleLocked(time.Now()) + if entry := t.conns[key]; entry != nil && entry.conn != nil { select { - case <-conn.Context().Done(): + case <-entry.conn.Context().Done(): delete(t.conns, key) t.stats.ClosedEvicted++ default: + entry.lastUsed = time.Now() t.stats.Reuses++ t.mu.Unlock() - return conn, false, nil + return entry.conn, false, nil } } t.mu.Unlock() @@ -167,22 +177,24 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran return nil, false, err } t.mu.Lock() - if existing := t.conns[key]; existing != nil { + t.pruneIdleLocked(time.Now()) + if existing := t.conns[key]; existing != nil && existing.conn != nil { select { - case <-existing.Context().Done(): + case <-existing.conn.Context().Done(): delete(t.conns, key) t.stats.ClosedEvicted++ default: + existing.lastUsed = time.Now() t.stats.Reuses++ t.mu.Unlock() _ = conn.CloseWithError(0, "duplicate connection") - return existing, false, nil + return existing.conn, false, nil } } if t.conns == nil { - t.conns = map[string]*quic.Conn{} + t.conns = map[string]*quicFabricConnEntry{} } - t.conns[key] = conn + t.conns[key] = &quicFabricConnEntry{conn: conn, lastUsed: time.Now()} t.stats.Opens++ t.mu.Unlock() return conn, false, nil @@ -197,13 +209,34 @@ func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic return } t.mu.Lock() - if t.conns[key] == conn { + if entry := t.conns[key]; entry != nil && entry.conn == conn { delete(t.conns, key) t.stats.ClosedEvicted++ } t.mu.Unlock() } +func (t *QUICFabricTransport) pruneIdleLocked(now time.Time) { + if t == nil || len(t.conns) == 0 { + return + } + ttl := t.IdleTTL + if ttl <= 0 { + ttl = defaultQUICFabricConnIdleTTL + } + for key, entry := range t.conns { + if entry == nil || entry.conn == nil { + delete(t.conns, key) + continue + } + if !entry.lastUsed.IsZero() && now.Sub(entry.lastUsed) > ttl { + _ = entry.conn.CloseWithError(0, "idle") + delete(t.conns, key) + t.stats.IdleEvicted++ + } + } +} + func quicFabricConnKey(target FabricTransportTarget) string { peerID := strings.TrimSpace(target.PeerID) endpoint := strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://") @@ -220,10 +253,12 @@ func (t *QUICFabricTransport) Close() error { t.mu.Lock() t.stats.CloseAllCalls++ conns := t.conns - t.conns = map[string]*quic.Conn{} + t.conns = map[string]*quicFabricConnEntry{} t.mu.Unlock() - for _, conn := range conns { - _ = conn.CloseWithError(0, "closed") + for _, entry := range conns { + if entry != nil && entry.conn != nil { + _ = entry.conn.CloseWithError(0, "closed") + } } return nil } @@ -234,17 +269,18 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot { } t.mu.Lock() defer t.mu.Unlock() + t.pruneIdleLocked(time.Now()) snapshot := QUICFabricTransportSnapshot{ SchemaVersion: "rap.quic_fabric_transport.v1", Stats: t.stats, } - for key, conn := range t.conns { - if conn == nil { + for key, entry := range t.conns { + if entry == nil || entry.conn == nil { delete(t.conns, key) continue } select { - case <-conn.Context().Done(): + case <-entry.conn.Context().Done(): delete(t.conns, key) snapshot.Stats.ClosedEvicted++ default: diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go index 31d0a09..d4b17fa 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -197,6 +197,42 @@ func TestQUICFabricTransportReusesConnectionForPeerEndpoint(t *testing.T) { } } +func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) { + server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ + ListenAddr: "127.0.0.1:0", + TLSConfig: testQUICTLSConfig(t), + }) + if err != nil { + t.Fatalf("start quic fabric server: %v", err) + } + defer server.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + transport := NewQUICFabricTransport(nil) + transport.IdleTTL = time.Nanosecond + defer transport.Close() + session, err := transport.Connect(ctx, FabricTransportTarget{ + PeerID: "node-b", + Endpoint: server.Addr().String(), + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{fabricQUICNextProto}, + }, + Timeout: time.Second, + }) + if err != nil { + t.Fatalf("connect: %v", err) + } + defer session.Close() + time.Sleep(time.Millisecond) + + snapshot := transport.Snapshot() + if snapshot.ActiveCount != 0 || snapshot.Stats.IdleEvicted != 1 { + t.Fatalf("idle connection was not pruned: %+v", snapshot) + } +} + func TestQUICFabricServerHandlesFabricFrames(t *testing.T) { var events []FabricSessionEventLogEntry server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 3587f43..ff0ecdb 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -359,6 +359,8 @@ running the health-aware peer selection path. VPN fabric QUIC transport now reuses QUIC connections per peer endpoint and opens logical fabric-session streams on top, with heartbeat telemetry for QUIC connection opens, reuses, evictions, and active count. +Cached QUIC connections are pruned by idle TTL, preventing long-running agents +from holding unused peer connections indefinitely. Deliverables: