From 39a5e01a7ba91dacf3aaec275d3191f7457d0ea6 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 11:32:06 +0300 Subject: [PATCH] Reuse QUIC fabric connections --- .../rap-node-agent/cmd/rap-node-agent/main.go | 14 +- .../internal/mesh/fabric_quic_transport.go | 147 +++++++++++++++++- .../mesh/fabric_quic_transport_test.go | 42 +++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 4 files changed, 201 insertions(+), 5 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index ba87bb8..ad1bb03 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -366,6 +366,7 @@ type syntheticMeshState struct { VPNFabricIngress *vpnruntime.FabricClientPacketIngress VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricTransport *mesh.WebSocketFabricTransport + VPNFabricQUICTransport *mesh.QUICFabricTransport VPNFabricSessionDialStats *vpnFabricSessionDialStats VPNFabricEndpointObservations *vpnFabricEndpointObservationStore PeerEndpoints map[string]string @@ -1094,6 +1095,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricIngress: vpnFabricIngress, VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), + VPNFabricQUICTransport: mesh.NewQUICFabricTransport(nil), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID), PeerEndpoints: copyStringMap(peerEndpoints), @@ -1984,8 +1986,12 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i meshState.ProductionForwardingEnabled = productionForwardingEnabled if (!sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) || !samePeerEndpointCandidatesMap(meshState.PeerEndpointCandidates, loadedConfig.PeerEndpointCandidates)) && meshState.VPNFabricSessionPeers != nil { _ = meshState.VPNFabricSessionPeers.Close() + if meshState.VPNFabricQUICTransport != nil { + _ = meshState.VPNFabricQUICTransport.Close() + } meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) + meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil) } if meshState.VPNFabricSessionPeers == nil { meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() @@ -1993,6 +1999,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i if meshState.VPNFabricTransport == nil { meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } + if meshState.VPNFabricQUICTransport == nil { + meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil) + } if meshState.VPNFabricSessionDialStats == nil { meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() } @@ -2955,6 +2964,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn } else if meshState != nil && meshState.VPNFabricSessionPeers != nil { report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() } + if meshState != nil && meshState.VPNFabricQUICTransport != nil { + report["quic_sessions"] = meshState.VPNFabricQUICTransport.Snapshot() + } if meshState != nil && meshState.VPNFabricSessionDialStats != nil { report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) } @@ -4989,7 +5001,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st target.OutboundBuffer = 256 target.InboundBuffer = 256 target.ErrorBuffer = 16 - carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) + carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, meshState.VPNFabricQUICTransport) if err != nil { cancel() meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed") diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go index 40cf860..e45e329 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -19,6 +19,23 @@ const fabricQUICNextProto = "rap-fabric-data-session-v1" type QUICFabricTransport struct { Config *quic.Config + mu sync.Mutex + conns map[string]*quic.Conn + stats QUICFabricTransportStats +} + +type QUICFabricTransportStats struct { + Opens uint64 `json:"opens"` + Reuses uint64 `json:"reuses"` + OpenFailures uint64 `json:"open_failures"` + ClosedEvicted uint64 `json:"closed_evicted"` + CloseAllCalls uint64 `json:"close_all_calls"` +} + +type QUICFabricTransportSnapshot struct { + SchemaVersion string `json:"schema_version"` + ActiveCount int `json:"active_count"` + Stats QUICFabricTransportStats `json:"stats"` } type quicFabricSession struct { @@ -31,10 +48,11 @@ type quicFabricSession struct { writeMu sync.Mutex maxPayload int timeout time.Duration + closeConn bool } func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport { - return &QUICFabricTransport{Config: config} + return &QUICFabricTransport{Config: config, conns: map[string]*quic.Conn{}} } func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config { @@ -79,13 +97,16 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor tlsConfig.NextProtos = []string{fabricQUICNextProto} } } - conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config) + conn, closeConn, err := t.connectConn(ctx, target, tlsConfig) if err != nil { return nil, err } stream, err := conn.OpenStreamSync(ctx) if err != nil { - _ = conn.CloseWithError(1, "open stream failed") + t.evictConn(target, conn) + if closeConn { + _ = conn.CloseWithError(1, "open stream failed") + } return nil, err } maxPayload := target.MaxPayload @@ -108,15 +129,131 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor done: make(chan struct{}), maxPayload: maxPayload, timeout: target.Timeout, + closeConn: closeConn, } go session.readLoop(context.Background()) return session, nil } +func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, bool, error) { + if t == nil { + conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, nil) + return conn, true, err + } + key := quicFabricConnKey(target) + if key == "" { + conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config) + return conn, true, err + } + t.mu.Lock() + if conn := t.conns[key]; conn != nil { + select { + case <-conn.Context().Done(): + delete(t.conns, key) + t.stats.ClosedEvicted++ + default: + t.stats.Reuses++ + t.mu.Unlock() + return conn, false, nil + } + } + t.mu.Unlock() + + conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config) + if err != nil { + t.mu.Lock() + t.stats.OpenFailures++ + t.mu.Unlock() + return nil, false, err + } + t.mu.Lock() + if existing := t.conns[key]; existing != nil { + select { + case <-existing.Context().Done(): + delete(t.conns, key) + t.stats.ClosedEvicted++ + default: + t.stats.Reuses++ + t.mu.Unlock() + _ = conn.CloseWithError(0, "duplicate connection") + return existing, false, nil + } + } + if t.conns == nil { + t.conns = map[string]*quic.Conn{} + } + t.conns[key] = conn + t.stats.Opens++ + t.mu.Unlock() + return conn, false, nil +} + +func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic.Conn) { + if t == nil || conn == nil { + return + } + key := quicFabricConnKey(target) + if key == "" { + return + } + t.mu.Lock() + if t.conns[key] == conn { + delete(t.conns, key) + t.stats.ClosedEvicted++ + } + t.mu.Unlock() +} + +func quicFabricConnKey(target FabricTransportTarget) string { + peerID := strings.TrimSpace(target.PeerID) + endpoint := strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://") + if peerID == "" || endpoint == "" { + return "" + } + return peerID + "\x00" + endpoint + "\x00" + normalizeCertSHA256(target.PeerCertSHA256) +} + func (t *QUICFabricTransport) Close() error { + if t == nil { + return nil + } + t.mu.Lock() + t.stats.CloseAllCalls++ + conns := t.conns + t.conns = map[string]*quic.Conn{} + t.mu.Unlock() + for _, conn := range conns { + _ = conn.CloseWithError(0, "closed") + } return nil } +func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot { + if t == nil { + return QUICFabricTransportSnapshot{SchemaVersion: "rap.quic_fabric_transport.v1"} + } + t.mu.Lock() + defer t.mu.Unlock() + snapshot := QUICFabricTransportSnapshot{ + SchemaVersion: "rap.quic_fabric_transport.v1", + Stats: t.stats, + } + for key, conn := range t.conns { + if conn == nil { + delete(t.conns, key) + continue + } + select { + case <-conn.Context().Done(): + delete(t.conns, key) + snapshot.Stats.ClosedEvicted++ + default: + snapshot.ActiveCount++ + } + } + return snapshot +} + func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error { if s == nil || s.stream == nil { return fmt.Errorf("quic fabric session is closed") @@ -157,7 +294,9 @@ func (s *quicFabricSession) Close() error { err = s.stream.Close() } if s.conn != nil { - _ = s.conn.CloseWithError(0, "closed") + if s.closeConn { + _ = s.conn.CloseWithError(0, "closed") + } } }) return err diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go index d765487..31d0a09 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -155,6 +155,48 @@ func TestQUICFabricTransportRejectsPinnedCertificateMismatch(t *testing.T) { } } +func TestQUICFabricTransportReusesConnectionForPeerEndpoint(t *testing.T) { + server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ + ListenAddr: "127.0.0.1:0", + TLSConfig: testQUICTLSConfig(t), + }) + if err != nil { + t.Fatalf("start quic fabric server: %v", err) + } + defer server.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + transport := NewQUICFabricTransport(nil) + defer transport.Close() + target := FabricTransportTarget{ + PeerID: "node-b", + Endpoint: server.Addr().String(), + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{fabricQUICNextProto}, + }, + Timeout: time.Second, + InboundBuffer: 4, + ErrorBuffer: 4, + } + first, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("first connect: %v", err) + } + defer first.Close() + second, err := transport.Connect(ctx, target) + if err != nil { + t.Fatalf("second connect: %v", err) + } + defer second.Close() + + snapshot := transport.Snapshot() + if snapshot.ActiveCount != 1 || snapshot.Stats.Opens != 1 || snapshot.Stats.Reuses != 1 { + t.Fatalf("unexpected quic transport snapshot: %+v", snapshot) + } +} + func TestQUICFabricServerHandlesFabricFrames(t *testing.T) { var events []FabricSessionEventLogEntry server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index ad27c2d..3587f43 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -356,6 +356,9 @@ chosen. Heartbeat capabilities now advertise that peer-cache endpoint ranking consumes health observations, allowing control plane and UI diagnostics to detect nodes running the health-aware peer selection path. +VPN fabric QUIC transport now reuses QUIC connections per peer endpoint and +opens logical fabric-session streams on top, with heartbeat telemetry for QUIC +connection opens, reuses, evictions, and active count. Deliverables: