diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 5022220..4c2db34 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -361,6 +361,7 @@ type syntheticMeshState struct { VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricTransport *mesh.WebSocketFabricTransport PeerEndpoints map[string]string + PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate VPNGateway *vpnruntime.Gateway ServiceChannelAccessStats *fabricServiceChannelAccessStats RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink @@ -815,6 +816,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), PeerEndpoints: copyStringMap(peerEndpoints), + PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), VPNGateway: vpnGateway, ServiceChannelAccessStats: serviceChannelAccessStats, RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink, @@ -1693,7 +1695,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i } productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding meshState.ProductionForwardingEnabled = productionForwardingEnabled - if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil { + if (!sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) || !samePeerEndpointCandidatesMap(meshState.PeerEndpointCandidates, loadedConfig.PeerEndpointCandidates)) && meshState.VPNFabricSessionPeers != nil { _ = meshState.VPNFabricSessionPeers.Close() meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) @@ -1705,6 +1707,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) + meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) if productionForwardingEnabled { meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints) } else { @@ -4169,6 +4172,42 @@ func copyStringMap(values map[string]string) map[string]string { return out } +func samePeerEndpointCandidatesMap(left map[string][]mesh.PeerEndpointCandidate, right map[string][]mesh.PeerEndpointCandidate) bool { + if len(left) != len(right) { + return false + } + for key, leftValues := range left { + rightValues, ok := right[key] + if !ok || len(leftValues) != len(rightValues) { + return false + } + for index := range leftValues { + if leftValues[index].EndpointID != rightValues[index].EndpointID || + leftValues[index].Transport != rightValues[index].Transport || + leftValues[index].Address != rightValues[index].Address || + leftValues[index].Priority != rightValues[index].Priority { + return false + } + } + } + return true +} + +func copyPeerEndpointCandidatesMap(values map[string][]mesh.PeerEndpointCandidate) map[string][]mesh.PeerEndpointCandidate { + if len(values) == 0 { + return map[string][]mesh.PeerEndpointCandidate{} + } + out := make(map[string][]mesh.PeerEndpointCandidate, len(values)) + for nodeID, candidates := range values { + if len(candidates) == 0 { + out[nodeID] = nil + continue + } + out[nodeID] = append([]mesh.PeerEndpointCandidate(nil), candidates...) + } + return out +} + func minInt(left, right int) int { if left < right { return left @@ -4567,8 +4606,8 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" { return nil } - endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/") - if endpoint == "" { + target, ok := vpnFabricSessionTarget(meshState, nextHop) + if !ok { log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop) return nil } @@ -4580,15 +4619,12 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if meshState.VPNFabricTransport == nil { meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } - target := mesh.FabricTransportTarget{ - PeerID: nextHop, - Endpoint: endpoint, - Token: fabricSessionGatewayToken(identity, assignment, nextHop), - Timeout: 3 * time.Second, - OutboundBuffer: 256, - InboundBuffer: 256, - ErrorBuffer: 16, - } + target.PeerID = nextHop + target.Token = fabricSessionGatewayToken(identity, assignment, nextHop) + target.Timeout = 3 * time.Second + target.OutboundBuffer = 256 + target.InboundBuffer = 256 + target.ErrorBuffer = 16 carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) if err != nil { log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err) @@ -4623,6 +4659,34 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st } } +func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh.FabricTransportTarget, bool) { + if meshState == nil { + return mesh.FabricTransportTarget{}, false + } + if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 { + ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{ + ChannelClass: mesh.SyntheticChannelFabricControl, + Now: time.Now().UTC(), + MaxVerificationAge: 5 * time.Minute, + }) + for _, item := range ranked { + endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/") + if endpoint == "" { + continue + } + return mesh.FabricTransportTarget{ + Endpoint: endpoint, + Transport: item.Candidate.Transport, + }, true + } + } + endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/") + if endpoint == "" { + return mesh.FabricTransportTarget{}, false + } + return mesh.FabricTransportTarget{Endpoint: endpoint}, true +} + func fabricSessionGatewayToken(identity state.Identity, assignment client.NodeVPNAssignment, nextHop string) string { tokenParts := []string{ "rap_fsn_vpn", diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index 0ad6ef3..d3c6293 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -755,6 +755,59 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { } } +func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) { + now := time.Now().UTC() + target, ok := vpnFabricSessionTarget(&syntheticMeshState{ + PeerEndpoints: map[string]string{ + "node-b": "https://node-b.example.test:443", + }, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + }, + }, + }, "node-b") + if !ok { + t.Fatal("target missing") + } + if target.Endpoint != "quic://node-b.example.test:19443" || target.Transport != "direct_quic" { + t.Fatalf("target = %+v, want direct quic candidate", target) + } +} + +func TestVPNFabricSessionTargetFallsBackToLegacyPeerEndpoint(t *testing.T) { + target, ok := vpnFabricSessionTarget(&syntheticMeshState{ + PeerEndpoints: map[string]string{ + "node-b": "https://node-b.example.test:443/", + }, + }, "node-b") + if !ok { + t.Fatal("target missing") + } + if target.Endpoint != "https://node-b.example.test:443" || target.Transport != "" { + t.Fatalf("target = %+v, want legacy endpoint fallback", target) + } +} + func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) { now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC) payload := heartbeatPayload(config.Config{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index f9cde82..5b812c1 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -310,6 +310,9 @@ server and requiring a `PING`/`PONG` round trip over `QUICFabricTransport`. Nodes now advertise enabled QUIC fabric listeners as `direct_quic` fast-path endpoint candidates, and endpoint ranking prefers QUIC over WebSocket/HTTPS compatibility candidates for fabric sessions. +VPN fabric-session gateway transport now consumes ranked endpoint candidates, +so dataplane sessions can select QUIC fast-path candidates and fall back to +legacy peer endpoints when the control plane has not published candidates yet. Deliverables: