From e50070c005c3e74ec3bd09cd9ad0d7cc67f0bec3 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 12:13:59 +0300 Subject: [PATCH] Shard VPN fabric session streams --- .../rap-node-agent/cmd/rap-node-agent/main.go | 96 +++++++++++---- .../rap-node-agent/internal/config/config.go | 8 ++ .../internal/config/config_test.go | 4 + .../internal/hostagent/config.go | 7 ++ .../internal/hostagent/docker.go | 1 + .../internal/hostagent/linux.go | 1 + .../internal/hostagent/profile.go | 4 + .../internal/hostagent/update.go | 1 + .../internal/hostagent/windows.go | 1 + .../vpnruntime/fabric_session_transport.go | 116 ++++++++++++++++-- .../vpnruntime/fabric_transport_test.go | 36 ++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 12 files changed, 242 insertions(+), 36 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 0a65caa..4c30cfc 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -3149,13 +3149,18 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn } if cfg.VPNFabricSessionTransportEnabled { report := map[string]any{ - "schema_version": "rap.vpn_fabric_session_transport_report.v1", - "enabled": true, - "transport": "fabric_session_binary_frames", - "carriers": []string{"quic", "websocket"}, - "packet_payload": "rap.vpn_packet_batch.fabric.v1", - "gated": true, - "observed_at": observedAt.UTC().Format(time.RFC3339Nano), + "schema_version": "rap.vpn_fabric_session_transport_report.v1", + "enabled": true, + "transport": "fabric_session_binary_frames", + "carriers": []string{"quic", "websocket"}, + "packet_payload": "rap.vpn_packet_batch.fabric.v1", + "gated": true, + "stream_shards_per_class": cfg.VPNFabricSessionStreamShards, + "stream_classes": []string{ + vpnruntime.FabricTrafficClassInteractive, + vpnruntime.FabricTrafficClassBulk, + }, + "observed_at": observedAt.UTC().Format(time.RFC3339Nano), } if meshState != nil && meshState.VPNFabricTransport != nil { report["peer_sessions"] = meshState.VPNFabricTransport.Snapshot() @@ -3174,6 +3179,7 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true + payload.Capabilities["vpn_fabric_session_stream_shards"] = true if meshState != nil && meshState.VPNFabricEndpointObservations != nil { payload.Metadata["vpn_fabric_endpoint_health_report"] = meshState.VPNFabricEndpointObservations.Report(observedAt, maxVPNFabricEndpointHealthReportEntries) } else { @@ -5151,7 +5157,7 @@ func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config, return nil } if cfg.VPNFabricSessionTransportEnabled { - if transport := fabricSessionGatewayTransportForAssignment(ctx, identity, assignment, meshState, nextHop); transport != nil { + if transport := fabricSessionGatewayTransportForAssignment(ctx, cfg, identity, assignment, meshState, nextHop); transport != nil { return transport } } @@ -5170,7 +5176,7 @@ func fabricGatewayTransportForAssignment(ctx context.Context, cfg config.Config, } } -func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop string) vpnruntime.PacketTransport { +func fabricSessionGatewayTransportForAssignment(ctx context.Context, cfg config.Config, identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, nextHop string) vpnruntime.PacketTransport { if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" { return nil } @@ -5224,15 +5230,8 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=%s error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, reason, err) continue } - streamID := uint64(time.Now().UnixNano()) - if streamID == 0 { - streamID = 1 - } - if err := session.Send(dialCtx, fabricproto.Frame{ - Type: fabricproto.FrameOpenStream, - StreamID: streamID, - TrafficClass: fabricproto.TrafficClassInteractive, - }); err != nil { + streamIDsByClass, streamID, err := openVPNFabricSessionStreams(dialCtx, session, cfg.VPNFabricSessionStreamShards) + if err != nil { cancel() _ = session.Close() meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("stream_open_failed") @@ -5245,14 +5244,14 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st meshState.VPNFabricEndpointObservations.ObserveSuccess(selectedTarget.EndpointID, time.Since(startedAt)) log.Printf("vpn fabric session transport selected: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s pinned_cert=%t fallback_candidates=%d", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, selectedTarget.PeerCertSHA256 != "", len(targets)-index-1) return &vpnruntime.FabricSessionPacketTransport{ - Sender: session, - Receiver: session, - Inbox: meshState.VPNFabricInbox, - StreamID: streamID, - VPNConnectionID: assignment.VPNConnectionID, - SendDirection: vpnruntime.FabricDirectionGatewayToClient, - ReceiveDirection: vpnruntime.FabricDirectionClientToGateway, - TrafficClass: vpnruntime.FabricTrafficClassInteractive, + Sender: session, + Receiver: session, + Inbox: meshState.VPNFabricInbox, + StreamID: streamID, + StreamIDsByTrafficClass: streamIDsByClass, + VPNConnectionID: assignment.VPNConnectionID, + SendDirection: vpnruntime.FabricDirectionGatewayToClient, + ReceiveDirection: vpnruntime.FabricDirectionClientToGateway, } } meshState.VPNFabricSessionDialStats.ObserveAllCandidatesFailed() @@ -5260,6 +5259,51 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st return nil } +func openVPNFabricSessionStreams(ctx context.Context, session mesh.FabricTransportSession, shards int) (map[string][]uint64, uint64, error) { + if session == nil { + return nil, 0, mesh.ErrForwardRuntimeUnavailable + } + if shards <= 0 { + shards = 4 + } + if shards > 64 { + shards = 64 + } + base := uint64(time.Now().UnixNano()) + if base == 0 { + base = 1 + } + classes := []struct { + name string + trafficClass fabricproto.TrafficClass + }{ + {name: vpnruntime.FabricTrafficClassInteractive, trafficClass: fabricproto.TrafficClassInteractive}, + {name: vpnruntime.FabricTrafficClassBulk, trafficClass: fabricproto.TrafficClassBulk}, + } + out := make(map[string][]uint64, len(classes)) + var primary uint64 + for classIndex, class := range classes { + for shard := 0; shard < shards; shard++ { + streamID := base + uint64(classIndex*shards+shard) + if streamID == 0 { + streamID = uint64(classIndex*shards + shard + 1) + } + if err := session.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: streamID, + TrafficClass: class.trafficClass, + }); err != nil { + return nil, 0, err + } + if primary == 0 { + primary = streamID + } + out[class.name] = append(out[class.name], streamID) + } + } + return out, primary, nil +} + func fabricSessionOpenFailureReason(err error) string { if err == nil { return "" diff --git a/agents/rap-node-agent/internal/config/config.go b/agents/rap-node-agent/internal/config/config.go index 49377a9..7462a68 100644 --- a/agents/rap-node-agent/internal/config/config.go +++ b/agents/rap-node-agent/internal/config/config.go @@ -30,6 +30,7 @@ type Config struct { VPNFabricSessionTransportEnabled bool MeshQUICFabricEnabled bool MeshQUICFabricListenAddr string + VPNFabricSessionStreamShards int VPNFabricQUICMaxStreamsPerConn int VPNFabricQUICIdleTTL time.Duration MeshProductionObservationSinkCapacity int @@ -73,6 +74,7 @@ func Load(args []string, env map[string]string) (Config, error) { fs.BoolVar(&cfg.VPNFabricSessionTransportEnabled, "vpn-fabric-session-transport-enabled", getEnvBool(env, "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED", false), "Route VPN packet transport over persistent fabric session when explicitly enabled. Disabled by default.") fs.BoolVar(&cfg.MeshQUICFabricEnabled, "mesh-quic-fabric-enabled", getEnvBool(env, "RAP_MESH_QUIC_FABRIC_ENABLED", false), "Enable QUIC/UDP fabric listener. Disabled by default.") fs.StringVar(&cfg.MeshQUICFabricListenAddr, "mesh-quic-fabric-listen-addr", getEnv(env, "RAP_MESH_QUIC_FABRIC_LISTEN_ADDR", ""), "Listen address for QUIC/UDP fabric endpoint, for example :19443.") + fs.IntVar(&cfg.VPNFabricSessionStreamShards, "vpn-fabric-session-stream-shards", getEnvInt(env, "RAP_VPN_FABRIC_SESSION_STREAM_SHARDS", 4), "VPN fabric-session stream shards per traffic class.") fs.IntVar(&cfg.VPNFabricQUICMaxStreamsPerConn, "vpn-fabric-quic-max-streams-per-conn", getEnvInt(env, "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN", 64), "Maximum logical fabric-session streams per cached VPN QUIC carrier connection.") fs.DurationVar(&cfg.VPNFabricQUICIdleTTL, "vpn-fabric-quic-idle-ttl", time.Duration(getEnvInt(env, "RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS", 300))*time.Second, "Idle TTL for cached VPN QUIC carrier connections.") fs.IntVar(&cfg.MeshProductionObservationSinkCapacity, "mesh-production-observation-sink-capacity", getEnvSignedInt(env, "RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY", 0), "Bounded local metadata-only production envelope observation sink capacity. Disabled when 0.") @@ -112,6 +114,12 @@ func Load(args []string, env map[string]string) (Config, error) { cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr) cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr) cfg.MeshListenPortMode = strings.ToLower(strings.TrimSpace(cfg.MeshListenPortMode)) + if cfg.VPNFabricSessionStreamShards <= 0 { + cfg.VPNFabricSessionStreamShards = 4 + } + if cfg.VPNFabricSessionStreamShards > 64 { + cfg.VPNFabricSessionStreamShards = 64 + } if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 { cfg.VPNFabricQUICMaxStreamsPerConn = 64 } diff --git a/agents/rap-node-agent/internal/config/config_test.go b/agents/rap-node-agent/internal/config/config_test.go index 95521f5..b74ac33 100644 --- a/agents/rap-node-agent/internal/config/config_test.go +++ b/agents/rap-node-agent/internal/config/config_test.go @@ -24,6 +24,7 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) { "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED": "true", "RAP_MESH_QUIC_FABRIC_ENABLED": "true", "RAP_MESH_QUIC_FABRIC_LISTEN_ADDR": ":19443", + "RAP_VPN_FABRIC_SESSION_STREAM_SHARDS": "6", "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN": "24", "RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS": "120", "RAP_MESH_PRODUCTION_OBSERVATION_SINK_CAPACITY": "5", @@ -81,6 +82,9 @@ func TestLoadConfigFromEnvAndArgs(t *testing.T) { if !cfg.MeshQUICFabricEnabled || cfg.MeshQUICFabricListenAddr != ":19443" { t.Fatalf("unexpected QUIC fabric config: %+v", cfg) } + if cfg.VPNFabricSessionStreamShards != 6 { + t.Fatalf("VPNFabricSessionStreamShards = %d, want 6", cfg.VPNFabricSessionStreamShards) + } if cfg.VPNFabricQUICMaxStreamsPerConn != 24 { t.Fatalf("VPNFabricQUICMaxStreamsPerConn = %d, want 24", cfg.VPNFabricQUICMaxStreamsPerConn) } diff --git a/agents/rap-node-agent/internal/hostagent/config.go b/agents/rap-node-agent/internal/hostagent/config.go index 864f349..035f0ab 100644 --- a/agents/rap-node-agent/internal/hostagent/config.go +++ b/agents/rap-node-agent/internal/hostagent/config.go @@ -33,6 +33,7 @@ type RuntimeConfig struct { VPNFabricSessionTransportEnabled bool MeshQUICFabricEnabled bool MeshQUICFabricListenAddr string + VPNFabricSessionStreamShards int VPNFabricQUICMaxStreamsPerConn int VPNFabricQUICIdleTTLSeconds int MeshListenAddr string @@ -68,6 +69,12 @@ func (cfg RuntimeConfig) Normalize() RuntimeConfig { cfg.RestartPolicy = firstNonEmpty(cfg.RestartPolicy, "unless-stopped") cfg.MeshListenAddr = strings.TrimSpace(cfg.MeshListenAddr) cfg.MeshQUICFabricListenAddr = strings.TrimSpace(cfg.MeshQUICFabricListenAddr) + if cfg.VPNFabricSessionStreamShards <= 0 { + cfg.VPNFabricSessionStreamShards = 4 + } + if cfg.VPNFabricSessionStreamShards > 64 { + cfg.VPNFabricSessionStreamShards = 64 + } if cfg.VPNFabricQUICMaxStreamsPerConn <= 0 { cfg.VPNFabricQUICMaxStreamsPerConn = 64 } diff --git a/agents/rap-node-agent/internal/hostagent/docker.go b/agents/rap-node-agent/internal/hostagent/docker.go index be0178b..ab7f7a5 100644 --- a/agents/rap-node-agent/internal/hostagent/docker.go +++ b/agents/rap-node-agent/internal/hostagent/docker.go @@ -267,6 +267,7 @@ func NodeAgentEnvWithStateDir(cfg RuntimeConfig, stateDir string) []string { "RAP_MESH_FABRIC_SESSION_ENABLED=" + boolString(cfg.MeshFabricSessionEnabled), "RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED=" + boolString(cfg.VPNFabricSessionTransportEnabled), "RAP_MESH_QUIC_FABRIC_ENABLED=" + boolString(cfg.MeshQUICFabricEnabled), + "RAP_VPN_FABRIC_SESSION_STREAM_SHARDS=" + strconv.Itoa(cfg.VPNFabricSessionStreamShards), "RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN=" + strconv.Itoa(cfg.VPNFabricQUICMaxStreamsPerConn), "RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS=" + strconv.Itoa(cfg.VPNFabricQUICIdleTTLSeconds), } diff --git a/agents/rap-node-agent/internal/hostagent/linux.go b/agents/rap-node-agent/internal/hostagent/linux.go index 6eeead1..b770559 100644 --- a/agents/rap-node-agent/internal/hostagent/linux.go +++ b/agents/rap-node-agent/internal/hostagent/linux.go @@ -76,6 +76,7 @@ func LinuxInstallConfigFromProfile(profile LinuxInstallProfile) LinuxInstallConf VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricSessionStreamShards: profile.VPNFabricSessionStreamShards, VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, VPNFabricQUICIdleTTLSeconds: profile.VPNFabricQUICIdleTTLSeconds, MeshListenAddr: profile.MeshListenAddr, diff --git a/agents/rap-node-agent/internal/hostagent/profile.go b/agents/rap-node-agent/internal/hostagent/profile.go index 0091fd4..cd4eca8 100644 --- a/agents/rap-node-agent/internal/hostagent/profile.go +++ b/agents/rap-node-agent/internal/hostagent/profile.go @@ -34,6 +34,7 @@ type DockerInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricSessionStreamShards int `json:"vpn_fabric_session_stream_shards"` VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` VPNFabricQUICIdleTTLSeconds int `json:"vpn_fabric_quic_idle_ttl_seconds"` MeshListenAddr string `json:"mesh_listen_addr"` @@ -82,6 +83,7 @@ type WindowsInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricSessionStreamShards int `json:"vpn_fabric_session_stream_shards"` VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` VPNFabricQUICIdleTTLSeconds int `json:"vpn_fabric_quic_idle_ttl_seconds"` MeshListenAddr string `json:"mesh_listen_addr"` @@ -120,6 +122,7 @@ type LinuxInstallProfile struct { VPNFabricSessionTransportEnabled bool `json:"vpn_fabric_session_transport_enabled"` MeshQUICFabricEnabled bool `json:"mesh_quic_fabric_enabled"` MeshQUICFabricListenAddr string `json:"mesh_quic_fabric_listen_addr"` + VPNFabricSessionStreamShards int `json:"vpn_fabric_session_stream_shards"` VPNFabricQUICMaxStreamsPerConn int `json:"vpn_fabric_quic_max_streams_per_conn"` VPNFabricQUICIdleTTLSeconds int `json:"vpn_fabric_quic_idle_ttl_seconds"` MeshListenAddr string `json:"mesh_listen_addr"` @@ -303,6 +306,7 @@ func RuntimeConfigFromProfile(profile DockerInstallProfile) RuntimeConfig { VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricSessionStreamShards: profile.VPNFabricSessionStreamShards, VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, VPNFabricQUICIdleTTLSeconds: profile.VPNFabricQUICIdleTTLSeconds, MeshListenAddr: profile.MeshListenAddr, diff --git a/agents/rap-node-agent/internal/hostagent/update.go b/agents/rap-node-agent/internal/hostagent/update.go index 249e4fb..3dcfc10 100644 --- a/agents/rap-node-agent/internal/hostagent/update.go +++ b/agents/rap-node-agent/internal/hostagent/update.go @@ -598,6 +598,7 @@ func (m DockerManager) runtimeConfigFromContainer(ctx context.Context, runner Co VPNFabricSessionTransportEnabled: parseBool(env["RAP_VPN_FABRIC_SESSION_TRANSPORT_ENABLED"]), MeshQUICFabricEnabled: parseBool(env["RAP_MESH_QUIC_FABRIC_ENABLED"]), MeshQUICFabricListenAddr: env["RAP_MESH_QUIC_FABRIC_LISTEN_ADDR"], + VPNFabricSessionStreamShards: parseInt(env["RAP_VPN_FABRIC_SESSION_STREAM_SHARDS"]), VPNFabricQUICMaxStreamsPerConn: parseInt(env["RAP_VPN_FABRIC_QUIC_MAX_STREAMS_PER_CONN"]), VPNFabricQUICIdleTTLSeconds: parseInt(env["RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS"]), MeshListenAddr: env["RAP_MESH_LISTEN_ADDR"], diff --git a/agents/rap-node-agent/internal/hostagent/windows.go b/agents/rap-node-agent/internal/hostagent/windows.go index 89dc212..dcf15fb 100644 --- a/agents/rap-node-agent/internal/hostagent/windows.go +++ b/agents/rap-node-agent/internal/hostagent/windows.go @@ -70,6 +70,7 @@ func WindowsInstallConfigFromProfile(profile WindowsInstallProfile) WindowsInsta VPNFabricSessionTransportEnabled: profile.VPNFabricSessionTransportEnabled, MeshQUICFabricEnabled: profile.MeshQUICFabricEnabled, MeshQUICFabricListenAddr: profile.MeshQUICFabricListenAddr, + VPNFabricSessionStreamShards: profile.VPNFabricSessionStreamShards, VPNFabricQUICMaxStreamsPerConn: profile.VPNFabricQUICMaxStreamsPerConn, VPNFabricQUICIdleTTLSeconds: profile.VPNFabricQUICIdleTTLSeconds, MeshListenAddr: profile.MeshListenAddr, diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go index 8025a6c..8cebc85 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_session_transport.go @@ -3,6 +3,7 @@ package vpnruntime import ( "context" "errors" + "sync" "sync/atomic" "time" @@ -30,7 +31,12 @@ type FabricSessionPacketTransport struct { ReceiveDirection string TrafficClass string - sequence uint64 + StreamIDsByTrafficClass map[string][]uint64 + StreamIDs []uint64 + + sequence uint64 + sequenceMu sync.Mutex + sequenceByStream map[uint64]uint64 } func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Context, packets [][]byte) error { @@ -41,19 +47,20 @@ func (t *FabricSessionPacketTransport) SendGatewayPacketBatch(ctx context.Contex if t == nil || t.Sender == nil { return mesh.ErrForwardRuntimeUnavailable } - if t.StreamID == 0 || t.VPNConnectionID == "" { + if !t.hasSendStream() || t.VPNConnectionID == "" { return errors.New("fabric session packet transport identity is incomplete") } direction := t.SendDirection if direction == "" { direction = FabricDirectionGatewayToClient } + streamID, trafficClass := t.selectStreamForPackets(packets) frame, err := NewFabricVPNPacketDataFrame(FabricVPNPacketFrameInput{ - StreamID: t.StreamID, - Sequence: atomic.AddUint64(&t.sequence, 1), + StreamID: streamID, + Sequence: t.nextSequence(streamID), VPNConnectionID: t.VPNConnectionID, Direction: direction, - TrafficClass: t.TrafficClass, + TrafficClass: trafficClass, Packets: packets, }) if err != nil { @@ -101,7 +108,7 @@ func (t *FabricSessionPacketTransport) ReceiveGatewayPacketBatch(ctx context.Con if !ok { return t.Inbox.Receive(ctx, t.VPNConnectionID, direction, 5*time.Millisecond) } - if frame.Type != fabricproto.FrameData || (t.StreamID != 0 && frame.StreamID != t.StreamID) { + if frame.Type != fabricproto.FrameData || !t.acceptsStream(frame.StreamID) { continue } payload, err := DecodeFabricVPNPacketDataFrame(frame) @@ -140,10 +147,7 @@ func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) erro if !ok { return nil } - if frame.Type != fabricproto.FrameData { - continue - } - if t.StreamID != 0 && frame.StreamID != t.StreamID { + if frame.Type != fabricproto.FrameData || !t.acceptsStream(frame.StreamID) { continue } if err := t.Inbox.DeliverFabricSessionFrame(ctx, frame); err != nil { @@ -152,3 +156,95 @@ func (t *FabricSessionPacketTransport) RunFrameIngress(ctx context.Context) erro } } } + +func (t *FabricSessionPacketTransport) selectStreamForPackets(packets [][]byte) (uint64, string) { + trafficClass := fabricSessionTrafficClassForPackets(t.TrafficClass, packets) + if ids := t.streamIDsForTrafficClass(trafficClass); len(ids) > 0 { + if len(ids) == 1 || len(packets) == 0 { + return ids[0], trafficClass + } + _, shard := classifyPacketFlow(packets[0], len(ids)) + return ids[shard], trafficClass + } + if len(t.StreamIDs) > 0 { + if len(t.StreamIDs) == 1 || len(packets) == 0 { + return t.StreamIDs[0], trafficClass + } + _, shard := classifyPacketFlow(packets[0], len(t.StreamIDs)) + return t.StreamIDs[shard], trafficClass + } + return t.StreamID, trafficClass +} + +func (t *FabricSessionPacketTransport) hasSendStream() bool { + if t == nil { + return false + } + if t.StreamID != 0 || len(t.StreamIDs) > 0 { + return true + } + for _, ids := range t.StreamIDsByTrafficClass { + if len(ids) > 0 { + return true + } + } + return false +} + +func (t *FabricSessionPacketTransport) streamIDsForTrafficClass(trafficClass string) []uint64 { + if t == nil || len(t.StreamIDsByTrafficClass) == 0 { + return nil + } + if ids := t.StreamIDsByTrafficClass[normalizeFabricTrafficClass(trafficClass)]; len(ids) > 0 { + return ids + } + if normalizeFabricTrafficClass(trafficClass) == FabricTrafficClassReliable { + return t.StreamIDsByTrafficClass[FabricTrafficClassBulk] + } + return nil +} + +func (t *FabricSessionPacketTransport) acceptsStream(streamID uint64) bool { + if t == nil || streamID == 0 { + return false + } + if t.StreamID != 0 && streamID == t.StreamID { + return true + } + for _, id := range t.StreamIDs { + if id == streamID { + return true + } + } + for _, ids := range t.StreamIDsByTrafficClass { + for _, id := range ids { + if id == streamID { + return true + } + } + } + return t.StreamID == 0 && len(t.StreamIDs) == 0 && len(t.StreamIDsByTrafficClass) == 0 +} + +func (t *FabricSessionPacketTransport) nextSequence(streamID uint64) uint64 { + if streamID == 0 { + return atomic.AddUint64(&t.sequence, 1) + } + t.sequenceMu.Lock() + defer t.sequenceMu.Unlock() + if t.sequenceByStream == nil { + t.sequenceByStream = map[uint64]uint64{} + } + t.sequenceByStream[streamID]++ + return t.sequenceByStream[streamID] +} + +func fabricSessionTrafficClassForPackets(fallback string, packets [][]byte) string { + if fallback = normalizeFabricTrafficClass(fallback); fallback != "" && fallback != FabricTrafficClassBulk { + return fallback + } + if batchHasTCPControlPacket(packets) { + return FabricTrafficClassInteractive + } + return FabricTrafficClassBulk +} diff --git a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go index 1f28f4a..a185444 100644 --- a/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go +++ b/agents/rap-node-agent/internal/vpnruntime/fabric_transport_test.go @@ -227,6 +227,42 @@ func TestFabricSessionPacketTransportSendsDataFrame(t *testing.T) { } } +func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) { + sender := &captureFabricSessionSender{} + transport := &FabricSessionPacketTransport{ + Sender: sender, + StreamID: 700, + VPNConnectionID: "vpn-1", + SendDirection: FabricDirectionClientToGateway, + StreamIDsByTrafficClass: map[string][]uint64{ + FabricTrafficClassInteractive: []uint64{801, 802}, + FabricTrafficClassBulk: []uint64{901, 902}, + }, + } + bulkPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51000, 443) + controlPacket := testIPv4TCPPacket([4]byte{10, 77, 0, 2}, [4]byte{192, 168, 200, 95}, 51001, 3389) + controlPacket[33] = 0x02 + + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{bulkPacket}); err != nil { + t.Fatalf("send bulk packet: %v", err) + } + if err := transport.SendGatewayPacketBatch(context.Background(), [][]byte{controlPacket}); err != nil { + t.Fatalf("send control packet: %v", err) + } + if len(sender.frames) != 2 { + t.Fatalf("sent frames = %d, want 2", len(sender.frames)) + } + if sender.frames[0].TrafficClass != fabricproto.TrafficClassBulk || sender.frames[0].StreamID < 901 || sender.frames[0].StreamID > 902 { + t.Fatalf("bulk frame did not use bulk shard: %+v", sender.frames[0]) + } + if sender.frames[1].TrafficClass != fabricproto.TrafficClassInteractive || sender.frames[1].StreamID < 801 || sender.frames[1].StreamID > 802 { + t.Fatalf("control frame did not use interactive shard: %+v", sender.frames[1]) + } + if sender.frames[0].Sequence != 1 || sender.frames[1].Sequence != 1 { + t.Fatalf("per-stream sequences = %d/%d, want 1/1", sender.frames[0].Sequence, sender.frames[1].Sequence) + } +} + func TestFabricSessionPacketTransportRunFrameIngressDeliversInbox(t *testing.T) { inbox := NewFabricPacketInbox(4) receiver := memoryFabricSessionReceiver{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index e370f16..9679fe4 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -380,6 +380,9 @@ concentrated on a specific QUIC carrier. Fresh local capacity-pressure counters also feed endpoint ranking as a bounded penalty, spreading new fabric sessions away from a saturated carrier without declaring that carrier failed. +VPN fabric-session transport now opens configurable per-class stream shards +for interactive and bulk packet traffic, so heavy browser flows do not share a +single logical stream with latency-sensitive RDP/control packets. Endpoint ranking treats `capacity_limited` observations as a soft pressure penalty instead of a hard recent failure, enabling load spreading without marking the carrier unhealthy.