Prune idle QUIC fabric connections

This commit is contained in:
2026-05-16 11:33:48 +03:00
parent 39a5e01a7b
commit 650b309686
3 changed files with 94 additions and 20 deletions
@@ -16,12 +16,14 @@ import (
)
const fabricQUICNextProto = "rap-fabric-data-session-v1"
const defaultQUICFabricConnIdleTTL = 5 * time.Minute
type QUICFabricTransport struct {
Config *quic.Config
mu sync.Mutex
conns map[string]*quic.Conn
stats QUICFabricTransportStats
Config *quic.Config
IdleTTL time.Duration
mu sync.Mutex
conns map[string]*quicFabricConnEntry
stats QUICFabricTransportStats
}
type QUICFabricTransportStats struct {
@@ -30,6 +32,7 @@ type QUICFabricTransportStats struct {
OpenFailures uint64 `json:"open_failures"`
ClosedEvicted uint64 `json:"closed_evicted"`
CloseAllCalls uint64 `json:"close_all_calls"`
IdleEvicted uint64 `json:"idle_evicted"`
}
type QUICFabricTransportSnapshot struct {
@@ -51,8 +54,13 @@ type quicFabricSession struct {
closeConn bool
}
type quicFabricConnEntry struct {
conn *quic.Conn
lastUsed time.Time
}
func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
return &QUICFabricTransport{Config: config, conns: map[string]*quic.Conn{}}
return &QUICFabricTransport{Config: config, IdleTTL: defaultQUICFabricConnIdleTTL, conns: map[string]*quicFabricConnEntry{}}
}
func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config {
@@ -146,15 +154,17 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
return conn, true, err
}
t.mu.Lock()
if conn := t.conns[key]; conn != nil {
t.pruneIdleLocked(time.Now())
if entry := t.conns[key]; entry != nil && entry.conn != nil {
select {
case <-conn.Context().Done():
case <-entry.conn.Context().Done():
delete(t.conns, key)
t.stats.ClosedEvicted++
default:
entry.lastUsed = time.Now()
t.stats.Reuses++
t.mu.Unlock()
return conn, false, nil
return entry.conn, false, nil
}
}
t.mu.Unlock()
@@ -167,22 +177,24 @@ func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTran
return nil, false, err
}
t.mu.Lock()
if existing := t.conns[key]; existing != nil {
t.pruneIdleLocked(time.Now())
if existing := t.conns[key]; existing != nil && existing.conn != nil {
select {
case <-existing.Context().Done():
case <-existing.conn.Context().Done():
delete(t.conns, key)
t.stats.ClosedEvicted++
default:
existing.lastUsed = time.Now()
t.stats.Reuses++
t.mu.Unlock()
_ = conn.CloseWithError(0, "duplicate connection")
return existing, false, nil
return existing.conn, false, nil
}
}
if t.conns == nil {
t.conns = map[string]*quic.Conn{}
t.conns = map[string]*quicFabricConnEntry{}
}
t.conns[key] = conn
t.conns[key] = &quicFabricConnEntry{conn: conn, lastUsed: time.Now()}
t.stats.Opens++
t.mu.Unlock()
return conn, false, nil
@@ -197,13 +209,34 @@ func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic
return
}
t.mu.Lock()
if t.conns[key] == conn {
if entry := t.conns[key]; entry != nil && entry.conn == conn {
delete(t.conns, key)
t.stats.ClosedEvicted++
}
t.mu.Unlock()
}
func (t *QUICFabricTransport) pruneIdleLocked(now time.Time) {
if t == nil || len(t.conns) == 0 {
return
}
ttl := t.IdleTTL
if ttl <= 0 {
ttl = defaultQUICFabricConnIdleTTL
}
for key, entry := range t.conns {
if entry == nil || entry.conn == nil {
delete(t.conns, key)
continue
}
if !entry.lastUsed.IsZero() && now.Sub(entry.lastUsed) > ttl {
_ = entry.conn.CloseWithError(0, "idle")
delete(t.conns, key)
t.stats.IdleEvicted++
}
}
}
func quicFabricConnKey(target FabricTransportTarget) string {
peerID := strings.TrimSpace(target.PeerID)
endpoint := strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://")
@@ -220,10 +253,12 @@ func (t *QUICFabricTransport) Close() error {
t.mu.Lock()
t.stats.CloseAllCalls++
conns := t.conns
t.conns = map[string]*quic.Conn{}
t.conns = map[string]*quicFabricConnEntry{}
t.mu.Unlock()
for _, conn := range conns {
_ = conn.CloseWithError(0, "closed")
for _, entry := range conns {
if entry != nil && entry.conn != nil {
_ = entry.conn.CloseWithError(0, "closed")
}
}
return nil
}
@@ -234,17 +269,18 @@ func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot {
}
t.mu.Lock()
defer t.mu.Unlock()
t.pruneIdleLocked(time.Now())
snapshot := QUICFabricTransportSnapshot{
SchemaVersion: "rap.quic_fabric_transport.v1",
Stats: t.stats,
}
for key, conn := range t.conns {
if conn == nil {
for key, entry := range t.conns {
if entry == nil || entry.conn == nil {
delete(t.conns, key)
continue
}
select {
case <-conn.Context().Done():
case <-entry.conn.Context().Done():
delete(t.conns, key)
snapshot.Stats.ClosedEvicted++
default:
@@ -197,6 +197,42 @@ func TestQUICFabricTransportReusesConnectionForPeerEndpoint(t *testing.T) {
}
}
func TestQUICFabricTransportPrunesIdleConnections(t *testing.T) {
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: testQUICTLSConfig(t),
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
transport := NewQUICFabricTransport(nil)
transport.IdleTTL = time.Nanosecond
defer transport.Close()
session, err := transport.Connect(ctx, FabricTransportTarget{
PeerID: "node-b",
Endpoint: server.Addr().String(),
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{fabricQUICNextProto},
},
Timeout: time.Second,
})
if err != nil {
t.Fatalf("connect: %v", err)
}
defer session.Close()
time.Sleep(time.Millisecond)
snapshot := transport.Snapshot()
if snapshot.ActiveCount != 0 || snapshot.Stats.IdleEvicted != 1 {
t.Fatalf("idle connection was not pruned: %+v", snapshot)
}
}
func TestQUICFabricServerHandlesFabricFrames(t *testing.T) {
var events []FabricSessionEventLogEntry
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{