From 850bd67b6abf11442ceeb4d632362e66a08ac1ec Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 10:54:11 +0300 Subject: [PATCH] Fallback across VPN fabric endpoint candidates --- .../rap-node-agent/cmd/rap-node-agent/main.go | 121 +++++++++++------- .../cmd/rap-node-agent/main_test.go | 39 ++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 3 + 3 files changed, 116 insertions(+), 47 deletions(-) diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index 9af9e27..5e1bbec 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -4628,63 +4628,82 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" { return nil } - target, ok := vpnFabricSessionTarget(meshState, nextHop) - if !ok { + targets := vpnFabricSessionTargets(meshState, nextHop) + if len(targets) == 0 { log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop) return nil } - dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() if meshState.VPNFabricSessionPeers == nil { meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() } if meshState.VPNFabricTransport == nil { meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } - target.PeerID = nextHop - target.Token = fabricSessionGatewayToken(identity, assignment, nextHop) - target.Timeout = 3 * time.Second - target.OutboundBuffer = 256 - target.InboundBuffer = 256 - target.ErrorBuffer = 16 - carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) - if err != nil { - log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err) - return nil - } - session, err := carrier.Connect(dialCtx, target) - if err != nil { - log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) - return nil - } - streamID := uint64(time.Now().UnixNano()) - if streamID == 0 { - streamID = 1 - } - if err := session.Send(dialCtx, fabricproto.Frame{ - Type: fabricproto.FrameOpenStream, - StreamID: streamID, - TrafficClass: fabricproto.TrafficClassInteractive, - }); err != nil { - log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, err) - return nil - } - return &vpnruntime.FabricSessionPacketTransport{ - Sender: session, - Receiver: session, - Inbox: meshState.VPNFabricInbox, - StreamID: streamID, - VPNConnectionID: assignment.VPNConnectionID, - SendDirection: vpnruntime.FabricDirectionGatewayToClient, - ReceiveDirection: vpnruntime.FabricDirectionClientToGateway, - TrafficClass: vpnruntime.FabricTrafficClassInteractive, + token := fabricSessionGatewayToken(identity, assignment, nextHop) + for index, target := range targets { + dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + target.PeerID = nextHop + target.Token = token + target.Timeout = 3 * time.Second + target.OutboundBuffer = 256 + target.InboundBuffer = 256 + target.ErrorBuffer = 16 + carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) + if err != nil { + cancel() + log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err) + continue + } + session, err := carrier.Connect(dialCtx, selectedTarget) + if err != nil { + cancel() + log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) + continue + } + streamID := uint64(time.Now().UnixNano()) + if streamID == 0 { + streamID = 1 + } + if err := session.Send(dialCtx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: streamID, + TrafficClass: fabricproto.TrafficClassInteractive, + }); err != nil { + cancel() + _ = session.Close() + log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) + continue + } + cancel() + return &vpnruntime.FabricSessionPacketTransport{ + Sender: session, + Receiver: session, + Inbox: meshState.VPNFabricInbox, + StreamID: streamID, + VPNConnectionID: assignment.VPNConnectionID, + SendDirection: vpnruntime.FabricDirectionGatewayToClient, + ReceiveDirection: vpnruntime.FabricDirectionClientToGateway, + TrafficClass: vpnruntime.FabricTrafficClassInteractive, + } } + log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=all_candidates_failed candidates=%d", assignment.VPNConnectionID, nextHop, len(targets)) + return nil } func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh.FabricTransportTarget, bool) { - if meshState == nil { + targets := vpnFabricSessionTargets(meshState, nextHop) + if len(targets) == 0 { return mesh.FabricTransportTarget{}, false } + return targets[0], true +} + +func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []mesh.FabricTransportTarget { + if meshState == nil { + return nil + } + out := make([]mesh.FabricTransportTarget, 0, len(meshState.PeerEndpointCandidates[nextHop])+1) + seen := map[string]struct{}{} if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 { ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{ ChannelClass: mesh.SyntheticChannelFabricControl, @@ -4696,18 +4715,26 @@ func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh if endpoint == "" { continue } - return mesh.FabricTransportTarget{ + key := item.Candidate.Transport + "\x00" + endpoint + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, mesh.FabricTransportTarget{ Endpoint: endpoint, Transport: item.Candidate.Transport, PeerCertSHA256: endpointCandidateTLSCertSHA256(item.Candidate), - }, true + }) } } endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/") - if endpoint == "" { - return mesh.FabricTransportTarget{}, false + if endpoint != "" { + key := "\x00" + endpoint + if _, ok := seen[key]; !ok { + out = append(out, mesh.FabricTransportTarget{Endpoint: endpoint}) + } } - return mesh.FabricTransportTarget{Endpoint: endpoint}, true + return out } func endpointCandidateTLSCertSHA256(candidate mesh.PeerEndpointCandidate) string { diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index d4bfa00..eba5f81 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -809,6 +809,45 @@ func TestVPNFabricSessionTargetFallsBackToLegacyPeerEndpoint(t *testing.T) { } } +func TestVPNFabricSessionTargetsIncludeRankedCandidatesThenLegacyFallback(t *testing.T) { + now := time.Now().UTC() + targets := vpnFabricSessionTargets(&syntheticMeshState{ + PeerEndpoints: map[string]string{ + "node-b": "https://node-b-legacy.example.test:443/", + }, + PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{ + "node-b": { + { + EndpointID: "node-b-wss", + NodeID: "node-b", + Transport: "wss", + Address: "https://node-b.example.test:443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + { + EndpointID: "node-b-quic", + NodeID: "node-b", + Transport: "direct_quic", + Address: "quic://node-b.example.test:19443", + Reachability: "public", + ConnectivityMode: "direct", + Priority: 10, + LastVerifiedAt: &now, + }, + }, + }, + }, "node-b") + if len(targets) != 3 { + t.Fatalf("target count = %d, want 3: %+v", len(targets), targets) + } + if targets[0].Transport != "direct_quic" || targets[1].Transport != "wss" || targets[2].Endpoint != "https://node-b-legacy.example.test:443" { + t.Fatalf("targets not ordered by ranked candidates then fallback: %+v", targets) + } +} + func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) { now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC) payload := heartbeatPayload(config.Config{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 6b1725a..3b9e9c7 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -316,6 +316,9 @@ legacy peer endpoints when the control plane has not published candidates yet. The temporary self-signed QUIC listener advertises its SHA-256 certificate fingerprint in endpoint metadata, and the QUIC client can pin that fingerprint instead of disabling verification while the cluster CA path is being finished. +VPN fabric-session dialing now walks all ranked endpoint candidates before +falling back to the legacy peer endpoint, so a failed QUIC candidate does not +block WebSocket/HTTPS compatibility transport. Deliverables: