diff --git a/agents/rap-node-agent/internal/agent/payload.go b/agents/rap-node-agent/internal/agent/payload.go
index 536b5fb..5817555 100644
--- a/agents/rap-node-agent/internal/agent/payload.go
+++ b/agents/rap-node-agent/internal/agent/payload.go
@@ -7,7 +7,7 @@ import (
 	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
 )
 
-const Version = "0.2.267-vpnfarmonly"
+const Version = "0.2.268-vpnwsfarm"
 
 func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
 	return client.EnrollRequest{
diff --git a/agents/rap-node-agent/internal/mesh/server.go b/agents/rap-node-agent/internal/mesh/server.go
index 6c04211..4e9c848 100644
--- a/agents/rap-node-agent/internal/mesh/server.go
+++ b/agents/rap-node-agent/internal/mesh/server.go
@@ -8,6 +8,7 @@ import (
 	"encoding/binary"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -950,12 +951,7 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
 			}
 			continue
 		}
-		var sendErr error
-		if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
-			sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets)
-		} else {
-			sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets)
-		}
+		sendErr := s.sendVPNPacketWebSocketBatch(ctx, clusterID, vpnConnectionID, trafficClass, packets, !backendFallbackAllowed)
 		if sendErr != nil {
 			if !backendFallbackAllowed {
 				s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
@@ -968,6 +964,59 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
 	}
 }
 
