Stabilize VPN farm startup path

This commit is contained in:
2026-05-15 10:31:29 +03:00
parent 96566cbe55
commit e3f21d591f
6 changed files with 113 additions and 16 deletions
@@ -4202,6 +4202,10 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s
activeOwner := false activeOwner := false
for _, assignment := range assignments { for _, assignment := range assignments {
if assignment.AssignmentReason == "eligible_candidate" && assignment.DesiredState == "enabled" { if assignment.AssignmentReason == "eligible_candidate" && assignment.DesiredState == "enabled" {
if !vpnAssignmentLeaseAutoAcquireAllowed(identity.NodeID, assignment) {
log.Printf("vpn assignment lease auto-acquire skipped: vpn_connection_id=%s reason=local_node_is_not_selected_exit", assignment.VPNConnectionID)
continue
}
lease, err := api.AcquireNodeVPNAssignmentLease(ctx, identity.ClusterID, identity.NodeID, assignment.VPNConnectionID, client.NodeVPNAssignmentLeaseAcquireRequest{ lease, err := api.AcquireNodeVPNAssignmentLease(ctx, identity.ClusterID, identity.NodeID, assignment.VPNConnectionID, client.NodeVPNAssignmentLeaseAcquireRequest{
TTLSeconds: 300, TTLSeconds: 300,
Metadata: map[string]any{ Metadata: map[string]any{
@@ -4269,6 +4273,29 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s
return nil return nil
} }
func vpnAssignmentLeaseAutoAcquireAllowed(localNodeID string, assignment client.NodeVPNAssignment) bool {
localNodeID = strings.TrimSpace(localNodeID)
if localNodeID == "" {
return false
}
var policy struct {
ExitNodeID string `json:"exit_node_id"`
ExitNodeIDs []string `json:"exit_node_ids"`
}
if len(assignment.PlacementPolicy) == 0 || json.Unmarshal(assignment.PlacementPolicy, &policy) != nil {
return true
}
if exitNodeID := strings.TrimSpace(policy.ExitNodeID); exitNodeID != "" {
return exitNodeID == localNodeID
}
for _, exitNodeID := range policy.ExitNodeIDs {
if strings.TrimSpace(exitNodeID) == localNodeID {
return true
}
}
return len(policy.ExitNodeIDs) == 0
}
func localGatewayTransportForAssignment(identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport { func localGatewayTransportForAssignment(identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport {
if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" { if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" {
return nil return nil
@@ -235,6 +235,35 @@ func TestLocalGatewayTransportForAssignmentUsesLocalInboxWithoutBackendFallback(
} }
} }
func TestVPNAssignmentLeaseAutoAcquireAllowedRequiresSelectedExit(t *testing.T) {
assignment := client.NodeVPNAssignment{
VPNConnectionID: "vpn-1",
PlacementPolicy: json.RawMessage(`{
"entry_node_ids":["entry-1"],
"exit_node_id":"exit-1"
}`),
}
if vpnAssignmentLeaseAutoAcquireAllowed("entry-1", assignment) {
t.Fatal("entry node must not auto-acquire the gateway lease")
}
if !vpnAssignmentLeaseAutoAcquireAllowed("exit-1", assignment) {
t.Fatal("selected exit node should auto-acquire the gateway lease")
}
}
func TestVPNAssignmentLeaseAutoAcquireAllowedSupportsExitPool(t *testing.T) {
assignment := client.NodeVPNAssignment{
VPNConnectionID: "vpn-1",
PlacementPolicy: json.RawMessage(`{"exit_node_ids":["exit-1","exit-2"]}`),
}
if !vpnAssignmentLeaseAutoAcquireAllowed("exit-2", assignment) {
t.Fatal("node from exit pool should auto-acquire the gateway lease")
}
if vpnAssignmentLeaseAutoAcquireAllowed("entry-1", assignment) {
t.Fatal("node outside exit pool must not auto-acquire the gateway lease")
}
}
type noopProductionForwardTransport struct{} type noopProductionForwardTransport struct{}
func (noopProductionForwardTransport) SendProduction(context.Context, string, mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) { func (noopProductionForwardTransport) SendProduction(context.Context, string, mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
@@ -7,7 +7,7 @@ import (
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
) )
const Version = "0.2.269-vpnwsfarm" const Version = "0.2.271-vpnwsfarm"
func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest { func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
return client.EnrollRequest{ return client.EnrollRequest{
@@ -955,6 +955,9 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
if sendErr != nil { if sendErr != nil {
if !backendFallbackAllowed { if !backendFallbackAllowed {
s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error()) s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
if isRetryableVPNPacketIngressError(sendErr) {
continue
}
return sendErr return sendErr
} }
if proxyErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, payload); proxyErr != nil { if proxyErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, payload); proxyErr != nil {
+2 -2
View File
@@ -30,8 +30,8 @@ android {
applicationId "su.cin.rapvpn" applicationId "su.cin.rapvpn"
minSdk 26 minSdk 26
targetSdk 35 targetSdk 35
versionCode 190 versionCode 191
versionName "0.2.190" versionName "0.2.191"
buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\"" buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\""
buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\"" buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\""
buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\"" buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\""
@@ -25,6 +25,7 @@ final class VpnPacketWebSocketRelay {
private static final int MAX_PACKET_BATCH_BYTES = 1024 * 1024; private static final int MAX_PACKET_BATCH_BYTES = 1024 * 1024;
private static final int MAX_SINGLE_PACKET_BYTES = 65535; private static final int MAX_SINGLE_PACKET_BYTES = 65535;
private static final long CONNECTING_STALE_MS = 8000; private static final long CONNECTING_STALE_MS = 8000;
private static final long OPEN_WAIT_MS = 3500;
private final String baseUrl; private final String baseUrl;
private final VpnService vpnService; private final VpnService vpnService;
@@ -123,8 +124,12 @@ final class VpnPacketWebSocketRelay {
return true; return true;
} }
connect(clusterId, vpnConnectionId); connect(clusterId, vpnConnectionId);
if (!awaitOpen(OPEN_WAIT_MS)) {
return false;
}
WebSocket socket = webSocket; WebSocket socket = webSocket;
if (socket == null || !open) { if (socket == null) {
lastError = "websocket missing after open";
return false; return false;
} }
byte[] payload = encodePacketBatch(packets); byte[] payload = encodePacketBatch(packets);
@@ -140,6 +145,7 @@ final class VpnPacketWebSocketRelay {
List<byte[]> receiveClientPacketBatch(String clusterId, String vpnConnectionId, int timeoutMs) throws InterruptedException { List<byte[]> receiveClientPacketBatch(String clusterId, String vpnConnectionId, int timeoutMs) throws InterruptedException {
connect(clusterId, vpnConnectionId); connect(clusterId, vpnConnectionId);
awaitOpen(Math.min(OPEN_WAIT_MS, Math.max(1, timeoutMs)));
int waitMs = Math.max(1, timeoutMs); int waitMs = Math.max(1, timeoutMs);
List<byte[]> packets = incoming.poll(waitMs, TimeUnit.MILLISECONDS); List<byte[]> packets = incoming.poll(waitMs, TimeUnit.MILLISECONDS);
return packets == null ? new ArrayList<>() : packets; return packets == null ? new ArrayList<>() : packets;
@@ -165,6 +171,29 @@ final class VpnPacketWebSocketRelay {
webSocket = null; webSocket = null;
} }
private boolean awaitOpen(long timeoutMs) {
long deadline = System.currentTimeMillis() + Math.max(1, timeoutMs);
synchronized (lock) {
while (!open && connecting) {
long waitMs = deadline - System.currentTimeMillis();
if (waitMs <= 0) {
break;
}
try {
lock.wait(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
lastError = "interrupted waiting for websocket open";
return false;
}
}
if (!open && "connecting".equals(lastError)) {
lastError = "connecting_timeout";
}
return open;
}
}
private String webSocketUrl(String clusterId, String vpnConnectionId) { private String webSocketUrl(String clusterId, String vpnConnectionId) {
try { try {
URI uri = URI.create(baseUrl); URI uri = URI.create(baseUrl);
@@ -187,10 +216,13 @@ final class VpnPacketWebSocketRelay {
private final class Listener extends WebSocketListener { private final class Listener extends WebSocketListener {
@Override @Override
public void onOpen(WebSocket webSocket, Response response) { public void onOpen(WebSocket webSocket, Response response) {
open = true; synchronized (lock) {
connecting = false; open = true;
reconnectAfterMs = 0; connecting = false;
lastError = ""; reconnectAfterMs = 0;
lastError = "";
lock.notifyAll();
}
Log.i(TAG, "vpn packet websocket opened " + baseUrl); Log.i(TAG, "vpn packet websocket opened " + baseUrl);
} }
@@ -208,22 +240,28 @@ final class VpnPacketWebSocketRelay {
@Override @Override
public void onClosed(WebSocket webSocket, int code, String reason) { public void onClosed(WebSocket webSocket, int code, String reason) {
open = false; synchronized (lock) {
connecting = false; open = false;
reconnectAfterMs = System.currentTimeMillis() + 1000; connecting = false;
lastError = "closed " + code + " " + reason; reconnectAfterMs = System.currentTimeMillis() + 1000;
lastError = "closed " + code + " " + reason;
lock.notifyAll();
}
} }
@Override @Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) { public void onFailure(WebSocket webSocket, Throwable t, Response response) {
open = false;
connecting = false;
reconnectAfterMs = System.currentTimeMillis() + 3000;
String responseStatus = ""; String responseStatus = "";
if (response != null) { if (response != null) {
responseStatus = " status=" + response.code(); responseStatus = " status=" + response.code();
} }
lastError = (t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage()) + responseStatus; synchronized (lock) {
open = false;
connecting = false;
reconnectAfterMs = System.currentTimeMillis() + 3000;
lastError = (t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage()) + responseStatus;
lock.notifyAll();
}
Log.w(TAG, "vpn packet websocket failed " + baseUrl + ": " + lastError); Log.w(TAG, "vpn packet websocket failed " + baseUrl + ": " + lastError);
} }
} }