From 2cb6005cd32111355f0e6173def9af385c48f873 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Fri, 15 May 2026 09:33:31 +0300 Subject: [PATCH] Stabilize VPN farm WebSocket dataplane --- .../rap-node-agent/internal/agent/payload.go | 2 +- agents/rap-node-agent/internal/mesh/server.go | 31 ++++++++++++++++++- .../modules/cluster/postgres_store.go | 7 ++--- .../modules/cluster/postgres_store_test.go | 29 +++++++++++++++-- clients/android/app/build.gradle | 4 +-- .../java/su/cin/rapvpn/RapVpnService.java | 2 +- .../cin/rapvpn/VpnPacketWebSocketRelay.java | 7 ++++- 7 files changed, 68 insertions(+), 14 deletions(-) diff --git a/agents/rap-node-agent/internal/agent/payload.go b/agents/rap-node-agent/internal/agent/payload.go index 5817555..a9b198c 100644 --- a/agents/rap-node-agent/internal/agent/payload.go +++ b/agents/rap-node-agent/internal/agent/payload.go @@ -7,7 +7,7 @@ import ( "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" ) -const Version = "0.2.268-vpnwsfarm" +const Version = "0.2.269-vpnwsfarm" func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest { return client.EnrollRequest{ diff --git a/agents/rap-node-agent/internal/mesh/server.go b/agents/rap-node-agent/internal/mesh/server.go index 4e9c848..a8f7963 100644 --- a/agents/rap-node-agent/internal/mesh/server.go +++ b/agents/rap-node-agent/internal/mesh/server.go @@ -1002,6 +1002,35 @@ func isRetryableVPNPacketIngressError(err error) bool { errors.Is(err, ErrSyntheticPeerUnavailable) } +func (s Server) receiveVPNPacketWebSocketBatch(ctx context.Context, clusterID string, vpnConnectionID string, timeout time.Duration, retryRouteErrors bool) ([][]byte, error) { + const maxAttempts = 4 + var lastErr error + for attempt := 0; attempt < maxAttempts; attempt++ { + if err := ctx.Err(); err != nil { + return nil, err + } + packets, err := s.VPNPacketIngress.ReceiveClientPacketBatch(ctx, clusterID, vpnConnectionID, timeout) + if err == nil { + return packets, nil + } + lastErr = err + if !retryRouteErrors || !isRetryableVPNPacketIngressError(err) { + return nil, err + } + timer := time.NewTimer(time.Duration(75+attempt*50) * time.Millisecond) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + } + } + if retryRouteErrors && isRetryableVPNPacketIngressError(lastErr) { + return nil, nil + } + return nil, lastErr +} + func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error { lastPing := time.Now() for { @@ -1013,7 +1042,7 @@ func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Con var packets [][]byte var err error if !forceBackendFallback { - packets, err = s.VPNPacketIngress.ReceiveClientPacketBatch(ctx, clusterID, vpnConnectionID, 50*time.Millisecond) + packets, err = s.receiveVPNPacketWebSocketBatch(ctx, clusterID, vpnConnectionID, 50*time.Millisecond, !backendFallbackAllowed) } if forceBackendFallback && !backendFallbackAllowed { s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "backend_fallback_blocked_by_policy", ErrRouteNotFound.Error()) diff --git a/backend/internal/modules/cluster/postgres_store.go b/backend/internal/modules/cluster/postgres_store.go index 3cd63f8..27f2c9e 100644 --- a/backend/internal/modules/cluster/postgres_store.go +++ b/backend/internal/modules/cluster/postgres_store.go @@ -5101,13 +5101,10 @@ func enrichVPNClientFabricRoute(item VPNClientConnection, preferredEntryNodeID, preferredEntryNodeID = strings.TrimSpace(preferredEntryNodeID) selectedEntry := selectPreferredNode(entryPool, preferredEntryNodeID) - selectedExit := "" - if item.ActiveLease != nil && item.ActiveLease.OwnerNodeID != "" { + selectedExit := selectPreferredNode(exitPool, preferredExitNodeID) + if selectedExit == "" && item.ActiveLease != nil && item.ActiveLease.OwnerNodeID != "" { selectedExit = item.ActiveLease.OwnerNodeID } - if selectedExit == "" { - selectedExit = selectPreferredNode(exitPool, preferredExitNodeID) - } status := "waiting_for_entry_and_exit" switch { case selectedEntry != "" && selectedExit != "": diff --git a/backend/internal/modules/cluster/postgres_store_test.go b/backend/internal/modules/cluster/postgres_store_test.go index 23dc2ad..da871bc 100644 --- a/backend/internal/modules/cluster/postgres_store_test.go +++ b/backend/internal/modules/cluster/postgres_store_test.go @@ -33,7 +33,7 @@ func TestMeshLatestObservationKeyDefaults(t *testing.T) { } } -func TestEnrichVPNClientFabricRoutePrefersPlacementEntryAndActiveExit(t *testing.T) { +func TestEnrichVPNClientFabricRoutePrefersPlacementEntryAndPolicyExit(t *testing.T) { item := VPNClientConnection{ AllowedNodeIDs: []string{"node-a", "node-b", "node-b"}, EntryNodeIDs: []string{"entry-1", "entry-2"}, @@ -55,7 +55,7 @@ func TestEnrichVPNClientFabricRoutePrefersPlacementEntryAndActiveExit(t *testing if route["preferred_data_plane"] != "fabric_service_channel" || route["fallback_data_plane"] != "none" || route["backend_relay_fallback"] != false { t.Fatalf("unexpected data-plane route contract: %#v", route) } - if route["selected_entry_node_id"] != "entry-2" || route["selected_exit_node_id"] != "exit-active" { + if route["selected_entry_node_id"] != "entry-2" || route["selected_exit_node_id"] != "exit-policy" { t.Fatalf("unexpected selected route endpoints: %#v", route) } if route["route_candidate_count"].(float64) != 8 { @@ -63,7 +63,7 @@ func TestEnrichVPNClientFabricRoutePrefersPlacementEntryAndActiveExit(t *testing } candidates := route["route_candidates"].([]any) firstCandidate := candidates[0].(map[string]any) - if firstCandidate["role"] != "preferred" || firstCandidate["entry_node_id"] != "entry-2" || firstCandidate["exit_node_id"] != "exit-active" { + if firstCandidate["role"] != "preferred" || firstCandidate["entry_node_id"] != "entry-2" || firstCandidate["exit_node_id"] != "exit-policy" { t.Fatalf("preferred route candidate = %#v", firstCandidate) } entryPool := route["entry_pool_node_ids"].([]any) @@ -114,6 +114,29 @@ func TestEnrichVPNClientFabricRoutePrefersExplicitExit(t *testing.T) { } } +func TestEnrichVPNClientFabricRouteUsesActiveLeaseWhenNoPolicyExit(t *testing.T) { + item := VPNClientConnection{ + AllowedNodeIDs: []string{"node-a", "node-b"}, + EntryNodeIDs: []string{"entry-1"}, + ActiveLease: &NodeVPNAssignmentLease{ + OwnerNodeID: "node-b", + }, + ClientConfig: json.RawMessage(`{"routes":["0.0.0.0/0"]}`), + } + + var cfg map[string]any + if err := json.Unmarshal(enrichVPNClientFabricRoute(item, "entry-1", ""), &cfg); err != nil { + t.Fatalf("unmarshal enriched config: %v", err) + } + route, ok := cfg["vpn_fabric_route"].(map[string]any) + if !ok { + t.Fatalf("missing vpn_fabric_route in %#v", cfg) + } + if route["selected_exit_node_id"] != "node-b" { + t.Fatalf("unexpected selected exit: %#v", route["selected_exit_node_id"]) + } +} + func TestEnrichVPNClientEntryEndpointCandidatesAddsReportedEntryAPI(t *testing.T) { item := VPNClientConnection{ EntryNodeIDs: []string{"entry-1"}, diff --git a/clients/android/app/build.gradle b/clients/android/app/build.gradle index a4cf43e..7886e1b 100644 --- a/clients/android/app/build.gradle +++ b/clients/android/app/build.gradle @@ -30,8 +30,8 @@ android { applicationId "su.cin.rapvpn" minSdk 26 targetSdk 35 - versionCode 188 - versionName "0.2.188" + versionCode 189 + versionName "0.2.189" buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\"" buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\"" buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\"" diff --git a/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java b/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java index 47aec39..af104e5 100644 --- a/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java +++ b/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java @@ -1190,7 +1190,7 @@ public class RapVpnService extends VpnService { private void startPacketWebSocketRelay(String relayUrl, String clusterId, String vpnConnectionId) { try { VpnPacketWebSocketRelay old = packetWebSocketRelay; - if (old != null && !relayUrl.equals(old.baseUrl())) { + if (old != null && (!relayUrl.equals(old.baseUrl()) || !old.isOpen())) { old.close(); packetWebSocketRelay = null; } diff --git a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java index a6868ba..ece1810 100644 --- a/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java +++ b/clients/android/app/src/main/java/su/cin/rapvpn/VpnPacketWebSocketRelay.java @@ -105,6 +105,7 @@ final class VpnPacketWebSocketRelay { Request.Builder requestBuilder = new Request.Builder().url(wsUrl); this.fabricServiceChannel.applyHeaders(requestBuilder); Request request = requestBuilder.build(); + lastError = "connecting"; webSocket = httpClient.newWebSocket(request, new Listener()); } } @@ -210,7 +211,11 @@ final class VpnPacketWebSocketRelay { open = false; connecting = false; reconnectAfterMs = System.currentTimeMillis() + 3000; - lastError = t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage(); + String responseStatus = ""; + if (response != null) { + responseStatus = " status=" + response.code(); + } + lastError = (t == null ? "websocket failure" : t.getClass().getSimpleName() + ": " + t.getMessage()) + responseStatus; Log.w(TAG, "vpn packet websocket failed " + baseUrl + ": " + lastError); } }