+// sendVPNPacketWebSocketBatch forwards one batch of client packets to
+// s.VPNPacketIngress, preferring SendClientPacketBatchWithTrafficClass when the
+// ingress implements VPNPacketIngressTrafficClass.
+//
+// When retryRouteErrors is true, errors accepted by
+// isRetryableVPNPacketIngressError are retried, up to maxAttempts sends in
+// total, waiting 75ms, 125ms, 175ms, ... between sends. Any other error, or any
+// error when retryRouteErrors is false, is returned at once. If ctx is done
+// before a send or during a wait, ctx.Err() is returned.
+func (s Server) sendVPNPacketWebSocketBatch(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte, retryRouteErrors bool) error {
+	const maxAttempts = 6
+	var lastErr error
+	for attempt := 0; attempt < maxAttempts; attempt++ {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		var sendErr error
+		if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
+			sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets)
+		} else {
+			sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets)
+		}
+		if sendErr == nil {
+			return nil
+		}
+		lastErr = sendErr
+		if !retryRouteErrors || !isRetryableVPNPacketIngressError(sendErr) {
+			return sendErr
+		}
+		if attempt == maxAttempts-1 {
+			// No attempt follows, so waiting here would only delay the error.
+			break
+		}
+		timer := time.NewTimer(time.Duration(75+attempt*50) * time.Millisecond)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return ctx.Err()
+		case <-timer.C:
+		}
+	}
+	return lastErr
+}
+
+// isRetryableVPNPacketIngressError reports whether err wraps one of the
+// route/peer availability sentinels that sendVPNPacketWebSocketBatch retries.
+func isRetryableVPNPacketIngressError(err error) bool {
+	return errors.Is(err, ErrRouteNotFound) ||
+		errors.Is(err, ErrForwardRuntimeUnavailable) ||
+		errors.Is(err, ErrForwardPeerUnavailable) ||
+		errors.Is(err, ErrSyntheticPeerUnavailable)
+}
+
 func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error {
 	lastPing := time.Now()
 	for {
diff --git a/clients/android/app/build.gradle b/clients/android/app/build.gradle
index b4f89c3..a4cf43e 100644
--- a/clients/android/app/build.gradle
+++ b/clients/android/app/build.gradle
@@ -30,8 +30,8 @@ android {
         applicationId "su.cin.rapvpn"
         minSdk 26
         targetSdk 35
-        versionCode 187
-        versionName "0.2.187"
+        versionCode 188
+        versionName "0.2.188"
         buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\""
         buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\""
         buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\""
diff --git a/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java b/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java
index 4055ec3..47aec39 100644
--- a/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java
+++ b/clients/android/app/src/main/java/su/cin/rapvpn/RapVpnService.java
@@ -53,7 +53,7 @@ public class RapVpnService extends VpnService {
     private static final String PREFS = "rap-vpn-runtime";
     private static final int DEFAULT_VPN_MTU = 1000;
     private static final int VPN_TCP_MSS_CLAMP = 900;
-    private static final boolean PACKET_WEBSOCKET_DATAPLANE_ENABLED = false;
+    private static final boolean PACKET_WEBSOCKET_DATAPLANE_ENABLED = true;
     private static final int VPN_BATCH_MAX_PACKETS = 512;
     private static final int VPN_BATCH_MAX_BYTES = 1024 * 1024;
     private static final int UPLINK_WORKER_MAX_COUNT = 1;
@@ -603,6 +603,9 @@ public class RapVpnService extends VpnService {
             FabricServiceChannel channel = FabricServiceChannel.fromLease(serviceChannelLease);
             if (channel.enabled) {
                 config.fabricServiceChannel = channel;
+                if (!channel.webSocketPathTemplate.isEmpty()) {
+                    config.dataplaneSelectedTransport = "fabric_packet_websocket_v1";
+                }
                 config.configNotes.add("Fabric service channel enabled: " + channel.channelId);
             }
         }
@@ -1040,6 +1043,9 @@ public class RapVpnService extends VpnService {
             configureBackendBypass(selectedRelayUrl);
             if (PACKET_WEBSOCKET_DATAPLANE_ENABLED) {
                 startPacketWebSocketRelay(selectedRelayUrl, clusterId, vpnConnectionId);
+                if (activeFabricServiceChannel.enabled) {
+                    writeRuntimeDetail("fabric_websocket_dataplane", "fabric websocket packet stream required; HTTP batch fallback disabled", "relay", 0, 0, "", -1);
+                }
             } else {
                 writeRuntimeDetail("http_packet_batch", "packet websocket disabled; using confirmed HTTP batches", "relay", 0, 0, "", -1);
             }
@@ -2481,6 +2487,18 @@ public class RapVpnService extends VpnService {
             if (sendUplinkBatchOverWebSocket(relayUrl, clusterId, vpnConnectionId, batch, workerIndex)) {
                 return true;
             }
+            if (activeFabricServiceChannel.enabled) {
+                for (int attempt = 0; attempt <= UPLINK_TRANSIENT_RETRY_COUNT && running; attempt++) {
+                    lastUplinkSendErrorMessage = "fabric websocket packet stream unavailable: " + lastWebSocketRelayError();
+                    writeRuntimeDetail("websocket_required_wait", "fabric websocket unavailable; HTTP batch fallback disabled relay=" + relayUrl + " attempt=" + attempt + " error=" + lastUplinkSendErrorMessage, "uplink_sender", -1, -1, "WEBSOCKET_REQUIRED", workerIndex);
+                    sleepQuietly(Math.min(UPLINK_TRANSIENT_RETRY_MAX_SLEEP_MS, UPLINK_TRANSIENT_RETRY_SLEEP_MS * (attempt + 1L)));
+                    if (sendUplinkBatchOverWebSocket(relayUrl, clusterId, vpnConnectionId, batch, workerIndex)) {
+                        return true;
+                    }
+                }
+                switchPacketRelayUrl(relayUrl, "websocket_unavailable");
+                continue;
+            }
             RapApiClient client = packetRelayClientForUrl(relayUrl);
             int attempt = 0;
             while (running) {
@@ -2516,6 +2534,27 @@ public class RapVpnService extends VpnService {
         return false;
     }
 
+    /**
+     * Returns a short description of the packet websocket relay's last error for runtime detail messages.
+     *
+     * @return {@code relay.lastError()} when it is non-empty; otherwise one of the placeholders
+     *         "relay_not_started", "open_no_error" or "not_open"
+     */
+    private String lastWebSocketRelayError() {
+        VpnPacketWebSocketRelay relay = packetWebSocketRelay;
+        if (relay == null) {
+            return "relay_not_started";
+        }
+        String error = relay.lastError();
+        if (error == null || error.isEmpty()) {
+            if (relay.isOpen()) {
+                return "open_no_error";
+            }
+            return "not_open";
+        }
+        return error;
+    }
+
     private boolean isTransientUplinkSendError(Exception e) {
         String message = e == null ? null : e.getMessage();
         if (message == null) {
@@ -3003,6 +3042,12 @@ public class RapVpnService extends VpnService {
             }
             writeRuntimeDetail("websocket_receive_fallback", "websocket receive fallback " + relay.lastError(), "downlink", -1, -1, "WEBSOCKET_RECEIVE", -1);
         }
+        // NOTE(review): this branch returns immediately, while receiveClientPacketBatch below takes timeoutMs;
+        // confirm the caller paces itself so the downlink loop does not spin while the websocket is down.
+        if (activeFabricServiceChannel.enabled) {
+            writeRuntimeDetail("websocket_receive_required", "fabric websocket unavailable; HTTP batch receive disabled " + lastWebSocketRelayError(), "downlink", -1, -1, "WEBSOCKET_REQUIRED", -1);
+            return new ArrayList<>();
+        }
         return client.receiveClientPacketBatch(clusterId, vpnConnectionId, timeoutMs);
     }
 