Fallback across VPN fabric endpoint candidates
This commit is contained in:
@@ -4628,63 +4628,82 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
|
||||
if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" {
|
||||
return nil
|
||||
}
|
||||
target, ok := vpnFabricSessionTarget(meshState, nextHop)
|
||||
if !ok {
|
||||
targets := vpnFabricSessionTargets(meshState, nextHop)
|
||||
if len(targets) == 0 {
|
||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop)
|
||||
return nil
|
||||
}
|
||||
dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
if meshState.VPNFabricSessionPeers == nil {
|
||||
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
|
||||
}
|
||||
if meshState.VPNFabricTransport == nil {
|
||||
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
||||
}
|
||||
target.PeerID = nextHop
|
||||
target.Token = fabricSessionGatewayToken(identity, assignment, nextHop)
|
||||
target.Timeout = 3 * time.Second
|
||||
target.OutboundBuffer = 256
|
||||
target.InboundBuffer = 256
|
||||
target.ErrorBuffer = 16
|
||||
carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil)
|
||||
if err != nil {
|
||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err)
|
||||
return nil
|
||||
}
|
||||
session, err := carrier.Connect(dialCtx, target)
|
||||
if err != nil {
|
||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, err)
|
||||
return nil
|
||||
}
|
||||
streamID := uint64(time.Now().UnixNano())
|
||||
if streamID == 0 {
|
||||
streamID = 1
|
||||
}
|
||||
if err := session.Send(dialCtx, fabricproto.Frame{
|
||||
Type: fabricproto.FrameOpenStream,
|
||||
StreamID: streamID,
|
||||
TrafficClass: fabricproto.TrafficClassInteractive,
|
||||
}); err != nil {
|
||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, err)
|
||||
return nil
|
||||
}
|
||||
return &vpnruntime.FabricSessionPacketTransport{
|
||||
Sender: session,
|
||||
Receiver: session,
|
||||
Inbox: meshState.VPNFabricInbox,
|
||||
StreamID: streamID,
|
||||
VPNConnectionID: assignment.VPNConnectionID,
|
||||
SendDirection: vpnruntime.FabricDirectionGatewayToClient,
|
||||
ReceiveDirection: vpnruntime.FabricDirectionClientToGateway,
|
||||
TrafficClass: vpnruntime.FabricTrafficClassInteractive,
|
||||
token := fabricSessionGatewayToken(identity, assignment, nextHop)
|
||||
for index, target := range targets {
|
||||
dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
target.PeerID = nextHop
|
||||
target.Token = token
|
||||
target.Timeout = 3 * time.Second
|
||||
target.OutboundBuffer = 256
|
||||
target.InboundBuffer = 256
|
||||
target.ErrorBuffer = 16
|
||||
carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil)
|
||||
if err != nil {
|
||||
cancel()
|
||||
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err)
|
||||
continue
|
||||
}
|
||||
session, err := carrier.Connect(dialCtx, selectedTarget)
|
||||
if err != nil {
|
||||
cancel()
|
||||
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err)
|
||||
continue
|
||||
}
|
||||
streamID := uint64(time.Now().UnixNano())
|
||||
if streamID == 0 {
|
||||
streamID = 1
|
||||
}
|
||||
if err := session.Send(dialCtx, fabricproto.Frame{
|
||||
Type: fabricproto.FrameOpenStream,
|
||||
StreamID: streamID,
|
||||
TrafficClass: fabricproto.TrafficClassInteractive,
|
||||
}); err != nil {
|
||||
cancel()
|
||||
_ = session.Close()
|
||||
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err)
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
return &vpnruntime.FabricSessionPacketTransport{
|
||||
Sender: session,
|
||||
Receiver: session,
|
||||
Inbox: meshState.VPNFabricInbox,
|
||||
StreamID: streamID,
|
||||
VPNConnectionID: assignment.VPNConnectionID,
|
||||
SendDirection: vpnruntime.FabricDirectionGatewayToClient,
|
||||
ReceiveDirection: vpnruntime.FabricDirectionClientToGateway,
|
||||
TrafficClass: vpnruntime.FabricTrafficClassInteractive,
|
||||
}
|
||||
}
|
||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=all_candidates_failed candidates=%d", assignment.VPNConnectionID, nextHop, len(targets))
|
||||
return nil
|
||||
}
|
||||
|
||||
// vpnFabricSessionTarget returns the first target produced by
// vpnFabricSessionTargets for nextHop together with true. When that list is
// empty it returns a zero mesh.FabricTransportTarget and false.
//
// NOTE(review): this text is a rendered diff without +/- markers. The
// `if meshState == nil {` line below is never closed within this function and
// looks like a removed pre-change line shown next to its replacement; confirm
// against the repository before treating this span as compilable Go.
func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh.FabricTransportTarget, bool) {
|
||||
if meshState == nil {
|
||||
// Delegate to the multi-target helper and keep only its first entry.
targets := vpnFabricSessionTargets(meshState, nextHop)
|
||||
if len(targets) == 0 {
|
||||
return mesh.FabricTransportTarget{}, false
|
||||
}
|
||||
return targets[0], true
|
||||
}
|
||||
|
||||
func vpnFabricSessionTargets(meshState *syntheticMeshState, nextHop string) []mesh.FabricTransportTarget {
|
||||
if meshState == nil {
|
||||
return nil
|
||||
}
|
||||
out := make([]mesh.FabricTransportTarget, 0, len(meshState.PeerEndpointCandidates[nextHop])+1)
|
||||
seen := map[string]struct{}{}
|
||||
if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 {
|
||||
ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{
|
||||
ChannelClass: mesh.SyntheticChannelFabricControl,
|
||||
@@ -4696,18 +4715,26 @@ func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh
|
||||
if endpoint == "" {
|
||||
continue
|
||||
}
|
||||
return mesh.FabricTransportTarget{
|
||||
key := item.Candidate.Transport + "\x00" + endpoint
|
||||
if _, ok := seen[key]; ok {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
out = append(out, mesh.FabricTransportTarget{
|
||||
Endpoint: endpoint,
|
||||
Transport: item.Candidate.Transport,
|
||||
PeerCertSHA256: endpointCandidateTLSCertSHA256(item.Candidate),
|
||||
}, true
|
||||
})
|
||||
}
|
||||
}
|
||||
endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/")
|
||||
if endpoint == "" {
|
||||
return mesh.FabricTransportTarget{}, false
|
||||
if endpoint != "" {
|
||||
key := "\x00" + endpoint
|
||||
if _, ok := seen[key]; !ok {
|
||||
out = append(out, mesh.FabricTransportTarget{Endpoint: endpoint})
|
||||
}
|
||||
}
|
||||
return mesh.FabricTransportTarget{Endpoint: endpoint}, true
|
||||
return out
|
||||
}
|
||||
|
||||
func endpointCandidateTLSCertSHA256(candidate mesh.PeerEndpointCandidate) string {
|
||||
|
||||
@@ -809,6 +809,45 @@ func TestVPNFabricSessionTargetFallsBackToLegacyPeerEndpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestVPNFabricSessionTargetsIncludeRankedCandidatesThenLegacyFallback checks
// that vpnFabricSessionTargets returns one target per endpoint candidate for
// "node-b" followed by the legacy PeerEndpoints entry as the last target.
func TestVPNFabricSessionTargetsIncludeRankedCandidatesThenLegacyFallback(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
targets := vpnFabricSessionTargets(&syntheticMeshState{
|
||||
PeerEndpoints: map[string]string{
|
||||
// The legacy endpoint is given with a trailing slash; the final assertion
// expects it back without one.
"node-b": "https://node-b-legacy.example.test:443/",
|
||||
},
|
||||
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{
|
||||
"node-b": {
|
||||
// Candidate listed first in the input but expected second in the output.
{
|
||||
EndpointID: "node-b-wss",
|
||||
NodeID: "node-b",
|
||||
Transport: "wss",
|
||||
Address: "https://node-b.example.test:443",
|
||||
Reachability: "public",
|
||||
ConnectivityMode: "direct",
|
||||
Priority: 10,
|
||||
LastVerifiedAt: &now,
|
||||
},
|
||||
// Same Priority, Reachability, ConnectivityMode and LastVerifiedAt as the
// candidate above; only the IDs, Transport and Address differ.
{
|
||||
EndpointID: "node-b-quic",
|
||||
NodeID: "node-b",
|
||||
Transport: "direct_quic",
|
||||
Address: "quic://node-b.example.test:19443",
|
||||
Reachability: "public",
|
||||
ConnectivityMode: "direct",
|
||||
Priority: 10,
|
||||
LastVerifiedAt: &now,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, "node-b")
|
||||
// Two candidates plus the legacy endpoint.
if len(targets) != 3 {
|
||||
t.Fatalf("target count = %d, want 3: %+v", len(targets), targets)
|
||||
}
|
||||
// Expected order: direct_quic candidate, wss candidate, then the legacy
// endpoint. NOTE(review): the quic-before-wss order presumably comes from the
// candidate ranking's transport preference — confirm against the ranker.
if targets[0].Transport != "direct_quic" || targets[1].Transport != "wss" || targets[2].Endpoint != "https://node-b-legacy.example.test:443" {
|
||||
t.Fatalf("targets not ordered by ranked candidates then fallback: %+v", targets)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) {
|
||||
now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC)
|
||||
payload := heartbeatPayload(config.Config{
|
||||
|
||||
Reference in New Issue
Block a user