Wire gated QUIC fabric listener

This commit is contained in:
2026-05-16 10:34:28 +03:00
parent f84b088580
commit 6c62c14e2c
11 changed files with 160 additions and 1 deletions
@@ -2,9 +2,15 @@ package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"log"
"math/big"
"net"
"net/http"
"os"
@@ -364,6 +370,9 @@ type syntheticMeshState struct {
ListenerRuntimeConfig config.Config
ListenerHandler *dynamicHTTPHandler
StopListener func()
QUICFabricServer *mesh.QUICFabricServer
QUICFabricListenAddr string
QUICFabricError string
ConfigLoadError string
}
@@ -778,6 +787,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
listenerCfg := meshListenerRuntimeConfig(cfg, loadedConfig.MeshListener)
listenerReport, stopListener := startSyntheticMeshHTTPServer(ctx, listenerCfg, identity, dynamicListenerHandler, len(peerEndpoints), len(routes), gateEnabled, runtimeEnabled)
vpnFabricSessionPeers := mesh.NewFabricSessionPeerManager()
quicFabricServer, quicFabricAddr, quicFabricErr := startQUICFabricEndpoint(ctx, cfg, identity)
return &syntheticMeshState{
Runtime: runtime,
Routes: routes,
@@ -813,6 +823,9 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
ListenerRuntimeConfig: listenerCfg,
ListenerHandler: dynamicListenerHandler,
StopListener: stopListener,
QUICFabricServer: quicFabricServer,
QUICFabricListenAddr: quicFabricAddr,
QUICFabricError: errorString(quicFabricErr),
ConfigLoadError: errorString(err),
}, stopListener, nil
}
@@ -1124,6 +1137,8 @@ func meshListenerConfigKey(cfg config.Config) string {
strings.TrimSpace(cfg.MeshRegion),
fmt.Sprintf("%t", cfg.MeshProductionForwardingEnabled),
fmt.Sprintf("%t", cfg.VPNFabricSessionTransportEnabled),
fmt.Sprintf("%t", cfg.MeshQUICFabricEnabled),
strings.TrimSpace(cfg.MeshQUICFabricListenAddr),
}, "|")
}
@@ -1149,6 +1164,68 @@ func bindSyntheticMeshListener(cfg config.Config) (net.Listener, string, bool, e
return nil, "", false, err
}
func startQUICFabricEndpoint(ctx context.Context, cfg config.Config, identity state.Identity) (*mesh.QUICFabricServer, string, error) {
if !cfg.MeshQUICFabricEnabled {
return nil, "", nil
}
if strings.TrimSpace(cfg.MeshQUICFabricListenAddr) == "" {
return nil, "", fmt.Errorf("quic fabric enabled but listen addr is empty")
}
tlsConfig, err := quicFabricTLSConfig(identity)
if err != nil {
return nil, "", err
}
server, err := mesh.StartQUICFabricServer(ctx, mesh.QUICFabricServerConfig{
ListenAddr: cfg.MeshQUICFabricListenAddr,
TLSConfig: tlsConfig,
Logger: func(entry mesh.FabricSessionEventLogEntry) {
payload, err := json.Marshal(entry)
if err != nil {
log.Printf("fabric quic event marshal failed: %v", err)
return
}
log.Printf("fabric_quic_event=%s", string(payload))
},
})
if err != nil {
return nil, "", err
}
addr := ""
if server.Addr() != nil {
addr = server.Addr().String()
}
log.Printf("quic fabric endpoint enabled: listen_addr=%s effective_addr=%s node_id=%s cluster_id=%s", cfg.MeshQUICFabricListenAddr, addr, identity.NodeID, identity.ClusterID)
return server, addr, nil
}
func quicFabricTLSConfig(identity state.Identity) (*tls.Config, error) {
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, err
}
commonName := firstNonEmpty(identity.NodeID, "rap-fabric-node")
template := x509.Certificate{
SerialNumber: big.NewInt(time.Now().UnixNano()),
Subject: pkix.Name{CommonName: commonName},
NotBefore: time.Now().Add(-time.Minute),
NotAfter: time.Now().Add(24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
DNSNames: []string{commonName, "localhost"},
}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
return nil, err
}
return &tls.Config{
Certificates: []tls.Certificate{{
Certificate: [][]byte{certDER},
PrivateKey: key,
}},
NextProtos: []string{"rap-fabric-data-session-v1"},
}, nil
}
func isAddressInUse(err error) bool {
if err == nil {
return false
@@ -1689,6 +1766,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
} else {
meshState.ListenerHandler.Update(nextListenerHandler)
}
applyQUICFabricConfigIfChanged(ctx, cfg, identity, meshState)
applyMeshListenerConfigIfChanged(ctx, cfg, identity, meshState, loadedConfig, observedAt)
meshState.Routes = loadedConfig.Routes
meshState.RouteHealthRoutes = routeHealthRoutes
@@ -1735,6 +1813,32 @@ func applyMeshListenerConfigIfChanged(ctx context.Context, base config.Config, i
)
}
func applyQUICFabricConfigIfChanged(ctx context.Context, cfg config.Config, identity state.Identity, meshState *syntheticMeshState) {
if meshState == nil {
return
}
desiredAddr := strings.TrimSpace(cfg.MeshQUICFabricListenAddr)
if meshState.QUICFabricServer != nil && (!cfg.MeshQUICFabricEnabled || meshState.QUICFabricListenAddr != desiredAddr) {
_ = meshState.QUICFabricServer.Close()
meshState.QUICFabricServer = nil
meshState.QUICFabricListenAddr = ""
}
if !cfg.MeshQUICFabricEnabled {
meshState.QUICFabricError = ""
return
}
if meshState.QUICFabricServer != nil {
return
}
server, addr, err := startQUICFabricEndpoint(ctx, cfg, identity)
meshState.QUICFabricServer = server
meshState.QUICFabricListenAddr = addr
meshState.QUICFabricError = errorString(err)
if err != nil {
log.Printf("quic fabric endpoint unavailable: listen_addr=%s node_id=%s cluster_id=%s err=%v", cfg.MeshQUICFabricListenAddr, identity.NodeID, identity.ClusterID, err)
}
}
func meshRendezvousLeasePostureForState(meshState *syntheticMeshState, identity state.Identity, observedAt time.Time) meshRendezvousLeasePosture {
posture := meshRendezvousLeasePosture{}
if meshState == nil {
@@ -2487,7 +2591,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
payload.Capabilities["mesh_production_forwarding"] = true
}
if cfg.MeshFabricSessionEnabled {
payload.Metadata["fabric_session_endpoint_report"] = map[string]any{
report := map[string]any{
"schema_version": "rap.fabric_session_endpoint_report.v1",
"enabled": true,
"transport": "websocket_binary_frames",
@@ -2498,8 +2602,20 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
"traffic_isolation": "logical_streams",
"observed_at": observedAt.UTC().Format(time.RFC3339Nano),
}
if meshState != nil && cfg.MeshQUICFabricEnabled {
report["quic"] = map[string]any{
"enabled": meshState.QUICFabricServer != nil,
"listen_addr": cfg.MeshQUICFabricListenAddr,
"effective_listen_addr": meshState.QUICFabricListenAddr,
"error": meshState.QUICFabricError,
}
}
payload.Metadata["fabric_session_endpoint_report"] = report
payload.Capabilities["fabric_session_websocket_endpoint"] = true
payload.Capabilities["fabric_data_session_v1"] = true
if cfg.MeshQUICFabricEnabled {
payload.Capabilities["fabric_quic_endpoint"] = true
}
}
if cfg.VPNFabricSessionTransportEnabled {
report := map[string]any{
@@ -706,11 +706,14 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
MeshProductionForwardingEnabled: true,
MeshFabricSessionEnabled: true,
VPNFabricSessionTransportEnabled: true,
MeshQUICFabricEnabled: true,
MeshQUICFabricListenAddr: ":19443",
}, state.Identity{
ClusterID: "cluster-1",
NodeID: "node-a",
}, &syntheticMeshState{
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
QUICFabricListenAddr: "127.0.0.1:19443",
}, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC))
report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any)
@@ -731,6 +734,11 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
}
if report, ok := payload.Metadata["fabric_session_endpoint_report"].(map[string]any); !ok || report["path"] != "/mesh/v1/fabric/session/ws" {
t.Fatalf("fabric session endpoint report missing: %+v", payload.Metadata)
} else if quic, ok := report["quic"].(map[string]any); !ok || quic["listen_addr"] != ":19443" || quic["effective_listen_addr"] != "127.0.0.1:19443" {
t.Fatalf("fabric quic endpoint report missing: %+v", report)
}
if payload.Capabilities["fabric_quic_endpoint"] != true {
t.Fatalf("fabric quic capability missing: %+v", payload.Capabilities)
}
if payload.Capabilities["vpn_fabric_session_transport"] != true || payload.Capabilities["vpn_packet_batch_binary_frames"] != true {
t.Fatalf("vpn fabric session capabilities missing: %+v", payload.Capabilities)