Route VPN fabric sessions by endpoint candidates
This commit is contained in:
@@ -361,6 +361,7 @@ type syntheticMeshState struct {
|
|||||||
VPNFabricSessionPeers *mesh.FabricSessionPeerManager
|
VPNFabricSessionPeers *mesh.FabricSessionPeerManager
|
||||||
VPNFabricTransport *mesh.WebSocketFabricTransport
|
VPNFabricTransport *mesh.WebSocketFabricTransport
|
||||||
PeerEndpoints map[string]string
|
PeerEndpoints map[string]string
|
||||||
|
PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate
|
||||||
VPNGateway *vpnruntime.Gateway
|
VPNGateway *vpnruntime.Gateway
|
||||||
ServiceChannelAccessStats *fabricServiceChannelAccessStats
|
ServiceChannelAccessStats *fabricServiceChannelAccessStats
|
||||||
RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink
|
RemoteWorkspaceFrameSink *mesh.RemoteWorkspaceFrameProbeSink
|
||||||
@@ -815,6 +816,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c
|
|||||||
VPNFabricSessionPeers: vpnFabricSessionPeers,
|
VPNFabricSessionPeers: vpnFabricSessionPeers,
|
||||||
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
|
VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers),
|
||||||
PeerEndpoints: copyStringMap(peerEndpoints),
|
PeerEndpoints: copyStringMap(peerEndpoints),
|
||||||
|
PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates),
|
||||||
VPNGateway: vpnGateway,
|
VPNGateway: vpnGateway,
|
||||||
ServiceChannelAccessStats: serviceChannelAccessStats,
|
ServiceChannelAccessStats: serviceChannelAccessStats,
|
||||||
RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink,
|
RemoteWorkspaceFrameSink: remoteWorkspaceFrameSink,
|
||||||
@@ -1693,7 +1695,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
|
|||||||
}
|
}
|
||||||
productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding
|
productionForwardingEnabled := cfg.MeshProductionForwardingEnabled || loadedConfig.ProductionForwarding
|
||||||
meshState.ProductionForwardingEnabled = productionForwardingEnabled
|
meshState.ProductionForwardingEnabled = productionForwardingEnabled
|
||||||
if !sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) && meshState.VPNFabricSessionPeers != nil {
|
if (!sameStringMap(meshState.PeerEndpoints, loadedConfig.PeerEndpoints) || !samePeerEndpointCandidatesMap(meshState.PeerEndpointCandidates, loadedConfig.PeerEndpointCandidates)) && meshState.VPNFabricSessionPeers != nil {
|
||||||
_ = meshState.VPNFabricSessionPeers.Close()
|
_ = meshState.VPNFabricSessionPeers.Close()
|
||||||
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
|
meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager()
|
||||||
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
||||||
@@ -1705,6 +1707,7 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i
|
|||||||
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
||||||
}
|
}
|
||||||
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
|
meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints)
|
||||||
|
meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates)
|
||||||
if productionForwardingEnabled {
|
if productionForwardingEnabled {
|
||||||
meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints)
|
meshState.ProductionForwardTransport = mesh.NewHTTPProductionForwardTransport(loadedConfig.PeerEndpoints)
|
||||||
} else {
|
} else {
|
||||||
@@ -4169,6 +4172,42 @@ func copyStringMap(values map[string]string) map[string]string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func samePeerEndpointCandidatesMap(left map[string][]mesh.PeerEndpointCandidate, right map[string][]mesh.PeerEndpointCandidate) bool {
|
||||||
|
if len(left) != len(right) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for key, leftValues := range left {
|
||||||
|
rightValues, ok := right[key]
|
||||||
|
if !ok || len(leftValues) != len(rightValues) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for index := range leftValues {
|
||||||
|
if leftValues[index].EndpointID != rightValues[index].EndpointID ||
|
||||||
|
leftValues[index].Transport != rightValues[index].Transport ||
|
||||||
|
leftValues[index].Address != rightValues[index].Address ||
|
||||||
|
leftValues[index].Priority != rightValues[index].Priority {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyPeerEndpointCandidatesMap(values map[string][]mesh.PeerEndpointCandidate) map[string][]mesh.PeerEndpointCandidate {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return map[string][]mesh.PeerEndpointCandidate{}
|
||||||
|
}
|
||||||
|
out := make(map[string][]mesh.PeerEndpointCandidate, len(values))
|
||||||
|
for nodeID, candidates := range values {
|
||||||
|
if len(candidates) == 0 {
|
||||||
|
out[nodeID] = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
out[nodeID] = append([]mesh.PeerEndpointCandidate(nil), candidates...)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func minInt(left, right int) int {
|
func minInt(left, right int) int {
|
||||||
if left < right {
|
if left < right {
|
||||||
return left
|
return left
|
||||||
@@ -4567,8 +4606,8 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
|
|||||||
if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" {
|
if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" || nextHop == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/")
|
target, ok := vpnFabricSessionTarget(meshState, nextHop)
|
||||||
if endpoint == "" {
|
if !ok {
|
||||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop)
|
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -4580,15 +4619,12 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
|
|||||||
if meshState.VPNFabricTransport == nil {
|
if meshState.VPNFabricTransport == nil {
|
||||||
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers)
|
||||||
}
|
}
|
||||||
target := mesh.FabricTransportTarget{
|
target.PeerID = nextHop
|
||||||
PeerID: nextHop,
|
target.Token = fabricSessionGatewayToken(identity, assignment, nextHop)
|
||||||
Endpoint: endpoint,
|
target.Timeout = 3 * time.Second
|
||||||
Token: fabricSessionGatewayToken(identity, assignment, nextHop),
|
target.OutboundBuffer = 256
|
||||||
Timeout: 3 * time.Second,
|
target.InboundBuffer = 256
|
||||||
OutboundBuffer: 256,
|
target.ErrorBuffer = 16
|
||||||
InboundBuffer: 256,
|
|
||||||
ErrorBuffer: 16,
|
|
||||||
}
|
|
||||||
carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil)
|
carrier, target, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err)
|
log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, err)
|
||||||
@@ -4623,6 +4659,34 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func vpnFabricSessionTarget(meshState *syntheticMeshState, nextHop string) (mesh.FabricTransportTarget, bool) {
|
||||||
|
if meshState == nil {
|
||||||
|
return mesh.FabricTransportTarget{}, false
|
||||||
|
}
|
||||||
|
if candidates := meshState.PeerEndpointCandidates[nextHop]; len(candidates) > 0 {
|
||||||
|
ranked := mesh.RankPeerEndpointCandidates(candidates, mesh.EndpointCandidateScoreOptions{
|
||||||
|
ChannelClass: mesh.SyntheticChannelFabricControl,
|
||||||
|
Now: time.Now().UTC(),
|
||||||
|
MaxVerificationAge: 5 * time.Minute,
|
||||||
|
})
|
||||||
|
for _, item := range ranked {
|
||||||
|
endpoint := strings.TrimRight(strings.TrimSpace(item.Candidate.Address), "/")
|
||||||
|
if endpoint == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return mesh.FabricTransportTarget{
|
||||||
|
Endpoint: endpoint,
|
||||||
|
Transport: item.Candidate.Transport,
|
||||||
|
}, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
endpoint := strings.TrimRight(strings.TrimSpace(meshState.PeerEndpoints[nextHop]), "/")
|
||||||
|
if endpoint == "" {
|
||||||
|
return mesh.FabricTransportTarget{}, false
|
||||||
|
}
|
||||||
|
return mesh.FabricTransportTarget{Endpoint: endpoint}, true
|
||||||
|
}
|
||||||
|
|
||||||
func fabricSessionGatewayToken(identity state.Identity, assignment client.NodeVPNAssignment, nextHop string) string {
|
func fabricSessionGatewayToken(identity state.Identity, assignment client.NodeVPNAssignment, nextHop string) string {
|
||||||
tokenParts := []string{
|
tokenParts := []string{
|
||||||
"rap_fsn_vpn",
|
"rap_fsn_vpn",
|
||||||
|
|||||||
@@ -755,6 +755,59 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
target, ok := vpnFabricSessionTarget(&syntheticMeshState{
|
||||||
|
PeerEndpoints: map[string]string{
|
||||||
|
"node-b": "https://node-b.example.test:443",
|
||||||
|
},
|
||||||
|
PeerEndpointCandidates: map[string][]mesh.PeerEndpointCandidate{
|
||||||
|
"node-b": {
|
||||||
|
{
|
||||||
|
EndpointID: "node-b-wss",
|
||||||
|
NodeID: "node-b",
|
||||||
|
Transport: "wss",
|
||||||
|
Address: "https://node-b.example.test:443",
|
||||||
|
Reachability: "public",
|
||||||
|
ConnectivityMode: "direct",
|
||||||
|
Priority: 10,
|
||||||
|
LastVerifiedAt: &now,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
EndpointID: "node-b-quic",
|
||||||
|
NodeID: "node-b",
|
||||||
|
Transport: "direct_quic",
|
||||||
|
Address: "quic://node-b.example.test:19443",
|
||||||
|
Reachability: "public",
|
||||||
|
ConnectivityMode: "direct",
|
||||||
|
Priority: 10,
|
||||||
|
LastVerifiedAt: &now,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, "node-b")
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("target missing")
|
||||||
|
}
|
||||||
|
if target.Endpoint != "quic://node-b.example.test:19443" || target.Transport != "direct_quic" {
|
||||||
|
t.Fatalf("target = %+v, want direct quic candidate", target)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVPNFabricSessionTargetFallsBackToLegacyPeerEndpoint(t *testing.T) {
|
||||||
|
target, ok := vpnFabricSessionTarget(&syntheticMeshState{
|
||||||
|
PeerEndpoints: map[string]string{
|
||||||
|
"node-b": "https://node-b.example.test:443/",
|
||||||
|
},
|
||||||
|
}, "node-b")
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("target missing")
|
||||||
|
}
|
||||||
|
if target.Endpoint != "https://node-b.example.test:443" || target.Transport != "" {
|
||||||
|
t.Fatalf("target = %+v, want legacy endpoint fallback", target)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) {
|
func TestHeartbeatPayloadReportsMeshListenerFailureWithoutKillingHeartbeat(t *testing.T) {
|
||||||
now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC)
|
now := time.Date(2026, 4, 30, 9, 0, 0, 0, time.UTC)
|
||||||
payload := heartbeatPayload(config.Config{
|
payload := heartbeatPayload(config.Config{
|
||||||
|
|||||||
@@ -310,6 +310,9 @@ server and requiring a `PING`/`PONG` round trip over `QUICFabricTransport`.
|
|||||||
Nodes now advertise enabled QUIC fabric listeners as `direct_quic` fast-path
|
Nodes now advertise enabled QUIC fabric listeners as `direct_quic` fast-path
|
||||||
endpoint candidates, and endpoint ranking prefers QUIC over WebSocket/HTTPS
|
endpoint candidates, and endpoint ranking prefers QUIC over WebSocket/HTTPS
|
||||||
compatibility candidates for fabric sessions.
|
compatibility candidates for fabric sessions.
|
||||||
|
VPN fabric-session gateway transport now consumes ranked endpoint candidates,
|
||||||
|
so dataplane sessions can select QUIC fast-path candidates and fall back to
|
||||||
|
legacy peer endpoints when the control plane has not published candidates yet.
|
||||||
|
|
||||||
Deliverables:
|
Deliverables:
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user