Reuse QUIC fabric connections

This commit is contained in:
2026-05-16 11:32:06 +03:00
parent dbbdaa63f3
commit 39a5e01a7b
4 changed files with 201 additions and 5 deletions
@@ -366,6 +366,7 @@ type syntheticMeshState struct {
VPNFabricIngress *vpnruntime.FabricClientPacketIngress VPNFabricIngress *vpnruntime.FabricClientPacketIngress
VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricSessionPeers *mesh.FabricSessionPeerManager
VPNFabricTransport *mesh.WebSocketFabricTransport VPNFabricTransport *mesh.WebSocketFabricTransport
VPNFabricQUICTransport *mesh.QUICFabricTransport
VPNFabricSessionDialStats *vpnFabricSessionDialStats VPNFabricSessionDialStats *vpnFabricSessionDialStats
VPNFabricEndpointObservations *vpnFabricEndpointObservationStore VPNFabricEndpointObservations *vpnFabricEndpointObservationStore
PeerEndpoints map[string]string PeerEndpoints map[string]string
@@ -1094,6 +1095,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
VPNFabricIngress: vpnFabricIngress, VPNFabricIngress: vpnFabricIngress,
VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricSessionPeers: vpnFabricSessionPeers,
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
VPNFabricQUICTransport: mesh.NewQUICFabricTransport(nil),
VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(),
VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID), VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID),
PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpoints: copyStringMap(peerEndpoints),
@@ -1984,8 +1986,12 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
meshState.ProductionForwardingEnabled = productionForwardingEnabled meshState.ProductionForwardingEnabled = productionForwardingEnabled
if (!sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) || !samePeerEndpointCandidatesMap(meshState.PeerEndpointCandidates, loadedConfig.PeerEndpointCandidates)) && meshState.VPNFabricSessionPeers != nil { if (!sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) || !samePeerEndpointCandidatesMap(meshState.PeerEndpointCandidates, loadedConfig.PeerEndpointCandidates)) && meshState.VPNFabricSessionPeers != nil {
_ = meshState.VPNFabricSessionPeers.Close() _ = meshState.VPNFabricSessionPeers.Close()
if meshState.VPNFabricQUICTransport != nil {
_ = meshState.VPNFabricQUICTransport.Close()
}
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil)
} }
if meshState.VPNFabricSessionPeers == nil { if meshState.VPNFabricSessionPeers == nil {
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
@@ -1993,6 +1999,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
if meshState.VPNFabricTransport == nil { if meshState.VPNFabricTransport == nil {
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
} }
if meshState.VPNFabricQUICTransport == nil {
meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil)
}
if meshState.VPNFabricSessionDialStats == nil { if meshState.VPNFabricSessionDialStats == nil {
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
} }
@@ -2955,6 +2964,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
} else if meshState != nil && meshState.VPNFabricSessionPeers != nil { } else if meshState != nil && meshState.VPNFabricSessionPeers != nil {
report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot()
} }
if meshState != nil && meshState.VPNFabricQUICTransport != nil {
report["quic_sessions"] = meshState.VPNFabricQUICTransport.Snapshot()
}
if meshState != nil && meshState.VPNFabricSessionDialStats != nil { if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
} }
@@ -4989,7 +5001,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
target.OutboundBuffer = 256 target.OutboundBuffer = 256
target.InboundBuffer = 256 target.InboundBuffer = 256
target.ErrorBuffer = 16 target.ErrorBuffer = 16
carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, meshState.VPNFabricQUICTransport)
if err != nil { if err != nil {
cancel() cancel()
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed") meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed")
@@ -19,6 +19,23 @@ const fabricQUICNextProto = "rap-fabric-data-session-v1"
type QUICFabricTransport struct { type QUICFabricTransport struct {
Config *quic.Config Config *quic.Config
mu sync.Mutex
conns map[string]*quic.Conn
stats QUICFabricTransportStats
}
type QUICFabricTransportStats struct {
Opens uint64 `json:"opens"`
Reuses uint64 `json:"reuses"`
OpenFailures uint64 `json:"open_failures"`
ClosedEvicted uint64 `json:"closed_evicted"`
CloseAllCalls uint64 `json:"close_all_calls"`
}
type QUICFabricTransportSnapshot struct {
SchemaVersion string `json:"schema_version"`
ActiveCount int `json:"active_count"`
Stats QUICFabricTransportStats `json:"stats"`
} }
type quicFabricSession struct { type quicFabricSession struct {
@@ -31,10 +48,11 @@ type quicFabricSession struct {
writeMu sync.Mutex writeMu sync.Mutex
maxPayload int maxPayload int
timeout time.Duration timeout time.Duration
closeConn bool
} }
func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport { func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
return &QUICFabricTransport{Config: config} return &QUICFabricTransport{Config: config, conns: map[string]*quic.Conn{}}
} }
func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config { func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config {
@@ -79,13 +97,16 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
tlsConfig.NextProtos = []string{fabricQUICNextProto} tlsConfig.NextProtos = []string{fabricQUICNextProto}
} }
} }
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config) conn, closeConn, err := t.connectConn(ctx, target, tlsConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stream, err := conn.OpenStreamSync(ctx) stream, err := conn.OpenStreamSync(ctx)
if err != nil { if err != nil {
_ = conn.CloseWithError(1, "open stream failed") t.evictConn(target, conn)
if closeConn {
_ = conn.CloseWithError(1, "open stream failed")
}
return nil, err return nil, err
} }
maxPayload := target.MaxPayload maxPayload := target.MaxPayload
@@ -108,15 +129,131 @@ func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTranspor
done: make(chan struct{}), done: make(chan struct{}),
maxPayload: maxPayload, maxPayload: maxPayload,
timeout: target.Timeout, timeout: target.Timeout,
closeConn: closeConn,
} }
go session.readLoop(context.Background()) go session.readLoop(context.Background())
return session, nil return session, nil
} }
func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, bool, error) {
if t == nil {
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, nil)
return conn, true, err
}
key := quicFabricConnKey(target)
if key == "" {
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
return conn, true, err
}
t.mu.Lock()
if conn := t.conns[key]; conn != nil {
select {
case <-conn.Context().Done():
delete(t.conns, key)
t.stats.ClosedEvicted++
default:
t.stats.Reuses++
t.mu.Unlock()
return conn, false, nil
}
}
t.mu.Unlock()
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
if err != nil {
t.mu.Lock()
t.stats.OpenFailures++
t.mu.Unlock()
return nil, false, err
}
t.mu.Lock()
if existing := t.conns[key]; existing != nil {
select {
case <-existing.Context().Done():
delete(t.conns, key)
t.stats.ClosedEvicted++
default:
t.stats.Reuses++
t.mu.Unlock()
_ = conn.CloseWithError(0, "duplicate connection")
return existing, false, nil
}
}
if t.conns == nil {
t.conns = map[string]*quic.Conn{}
}
t.conns[key] = conn
t.stats.Opens++
t.mu.Unlock()
return conn, false, nil
}
func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic.Conn) {
if t == nil || conn == nil {
return
}
key := quicFabricConnKey(target)
if key == "" {
return
}
t.mu.Lock()
if t.conns[key] == conn {
delete(t.conns, key)
t.stats.ClosedEvicted++
}
t.mu.Unlock()
}
func quicFabricConnKey(target FabricTransportTarget) string {
peerID := strings.TrimSpace(target.PeerID)
endpoint := strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://")
if peerID == "" || endpoint == "" {
return ""
}
return peerID + "\x00" + endpoint + "\x00" + normalizeCertSHA256(target.PeerCertSHA256)
}
func (t *QUICFabricTransport) Close() error { func (t *QUICFabricTransport) Close() error {
if t == nil {
return nil
}
t.mu.Lock()
t.stats.CloseAllCalls++
conns := t.conns
t.conns = map[string]*quic.Conn{}
t.mu.Unlock()
for _, conn := range conns {
_ = conn.CloseWithError(0, "closed")
}
return nil return nil
} }
func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot {
if t == nil {
return QUICFabricTransportSnapshot{SchemaVersion: "rap.quic_fabric_transport.v1"}
}
t.mu.Lock()
defer t.mu.Unlock()
snapshot := QUICFabricTransportSnapshot{
SchemaVersion: "rap.quic_fabric_transport.v1",
Stats: t.stats,
}
for key, conn := range t.conns {
if conn == nil {
delete(t.conns, key)
continue
}
select {
case <-conn.Context().Done():
delete(t.conns, key)
snapshot.Stats.ClosedEvicted++
default:
snapshot.ActiveCount++
}
}
return snapshot
}
func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error { func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error {
if s == nil || s.stream == nil { if s == nil || s.stream == nil {
return fmt.Errorf("quic fabric session is closed") return fmt.Errorf("quic fabric session is closed")
@@ -157,7 +294,9 @@ func (s *quicFabricSession) Close() error {
err = s.stream.Close() err = s.stream.Close()
} }
if s.conn != nil { if s.conn != nil {
_ = s.conn.CloseWithError(0, "closed") if s.closeConn {
_ = s.conn.CloseWithError(0, "closed")
}
} }
}) })
return err return err
@@ -155,6 +155,48 @@ func TestQUICFabricTransportRejectsPinnedCertificateMismatch(t *testing.T) {
} }
} }
func TestQUICFabricTransportReusesConnectionForPeerEndpoint(t *testing.T) {
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: testQUICTLSConfig(t),
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
transport := NewQUICFabricTransport(nil)
defer transport.Close()
target := FabricTransportTarget{
PeerID: "node-b",
Endpoint: server.Addr().String(),
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{fabricQUICNextProto},
},
Timeout: time.Second,
InboundBuffer: 4,
ErrorBuffer: 4,
}
first, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("first connect: %v", err)
}
defer first.Close()
second, err := transport.Connect(ctx, target)
if err != nil {
t.Fatalf("second connect: %v", err)
}
defer second.Close()
snapshot := transport.Snapshot()
if snapshot.ActiveCount != 1 || snapshot.Stats.Opens != 1 || snapshot.Stats.Reuses != 1 {
t.Fatalf("unexpected quic transport snapshot: %+v", snapshot)
}
}
func TestQUICFabricServerHandlesFabricFrames(t *testing.T) { func TestQUICFabricServerHandlesFabricFrames(t *testing.T) {
var events []FabricSessionEventLogEntry var events []FabricSessionEventLogEntry
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
@@ -356,6 +356,9 @@ chosen.
Heartbeat capabilities now advertise that peer-cache endpoint ranking consumes Heartbeat capabilities now advertise that peer-cache endpoint ranking consumes
health observations, allowing control plane and UI diagnostics to detect nodes health observations, allowing control plane and UI diagnostics to detect nodes
running the health-aware peer selection path. running the health-aware peer selection path.
VPN fabric QUIC transport now reuses QUIC connections per peer endpoint and
opens logical fabric-session streams on top, with heartbeat telemetry for QUIC
connection opens, reuses, evictions, and active count.
Deliverables: Deliverables: