diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go
index a99975b..efc904d 100644
--- a/agents/rap-node-agent/cmd/rap-node-agent/main.go
+++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go
@@ -4202,6 +4202,10 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s
 	activeOwner := false
 	for _, assignment := range assignments {
 		if assignment.AssignmentReason == "eligible_candidate" && assignment.DesiredState == "enabled" {
+			if !vpnAssignmentLeaseAutoAcquireAllowed(identity.NodeID, assignment) {
+				log.Printf("vpn assignment lease auto-acquire skipped: vpn_connection_id=%s reason=local_node_is_not_selected_exit", assignment.VPNConnectionID)
+				continue
+			}
 			lease, err := api.AcquireNodeVPNAssignmentLease(ctx, identity.ClusterID, identity.NodeID, assignment.VPNConnectionID, client.NodeVPNAssignmentLeaseAcquireRequest{
 				TTLSeconds: 300,
 				Metadata: map[string]any{
@@ -4269,6 +4273,45 @@ func ensureVPNGatewayRuntime(ctx context.Context, api *client.Client, identity s
 	return nil
 }
 
+// vpnAssignmentLeaseAutoAcquireAllowed reports whether the local node may
+// auto-acquire the gateway lease for assignment, judged by the exit-node
+// selection in the assignment's placement policy JSON.
+//
+// A blank localNodeID is never allowed. A missing or undecodable policy places
+// no restriction. A non-blank exit_node_id takes precedence and must equal the
+// local node; otherwise the local node must be listed in exit_node_ids. Blank
+// pool entries are ignored, so a pool of only blank IDs restricts nothing.
+func vpnAssignmentLeaseAutoAcquireAllowed(localNodeID string, assignment client.NodeVPNAssignment) bool {
+	localNodeID = strings.TrimSpace(localNodeID)
+	if localNodeID == "" {
+		return false
+	}
+	var policy struct {
+		ExitNodeID  string   `json:"exit_node_id"`
+		ExitNodeIDs []string `json:"exit_node_ids"`
+	}
+	if len(assignment.PlacementPolicy) == 0 || json.Unmarshal(assignment.PlacementPolicy, &policy) != nil {
+		// Fail open: without a readable policy there is no exit selection to enforce.
+		return true
+	}
+	if exitNodeID := strings.TrimSpace(policy.ExitNodeID); exitNodeID != "" {
+		return exitNodeID == localNodeID
+	}
+	poolRestricted := false
+	for _, exitNodeID := range policy.ExitNodeIDs {
+		exitNodeID = strings.TrimSpace(exitNodeID)
+		if exitNodeID == "" {
+			// A blank entry selects no node; skip it the same way a blank exit_node_id is skipped.
+			continue
+		}
+		if exitNodeID == localNodeID {
+			return true
+		}
+		poolRestricted = true
+	}
+	return !poolRestricted
+}
+
 func localGatewayTransportForAssignment(identity state.Identity, assignment client.NodeVPNAssignment, meshState *syntheticMeshState, _ *client.Client) vpnruntime.PacketTransport {
 	if meshState == nil || meshState.VPNFabricInbox == nil || assignment.VPNConnectionID == "" {
 		return nil
diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
index fa6ff40..6fc62e0 100644
--- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
+++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go
@@ -235,6 +235,48 @@ func TestLocalGatewayTransportForAssignmentUsesLocalInboxWithoutBackendFallback(
 	}
 }
 
+// TestVPNAssignmentLeaseAutoAcquireAllowedRequiresSelectedExit checks that only the node named by exit_node_id is allowed.
+func TestVPNAssignmentLeaseAutoAcquireAllowedRequiresSelectedExit(t *testing.T) {
+	assignment := client.NodeVPNAssignment{
+		VPNConnectionID: "vpn-1",
+		PlacementPolicy: json.RawMessage(`{
+			"entry_node_ids":["entry-1"],
+			"exit_node_id":"exit-1"
+		}`),
+	}
+	if vpnAssignmentLeaseAutoAcquireAllowed("entry-1", assignment) {
+		t.Fatal("entry node must not auto-acquire the gateway lease")
+	}
+	if !vpnAssignmentLeaseAutoAcquireAllowed("exit-1", assignment) {
+		t.Fatal("selected exit node should auto-acquire the gateway lease")
+	}
+}
+
+// TestVPNAssignmentLeaseAutoAcquireAllowedSupportsExitPool checks that membership in exit_node_ids decides the outcome.
+func TestVPNAssignmentLeaseAutoAcquireAllowedSupportsExitPool(t *testing.T) {
+	assignment := client.NodeVPNAssignment{
+		VPNConnectionID: "vpn-1",
+		PlacementPolicy: json.RawMessage(`{"exit_node_ids":["exit-1","exit-2"]}`),
+	}
+	if !vpnAssignmentLeaseAutoAcquireAllowed("exit-2", assignment) {
+		t.Fatal("node from exit pool should auto-acquire the gateway lease")
+	}
+	if vpnAssignmentLeaseAutoAcquireAllowed("entry-1", assignment) {
+		t.Fatal("node outside exit pool must not auto-acquire the gateway lease")
+	}
+}
+
+// TestVPNAssignmentLeaseAutoAcquireAllowedIgnoresBlankExitPoolEntries checks that a pool of only blank IDs restricts nothing.
+func TestVPNAssignmentLeaseAutoAcquireAllowedIgnoresBlankExitPoolEntries(t *testing.T) {
+	assignment := client.NodeVPNAssignment{
+		VPNConnectionID: "vpn-1",
+		PlacementPolicy: json.RawMessage(`{"exit_node_ids":["", "  "]}`),
+	}
+	if !vpnAssignmentLeaseAutoAcquireAllowed("node-1", assignment) {
+		t.Fatal("exit pool with only blank entries must not block auto-acquire")
+	}
+}
+
 type noopProductionForwardTransport struct{}
 
 func (noopProductionForwardTransport) SendProduction(context.Context, string, mesh.ProductionEnvelope) (mesh.ProductionForwardResult, error) {
diff --git a/agents/rap-node-agent/internal/agent/payload.go b/agents/rap-node-agent/internal/agent/payload.go
index a9b198c..babc1d0 100644
--- a/agents/rap-node-agent/internal/agent/payload.go
+++ b/agents/rap-node-agent/internal/agent/payload.go
@@ -7,7 +7,7 @@ import (
 	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
 )
 
-const Version = "0.2.269-vpnwsfarm"
+const Version = "0.2.271-vpnwsfarm"
 
 func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
 	return client.EnrollRequest{
diff --git a/agents/rap-node-agent/internal/mesh/server.go b/agents/rap-node-agent/internal/mesh/server.go
index a8f7963..8bd1a47 100644
--- a/agents/rap-node-agent/internal/mesh/server.go
+++ b/agents/rap-node-agent/internal/mesh/server.go
@@ -955,6 +955,9 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
 		if sendErr != nil {
 			if !backendFallbackAllowed {
 				s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
+				if isRetryableVPNPacketIngressError(sendErr) {
+					continue
+				}
 				return sendErr
 			}
 			if proxyErr := s.backendVPNPacketPost(ctx, clusterID, vpnConnectionID, payload); proxyErr != nil {
diff --git a/clients/android/app/build.gradle b/clients/android/app/build.gradle
index 85e7609..e673d21 100644
--- a/clients/android/app/build.gradle
+++ b/clients/android/app/build.gradle
@@ -30,8 +30,8 @@ android {
         applicationId "su.cin.rapvpn"
         minSdk 26
         targetSdk 35
-        versionCode 190
-        versionName "0.2.190"
+        versionCode 191
+        versionName "0.2.191"
         buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\""
         buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\""
         buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\""
diff --git a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java
index 6c10979..e8aca4d 100644
--- a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java
+++ b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java
@@ -25,6 +25,8 @@ final class VpnPacketWebSocketRelay {
     private static final int MAX_PACKET_BATCH_BYTES = 1024 * 1024;
     private static final int MAX_SINGLE_PACKET_BYTES = 65535;
     private static final long CONNECTING_STALE_MS = 8000;
+    // Longest time, in ms, that send and receive wait for a pending connect to open.
+    private static final long OPEN_WAIT_MS = 3500;
 
     private final String baseUrl;
     private final VpnService vpnService;
@@ -123,8 +125,12 @@ final class VpnPacketWebSocketRelay {
             return true;
         }
         connect(clusterId, vpnConnectionId);
+        if (!awaitOpen(OPEN_WAIT_MS)) {
+            return false;
+        }
         WebSocket socket = webSocket;
-        if (socket == null || !open) {
+        if (socket == null) {
+            lastError = "websocket missing after open";
             return false;
         }
         byte[] payload = encodePacketBatch(packets);
@@ -140,6 +146,12 @@ final class VpnPacketWebSocketRelay {
 
     List receiveClientPacketBatch(String clusterId, String vpnConnectionId, int timeoutMs) throws InterruptedException {
         connect(clusterId, vpnConnectionId);
+        // Charge the time spent waiting for the socket to open against the
+        // caller's timeout so the poll below only gets what is left of it.
+        long openWaitStartMs = System.currentTimeMillis();
+        awaitOpen(Math.min(OPEN_WAIT_MS, Math.max(1, timeoutMs)));
+        long openWaitMs = Math.max(0, System.currentTimeMillis() - openWaitStartMs);
+        timeoutMs = (int) Math.max(0, timeoutMs - openWaitMs);
         int waitMs = Math.max(1, timeoutMs);
         List packets = incoming.poll(waitMs, TimeUnit.MILLISECONDS);
         return packets == null ? new ArrayList<>() : packets;
@@ -165,6 +177,37 @@ final class VpnPacketWebSocketRelay {
         webSocket = null;
     }
 
+    /**
+     * Blocks until the socket is open, the pending connect attempt ends, or
+     * timeoutMs elapses. If the thread is interrupted, the interrupt flag is
+     * restored and false is returned.
+     *
+     * @param timeoutMs longest time to wait, in milliseconds (at least 1 is used)
+     * @return whether the socket is open when the wait ends
+     */
+    private boolean awaitOpen(long timeoutMs) {
+        long deadline = System.currentTimeMillis() + Math.max(1, timeoutMs);
+        synchronized (lock) {
+            while (!open && connecting) {
+                long waitMs = deadline - System.currentTimeMillis();
+                if (waitMs <= 0) {
+                    break;
+                }
+                try {
+                    lock.wait(waitMs);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    lastError = "interrupted waiting for websocket open";
+                    return false;
+                }
+            }
+            if (!open && "connecting".equals(lastError)) {
+                lastError = "connecting_timeout";
+            }
+            return open;
+        }
+    }
+
     private String webSocketUrl(String clusterId, String vpnConnectionId) {
         try {
             URI uri = URI.create(baseUrl);
@@ -187,10 +230,14 @@ final class VpnPacketWebSocketRelay {
     private final class Listener extends WebSocketListener {
         @Override
         public void onOpen(WebSocket webSocket, Response response) {
-            open = true;
-            connecting = false;
-            reconnectAfterMs = 0;
-            lastError = "";
+            // Update state under the lock and wake any thread blocked in awaitOpen.
+            synchronized (lock) {
+                open = true;
+                connecting = false;
+                reconnectAfterMs = 0;
+                lastError = "";
+                lock.notifyAll();
+            }
             Log.i(TAG, "vpn packet websocket opened " + baseUrl);
         }
 
@@ -208,22 +255,30 @@ final class VpnPacketWebSocketRelay {
 
         @Override
         public void onClosed(WebSocket webSocket, int code, String reason) {
-            open = false;
-            connecting = false;
-            reconnectAfterMs = System.currentTimeMillis() + 1000;
-            lastError = "closed " + code + " " + reason;
+            synchronized (lock) {
+                open = false;
+                connecting = false;
+                reconnectAfterMs = System.currentTimeMillis() + 1000;
+                lastError = "closed " + code + " " + reason;
+                lock.notifyAll();
+            }
         }
 
         @Override
         public void onFailure(WebSocket webSocket, Throwable t, Response response) {
-            open = false;
-            connecting = false;
-            reconnectAfterMs = System.currentTimeMillis() + 3000;
             String responseStatus = "";
             if (response != null) {
                 responseStatus = " status=" + response.code();
             }
-            lastError = (t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage()) + responseStatus;
+            // Keep the message in a local so the log line cannot pick up a concurrent lastError update.
+            String failure = (t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage()) + responseStatus;
+            synchronized (lock) {
+                open = false;
+                connecting = false;
+                reconnectAfterMs = System.currentTimeMillis() + 3000;
+                lastError = failure;
+                lock.notifyAll();
+            }
-            Log.w(TAG, "vpn packet websocket failed " + baseUrl + ": " + lastError);
+            Log.w(TAG, "vpn packet websocket failed " + baseUrl + ": " + failure);
         }
     }