diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index ad1bb03..0bd3eb4 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -1095,7 +1095,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricIngress: vpnFabricIngress, VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), - VPNFabricQUICTransport: mesh.NewQUICFabricTransport(nil), + VPNFabricQUICTransport: newVPNFabricQUICTransport(cfg), VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), VPNFabricEndpointObservations: newVPNFabricEndpointObservationStore(identity.NodeID), PeerEndpoints: copyStringMap(peerEndpoints), @@ -1151,6 +1151,14 @@ func newVPNFabricIngress(meshState *syntheticMeshState, identity state.Identity, return ingress } +func newVPNFabricQUICTransport(cfg config.Config) *mesh.QUICFabricTransport { + transport := mesh.NewQUICFabricTransport(nil) + if cfg.VPNFabricQUICMaxStreamsPerConn > 0 { + transport.MaxStreamsPerConn = cfg.VPNFabricQUICMaxStreamsPerConn + } + return transport +} + func vpnruntimeAdaptivePolicy(policy *client.FabricServiceChannelAdaptivePolicy) vpnruntime.FabricServiceChannelAdaptivePolicy { if policy == nil { return vpnruntime.FabricServiceChannelAdaptivePolicy{} @@ -1991,7 +1999,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i } meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) - meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil) + meshState.VPNFabricQUICTransport = newVPNFabricQUICTransport(cfg) } if meshState.VPNFabricSessionPeers == nil { meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() @@ -2000,7 +2008,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } if meshState.VPNFabricQUICTransport == nil { - meshState.VPNFabricQUICTransport = mesh.NewQUICFabricTransport(nil) + meshState.VPNFabricQUICTransport = newVPNFabricQUICTransport(cfg) + } else if cfg.VPNFabricQUICMaxStreamsPerConn > 0 { + meshState.VPNFabricQUICTransport.MaxStreamsPerConn = cfg.VPNFabricQUICMaxStreamsPerConn } if meshState.VPNFabricSessionDialStats == nil { meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() @@ -2965,7 +2975,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() } if meshState != nil && meshState.VPNFabricQUICTransport != nil { - report["quic_sessions"] = meshState.VPNFabricQUICTransport.Snapshot() + quicSnapshot := meshState.VPNFabricQUICTransport.Snapshot() + report["quic_sessions"] = quicSnapshot + report["quic_max_streams_per_conn"] = meshState.VPNFabricQUICTransport.MaxStreamsPerConn } if meshState != nil && meshState.VPNFabricSessionDialStats != nil { report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 7253b9e..3e45b6d 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -707,6 +707,7 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { MeshProductionForwardingEnabled: true, MeshFabricSessionEnabled: true, VPNFabricSessionTransportEnabled: true, + VPNFabricQUICMaxStreamsPerConn: 24, MeshQUICFabricEnabled: true, MeshQUICFabricListenAddr: ":19443", }, state.Identity{ @@ -714,7 +715,12 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { NodeID: "node-a", }, &syntheticMeshState{ VPNFabricSessionPeers: mesh.NewFabricSessionPeerManager(), - QUICFabricListenAddr: "127.0.0.1:19443", + VPNFabricQUICTransport: func() *mesh.QUICFabricTransport { + transport := mesh.NewQUICFabricTransport(nil) + transport.MaxStreamsPerConn = 24 + return transport + }(), + QUICFabricListenAddr: "127.0.0.1:19443", }, time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)) report, ok := payload.Metadata["mesh_endpoint_report"].(map[string]any) @@ -754,6 +760,8 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { report["transport"] != "fabric_session_binary_frames" || report["peer_sessions"] == nil { t.Fatalf("vpn fabric session report missing: %+v", payload.Metadata) + } else if report["quic_sessions"] == nil || report["quic_max_streams_per_conn"] != 24 { + t.Fatalf("vpn fabric quic session report missing: %+v", report) } if payload.Capabilities["vpn_fabric_endpoint_health_feedback"] != true { t.Fatalf("vpn fabric endpoint health capability missing: %+v", payload.Capabilities) diff --git a/agents/rap-node-agent/internal/config/config.go b/agents/rap-node-agent/internal/config/config.go index 9906b8b..08ce438 100644 --- a/agents/rap-node-agent/internal/config/config.go +++ b/agents/rap-node-agent/internal/config/config.go @@ -30,6 +30,7 @@ type Config struct { VPNFabricSessionTransportEnabled bool MeshQUICFabricEnabled bool MeshQUICFabricListenAddr string + VPNFabricQUICMaxStreamsPerConn int MeshProductionObservationSinkCapacity int MeshListenAddr string MeshListenPortMode string @@ -71,6 +72,7 @@ func Load(args []string, env map[string]string) (Config, error) { fs.BoolVar(&cfg.VPNFabricSessionTransportEnabled, "vpn-fabric-session-transport-enabled", getEnvBool(env, "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED", false), "Route VPN packet transport over persistent fabric session when explicitly enabled. Disabled by default.") fs.BoolVar(&cfg.MeshQUICFabricEnabled, "mesh-quic-fabric-enabled", getEnvBool(env, "RAP_MESH_QUIC_FABRIC_ENABLED", false), "Enable QUIC/UDP fabric listener. Disabled by default.") fs.StringVar(&cfg.MeshQUICFabricListenAddr, "mesh-quic-fabric-listen-addr", getEnv(env, "RAP_MESH_QUIC_FABRIC_LISTEN_ADDR", ""), "Listen address for QUIC/UDP fabric endpoint, for example :19443.") + fs.IntVar(&cfg.VPNFabricQUICMaxStreamsPerConn, "vpn-fabric-quic-max-streams-per-conn", getEnvInt(env, "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN", 64), "Maximum logical fabric-session streams per cached VPN QUIC carrier connection.") fs.IntVar(&cfg.MeshProductionObservationSinkCapacity, "mesh-production-observation-sink-capacity", getEnvSignedInt(env, "RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY", 0), "Bounded local metadata-only production envelope observation sink capacity. Disabled when 0.") fs.StringVar(&cfg.MeshListenAddr, "mesh-listen-addr", getEnv(env, "RAP_MESH_LISTEN_ADDR", ""), "Listen address for disabled-by-default C17E synthetic mesh HTTP endpoint.") fs.StringVar(&cfg.MeshListenPortMode, "mesh-listen-port-mode", getEnv(env, "RAP_MESH_LISTEN_PORT_MODE", "manual"), "Mesh listen port behavior: manual, auto, or disabled.") @@ -108,6 +110,9 @@ func Load(args []string, env map[string]string) (Config, error) { cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr) cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr) cfg.MeshListenPortMode = strings.ToLower(strings.TrimSpace(cfg.MeshListenPortMode)) + if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 { + cfg.VPNFabricQUICMaxStreamsPerConn = 64 + } cfg.MeshAdvertiseEndpoint = strings.TrimRight(strings.TrimSpace(cfg.MeshAdvertiseEndpoint), "/") cfg.MeshAdvertiseEndpointsJSON = strings.TrimSpace(cfg.MeshAdvertiseEndpointsJSON) cfg.MeshAdvertiseTransport = strings.TrimSpace(cfg.MeshAdvertiseTransport) diff --git a/agents/rap-node-agent/internal/config/config_test.go b/agents/rap-node-agent/internal/config/config_test.go index de69a9b..34042f6 100644 --- a/agents/rap-node-agent/internal/config/config_test.go +++ b/agents/rap-node-agent/internal/config/config_test.go @@ -24,6 +24,7 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) { "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED": "true", "RAP_MESH_QUIC_FABRIC_ENABLED": "true", "RAP_MESH_QUIC_FABRIC_LISTEN_ADDR": ":19443", + "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN": "24", "RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY": "5", "RAP_MESH_LISTEN_ADDR": "127.0.0.1:19001", "RAP_MESH_LISTEN_PORT_MODE": "auto", @@ -79,6 +80,9 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) { if !cfg.MeshQUICFabricEnabled || cfg.MeshQUICFabricListenAddr != ":19443" { t.Fatalf("unexpected QUIC fabric config: %+v", cfg) } + if cfg.VPNFabricQUICMaxStreamsPerConn != 24 { + t.Fatalf("VPNFabricQUICMaxStreamsPerConn = %d, want 24", cfg.VPNFabricQUICMaxStreamsPerConn) + } if cfg.MeshProductionObservationSinkCapacity != 5 { t.Fatalf("MeshProductionObservationSinkCapacity = %d, want 5", cfg.MeshProductionObservationSinkCapacity) } diff --git a/agents/rap-node-agent/internal/hostagent/config.go b/agents/rap-node-agent/internal/hostagent/config.go index c544712..ea28b15 100644 --- a/agents/rap-node-agent/internal/hostagent/config.go +++ b/agents/rap-node-agent/internal/hostagent/config.go @@ -33,6 +33,7 @@ type RuntimeConfig struct { VPNFabricSessionTransportEnabled bool MeshQUICFabricEnabled bool MeshQUICFabricListenAddr string + VPNFabricQUICMaxStreamsPerConn int MeshListenAddr string MeshListenPortMode string MeshListenAutoPortStart int @@ -66,6 +67,9 @@ func (cfg RuntimeConfig) Normalize() RuntimeConfig { cfg.RestartPolicy = firstNonEmpty(cfg.RestartPolicy, "unless-stopped") cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr) cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr) + if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 { + cfg.VPNFabricQUICMaxStreamsPerConn = 64 + } cfg.MeshListenPortMode = strings.ToLower(strings.TrimSpace(cfg.MeshListenPortMode)) cfg.MeshAdvertiseEndpoint = strings.TrimRight(strings.TrimSpace(cfg.MeshAdvertiseEndpoint), "/") cfg.MeshAdvertiseEndpointsJSON = strings.TrimSpace(cfg.MeshAdvertiseEndpointsJSON) diff --git a/agents/rap-node-agent/internal/hostagent/docker.go b/agents/rap-node-agent/internal/hostagent/docker.go index ca5697b..80fcb14 100644 --- a/agents/rap-node-agent/internal/hostagent/docker.go +++ b/agents/rap-node-agent/internal/hostagent/docker.go @@ -267,6 +267,7 @@ func NodeAgentEnvWithStateDir(cfg RuntimeConfig, stateDir string) []string { "RAP_MESH_FABRIC_SESSION_ENABLED=" + boolString(cfg.MeshFabricSessionEnabled), "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED=" + boolString(cfg.VPNFabricSessionTransportEnabled), "RAP_MESH_QUIC_FABRIC_ENABLED=" + boolString(cfg.MeshQUICFabricEnabled), + "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN=" + strconv.Itoa(cfg.VPNFabricQUICMaxStreamsPerConn), } if cfg.JoinToken != "" { env = append(env, "RAP_JOIN_TOKEN="+cfg.JoinToken) diff --git a/agents/rap-node-agent/internal/hostagent/linux.go b/agents/rap-node-agent/internal/hostagent/linux.go index 774e961..7b5711e 100644 --- a/agents/rap-node-agent/internal/hostagent/linux.go +++ b/agents/rap-node-agent/internal/hostagent/linux.go @@ -76,6 +76,7 @@ func LinuxInstallConfigFromProfile(profile LinuxInstallProfile) LinuxInstallConf VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, MeshListenAddr: profile.MeshListenAddr, MeshListenPortMode: profile.MeshListenPortMode, MeshListenAutoPortStart: profile.MeshListenAutoPortStart, diff --git a/agents/rap-node-agent/internal/hostagent/profile.go b/agents/rap-node-agent/internal/hostagent/profile.go index f5d5667..c87ed07 100644 --- a/agents/rap-node-agent/internal/hostagent/profile.go +++ b/agents/rap-node-agent/internal/hostagent/profile.go @@ -34,6 +34,7 @@ type DockerInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` MeshListenAddr string `json:"mesh_listen_addr"` MeshListenPortMode string `json:"mesh_listen_port_mode"` MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"` @@ -80,6 +81,7 @@ type WindowsInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` MeshListenAddr string `json:"mesh_listen_addr"` MeshListenPortMode string `json:"mesh_listen_port_mode"` MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"` @@ -116,6 +118,7 @@ type LinuxInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` MeshListenAddr string `json:"mesh_listen_addr"` MeshListenPortMode string `json:"mesh_listen_port_mode"` MeshListenAutoPortStart int `json:"mesh_listen_auto_port_start"` @@ -297,6 +300,7 @@ func RuntimeConfigFromProfile(profile DockerInstallProfile) RuntimeConfig { VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, MeshListenAddr: profile.MeshListenAddr, MeshListenPortMode: profile.MeshListenPortMode, MeshListenAutoPortStart: profile.MeshListenAutoPortStart, diff --git a/agents/rap-node-agent/internal/hostagent/update.go b/agents/rap-node-agent/internal/hostagent/update.go index 4224bc7..fc3278d 100644 --- a/agents/rap-node-agent/internal/hostagent/update.go +++ b/agents/rap-node-agent/internal/hostagent/update.go @@ -598,6 +598,7 @@ func (m DockerManager) runtimeConfigFromContainer(ctx context.Context, runner Co VPNFabricSessionTransportEnabled: parseBool(env["RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED"]), MeshQUICFabricEnabled: parseBool(env["RAP_MESH_QUIC_FABRIC_ENABLED"]), MeshQUICFabricListenAddr: env["RAP_MESH_QUIC_FABRIC_LISTEN_ADDR"], + VPNFabricQUICMaxStreamsPerConn: parseInt(env["RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN"]), MeshListenAddr: env["RAP_MESH_LISTEN_ADDR"], MeshListenPortMode: env["RAP_MESH_LISTEN_PORT_MODE"], MeshListenAutoPortStart: parseInt(env["RAP_MESH_LISTEN_AUTO_PORT_START"]), diff --git a/agents/rap-node-agent/internal/hostagent/windows.go b/agents/rap-node-agent/internal/hostagent/windows.go index 445b50f..2d66e68 100644 --- a/agents/rap-node-agent/internal/hostagent/windows.go +++ b/agents/rap-node-agent/internal/hostagent/windows.go @@ -70,6 +70,7 @@ func WindowsInstallConfigFromProfile(profile WindowsInstallProfile) WindowsInsta VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, MeshListenAddr: profile.MeshListenAddr, MeshListenPortMode: profile.MeshListenPortMode, MeshListenAutoPortStart: profile.MeshListenAutoPortStart, diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 97f177e..5a4b7f8 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -364,6 +364,10 @@ from holding unused peer connections indefinitely. QUIC carrier connections now track active logical streams and enforce a per-connection stream limit, exposing stream opens/closes and limit rejects in transport telemetry. +The per-connection QUIC stream limit is configurable through +`RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN` / +`-vpn-fabric-quic-max-streams-per-conn` and propagated by host-agent install +profiles. Deliverables: