Configure QUIC fabric stream limits

This commit is contained in:
2026-05-16 11:44:13 +03:00
parent ef458330aa
commit 0f7caf5bb4
11 changed files with 50 additions and 5 deletions
@@ -1095,7 +1095,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
VPNFabricIngress: vpnFabricIngress,
VPNFabricSessionPeers: vpnFabricSessionPeers,
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
VPNFabricQUICTransport: mesh.NewQUICFabricTransport(nil),
VPNFabricQUICTransport: newVPNFabricQUICTransport(cfg),
VPNFabricSessionDialStats: newVPNFabricSessionDialStats(),
VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID),
PeerEndpoints: copyStringMap(peerEndpoints),
@@ -1151,6 +1151,14 @@ func newVPNFabricIngress(meshState *syntheticMeshState, identity state.Identity,
return ingress
}
func newVPNFabricQUICTransport(cfg config.Config) *mesh.QUICFabricTransport {
transport := mesh.NewQUICFabricTransport(nil)
if cfg.VPNFabricQUICMaxStreamsPerConn > 0 {
transport.MaxStreamsPerConn = cfg.VPNFabricQUICMaxStreamsPerConn
}
return transport
}
func vpnruntimeAdaptivePolicy(policy *client.FabricServiceChannelAdaptivePolicy) vpnruntime.FabricServiceChannelAdaptivePolicy {
if policy == nil {
return vpnruntime.FabricServiceChannelAdaptivePolicy{}
@@ -1991,7 +1999,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
}
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil)
meshState.VPNFabricQUICTransport = newVPNFabricQUICTransport(cfg)
}
if meshState.VPNFabricSessionPeers == nil {
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
@@ -2000,7 +2008,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
}
if meshState.VPNFabricQUICTransport == nil {
meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil)
meshState.VPNFabricQUICTransport = newVPNFabricQUICTransport(cfg)
} else if cfg.VPNFabricQUICMaxStreamsPerConn > 0 {
meshState.VPNFabricQUICTransport.MaxStreamsPerConn = cfg.VPNFabricQUICMaxStreamsPerConn
}
if meshState.VPNFabricSessionDialStats == nil {
meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats()
@@ -2965,7 +2975,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn
report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot()
}
if meshState != nil && meshState.VPNFabricQUICTransport != nil {
report["quic_sessions"] = meshState.VPNFabricQUICTransport.Snapshot()
quicSnapshot := meshState.VPNFabricQUICTransport.Snapshot()
report["quic_sessions"] = quicSnapshot
report["quic_max_streams_per_conn"] = meshState.VPNFabricQUICTransport.MaxStreamsPerConn
}
if meshState != nil && meshState.VPNFabricSessionDialStats != nil {
report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt)
@@ -707,6 +707,7 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
MeshProductionForwardingEnabled: true,
MeshFabricSessionEnabled: true,
VPNFabricSessionTransportEnabled: true,
VPNFabricQUICMaxStreamsPerConn: 24,
MeshQUICFabricEnabled: true,
MeshQUICFabricListenAddr: ":19443",
}, state.Identity{
@@ -714,7 +715,12 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
NodeID: "node-a",
}, &syntheticMeshState{
VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(),
QUICFabricListenAddr: "127.0.0.1:19443",
VPNFabricQUICTransport: func() *mesh.QUICFabricTransport {
transport := mesh.NewQUICFabricTransport(nil)
transport.MaxStreamsPerConn = 24
return transport
}(),
QUICFabricListenAddr: "127.0.0.1:19443",
}, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC))
report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any)
@@ -754,6 +760,8 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
report["transport"] != "fabric_session_binary_frames" ||
report["peer_sessions"] == nil {
t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata)
} else if report["quic_sessions"] == nil || report["quic_max_streams_per_conn"] != 24 {
t.Fatalf("vpn fabric quic session report missing: %+v", report)
}
if payload.Capabilities["vpn_fabric_endpoint_health_feedback"] != true {
t.Fatalf("vpn fabric endpoint health capability missing: %+v", payload.Capabilities)
@@ -30,6 +30,7 @@ type Config struct {
VPNFabricSessionTransportEnabled bool
MeshQUICFabricEnabled bool
MeshQUICFabricListenAddr string
VPNFabricQUICMaxStreamsPerConn int
MeshProductionObservationSinkCapacity int
MeshListenAddr string
MeshListenPortMode string
@@ -71,6 +72,7 @@ func Load(args []string, env map[string]string) (Config, error) {
fs.BoolVar(&cfg.VPNFabricSessionTransportEnabled, "vpn-fabric-session-transport-enabled", getEnvBool(env, "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED", false), "Route VPN packet transport over persistent fabric session when explicitly enabled. Disabled by default.")
fs.BoolVar(&cfg.MeshQUICFabricEnabled, "mesh-quic-fabric-enabled", getEnvBool(env, "RAP_MESH_QUIC_FABRIC_ENABLED", false), "Enable QUIC/UDP fabric listener. Disabled by default.")
fs.StringVar(&cfg.MeshQUICFabricListenAddr, "mesh-quic-fabric-listen-addr", getEnv(env, "RAP_MESH_QUIC_FABRIC_LISTEN_ADDR", ""), "Listen address for QUIC/UDP fabric endpoint, for example :19443.")
fs.IntVar(&cfg.VPNFabricQUICMaxStreamsPerConn, "vpn-fabric-quic-max-streams-per-conn", getEnvInt(env, "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN", 64), "Maximum logical fabric-session streams per cached VPN QUIC carrier connection.")
fs.IntVar(&cfg.MeshProductionObservationSinkCapacity, "mesh-production-observation-sink-capacity", getEnvSignedInt(env, "RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY", 0), "Bounded local metadata-only production envelope observation sink capacity. Disabled when 0.")
fs.StringVar(&cfg.MeshListenAddr, "mesh-listen-addr", getEnv(env, "RAP_MESH_LISTEN_ADDR", ""), "Listen address for disabled-by-default C17E synthetic mesh HTTP endpoint.")
fs.StringVar(&cfg.MeshListenPortMode, "mesh-listen-port-mode", getEnv(env, "RAP_MESH_LISTEN_PORT_MODE", "manual"), "Mesh listen port behavior: manual, auto, or disabled.")
@@ -108,6 +110,9 @@ func Load(args []string, env map[string]string) (Config, error) {
cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr)
cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr)
cfg.MeshListenPortMode = strings.ToLower(strings.TrimSpace(cfg.MeshListenPortMode))
if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 {
cfg.VPNFabricQUICMaxStreamsPerConn = 64
}
cfg.MeshAdvertiseEndpoint = strings.TrimRight(strings.TrimSpace(cfg.MeshAdvertiseEndpoint), "/")
cfg.MeshAdvertiseEndpointsJSON = strings.TrimSpace(cfg.MeshAdvertiseEndpointsJSON)
cfg.MeshAdvertiseTransport = strings.TrimSpace(cfg.MeshAdvertiseTransport)
@@ -24,6 +24,7 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) {
"RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED": "true",
"RAP_MESH_QUIC_FABRIC_ENABLED": "true",
"RAP_MESH_QUIC_FABRIC_LISTEN_ADDR": ":19443",
"RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN": "24",
"RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY": "5",
"RAP_MESH_LISTEN_ADDR": "127.0.0.1:19001",
"RAP_MESH_LISTEN_PORT_MODE": "auto",
@@ -79,6 +80,9 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) {
if !cfg.MeshQUICFabricEnabled || cfg.MeshQUICFabricListenAddr != ":19443" {
t.Fatalf("unexpected QUIC fabric config: %+v", cfg)
}
if cfg.VPNFabricQUICMaxStreamsPerConn != 24 {
t.Fatalf("VPNFabricQUICMaxStreamsPerConn = %d, want 24", cfg.VPNFabricQUICMaxStreamsPerConn)
}
if cfg.MeshProductionObservationSinkCapacity != 5 {
t.Fatalf("MeshProductionObservationSinkCapacity = %d, want 5", cfg.MeshProductionObservationSinkCapacity)
}
@@ -33,6 +33,7 @@ type RuntimeConfig struct {
VPNFabricSessionTransportEnabled bool
MeshQUICFabricEnabled bool
MeshQUICFabricListenAddr string
VPNFabricQUICMaxStreamsPerConn int
MeshListenAddr string
MeshListenPortMode string
MeshListenAutoPortStart int
@@ -66,6 +67,9 @@ func (cfg RuntimeConfig) Normalize() RuntimeConfig {
cfg.RestartPolicy = firstNonEmpty(cfg.RestartPolicy, "unless-stopped")
cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr)
cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr)
if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 {
cfg.VPNFabricQUICMaxStreamsPerConn = 64
}
cfg.MeshListenPortMode = strings.ToLower(strings.TrimSpace(cfg.MeshListenPortMode))
cfg.MeshAdvertiseEndpoint = strings.TrimRight(strings.TrimSpace(cfg.MeshAdvertiseEndpoint), "/")
cfg.MeshAdvertiseEndpointsJSON = strings.TrimSpace(cfg.MeshAdvertiseEndpointsJSON)
@@ -267,6 +267,7 @@ func NodeAgentEnvWithStateDir(cfg RuntimeConfig, stateDir string) []string {
"RAP_MESH_FABRIC_SESSION_ENABLED=" + boolString(cfg.MeshFabricSessionEnabled),
"RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED=" + boolString(cfg.VPNFabricSessionTransportEnabled),
"RAP_MESH_QUIC_FABRIC_ENABLED=" + boolString(cfg.MeshQUICFabricEnabled),
"RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN=" + strconv.Itoa(cfg.VPNFabricQUICMaxStreamsPerConn),
}
if cfg.JoinToken != "" {
env = append(env, "RAP_JOIN_TOKEN="+cfg.JoinToken)
@@ -76,6 +76,7 @@ func LinuxInstallConfigFromProfile(profile LinuxInstallProfile) LinuxInstallConf
VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled,
MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled,
MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr,
VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn,
MeshListenAddr: profile.MeshListenAddr,
MeshListenPortMode: profile.MeshListenPortMode,
MeshListenAutoPortStart: profile.MeshListenAutoPortStart,
@@ -34,6 +34,7 @@ type DockerInstallProfile struct {
VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"`
MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"`
MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"`
VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"`
MeshListenAddr string `json:"mesh_listen_addr"`
MeshListenPortMode string `json:"mesh_listen_port_mode"`
MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"`
@@ -80,6 +81,7 @@ type WindowsInstallProfile struct {
VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"`
MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"`
MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"`
VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"`
MeshListenAddr string `json:"mesh_listen_addr"`
MeshListenPortMode string `json:"mesh_listen_port_mode"`
MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"`
@@ -116,6 +118,7 @@ type LinuxInstallProfile struct {
VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"`
MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"`
MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"`
VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"`
MeshListenAddr string `json:"mesh_listen_addr"`
MeshListenPortMode string `json:"mesh_listen_port_mode"`
MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"`
@@ -297,6 +300,7 @@ func RuntimeConfigFromProfile(profile DockerInstallProfile) RuntimeConfig {
VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled,
MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled,
MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr,
VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn,
MeshListenAddr: profile.MeshListenAddr,
MeshListenPortMode: profile.MeshListenPortMode,
MeshListenAutoPortStart: profile.MeshListenAutoPortStart,
@@ -598,6 +598,7 @@ func (m DockerManager) runtimeConfigFromContainer(ctx context.Context, runner Co
VPNFabricSessionTransportEnabled: parseBool(env["RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED"]),
MeshQUICFabricEnabled: parseBool(env["RAP_MESH_QUIC_FABRIC_ENABLED"]),
MeshQUICFabricListenAddr: env["RAP_MESH_QUIC_FABRIC_LISTEN_ADDR"],
VPNFabricQUICMaxStreamsPerConn: parseInt(env["RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN"]),
MeshListenAddr: env["RAP_MESH_LISTEN_ADDR"],
MeshListenPortMode: env["RAP_MESH_LISTEN_PORT_MODE"],
MeshListenAutoPortStart: parseInt(env["RAP_MESH_LISTEN_AUTO_PORT_START"]),
@@ -70,6 +70,7 @@ func WindowsInstallConfigFromProfile(profile WindowsInstallProfile) WindowsInsta
VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled,
MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled,
MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr,
VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn,
MeshListenAddr: profile.MeshListenAddr,
MeshListenPortMode: profile.MeshListenPortMode,
MeshListenAutoPortStart: profile.MeshListenAutoPortStart,
@@ -364,6 +364,10 @@ from holding unused peer connections indefinitely.
QUIC carrier connections now track active logical streams and enforce a
per-connection stream limit, exposing stream opens/closes and limit rejects in
transport telemetry.
The per-connection QUIC stream limit is configurable through
`RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN` /
`-vpn-fabric-quic-max-streams-per-conn` and propagated by host-agent install
profiles.
Deliverables: