Use fabric WebSocket dataplane for Android VPN
This commit is contained in:
@@ -7,7 +7,7 @@ import (
|
||||
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
|
||||
)
|
||||
|
||||
const Version = "0.2.267-vpnfarmonly"
|
||||
const Version = "0.2.268-vpnwsfarm"
|
||||
|
||||
func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
|
||||
return client.EnrollRequest{
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -950,12 +951,7 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
|
||||
}
|
||||
continue
|
||||
}
|
||||
var sendErr error
|
||||
if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
|
||||
sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets)
|
||||
} else {
|
||||
sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets)
|
||||
}
|
||||
sendErr := s.sendVPNPacketWebSocketBatch(ctx, clusterID, vpnConnectionID, trafficClass, packets, !backendFallbackAllowed)
|
||||
if sendErr != nil {
|
||||
if !backendFallbackAllowed {
|
||||
s.logFabricServiceChannelViolation(nil, clusterID, channelID, vpnConnectionID, backendRelayPolicy, "fabric_route_send_failed_backend_fallback_blocked", sendErr.Error())
|
||||
@@ -968,6 +964,44 @@ func (s Server) readVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn
|
||||
}
|
||||
}
|
||||
|
||||
func (s Server) sendVPNPacketWebSocketBatch(ctx context.Context, clusterID string, vpnConnectionID string, trafficClass string, packets [][]byte, retryRouteErrors bool) error {
|
||||
const maxAttempts = 6
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < maxAttempts; attempt++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
var sendErr error
|
||||
if classIngress, ok := s.VPNPacketIngress.(VPNPacketIngressTrafficClass); ok {
|
||||
sendErr = classIngress.SendClientPacketBatchWithTrafficClass(ctx, clusterID, vpnConnectionID, trafficClass, packets)
|
||||
} else {
|
||||
sendErr = s.VPNPacketIngress.SendClientPacketBatch(ctx, clusterID, vpnConnectionID, packets)
|
||||
}
|
||||
if sendErr == nil {
|
||||
return nil
|
||||
}
|
||||
lastErr = sendErr
|
||||
if !retryRouteErrors || !isRetryableVPNPacketIngressError(sendErr) {
|
||||
return sendErr
|
||||
}
|
||||
timer := time.NewTimer(time.Duration(75+attempt*50) * time.Millisecond)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func isRetryableVPNPacketIngressError(err error) bool {
|
||||
return errors.Is(err, ErrRouteNotFound) ||
|
||||
errors.Is(err, ErrForwardRuntimeUnavailable) ||
|
||||
errors.Is(err, ErrForwardPeerUnavailable) ||
|
||||
errors.Is(err, ErrSyntheticPeerUnavailable)
|
||||
}
|
||||
|
||||
func (s Server) writeVPNPacketWebSocket(ctx context.Context, conn *websocket.Conn, clusterID string, channelID string, vpnConnectionID string, forceBackendFallback bool, backendFallbackAllowed bool, backendRelayPolicy string) error {
|
||||
lastPing := time.Now()
|
||||
for {
|
||||
|
||||
@@ -30,8 +30,8 @@ android {
|
||||
applicationId "su.cin.rapvpn"
|
||||
minSdk 26
|
||||
targetSdk 35
|
||||
versionCode 187
|
||||
versionName "0.2.187"
|
||||
versionCode 188
|
||||
versionName "0.2.188"
|
||||
buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\""
|
||||
buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\""
|
||||
buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\""
|
||||
|
||||
@@ -53,7 +53,7 @@ public class RapVpnService extends VpnService {
|
||||
private static final String PREFS = "rap-vpn-runtime";
|
||||
private static final int DEFAULT_VPN_MTU = 1000;
|
||||
private static final int VPN_TCP_MSS_CLAMP = 900;
|
||||
private static final boolean PACKET_WEBSOCKET_DATAPLANE_ENABLED = false;
|
||||
private static final boolean PACKET_WEBSOCKET_DATAPLANE_ENABLED = true;
|
||||
private static final int VPN_BATCH_MAX_PACKETS = 512;
|
||||
private static final int VPN_BATCH_MAX_BYTES = 1024 * 1024;
|
||||
private static final int UPLINK_WORKER_MAX_COUNT = 1;
|
||||
@@ -603,6 +603,9 @@ public class RapVpnService extends VpnService {
|
||||
FabricServiceChannel channel = FabricServiceChannel.fromLease(serviceChannelLease);
|
||||
if (channel.enabled) {
|
||||
config.fabricServiceChannel = channel;
|
||||
if (!channel.webSocketPathTemplate.isEmpty()) {
|
||||
config.dataplaneSelectedTransport = "fabric_packet_websocket_v1";
|
||||
}
|
||||
config.configNotes.add("Fabric service channel enabled: " + channel.channelId);
|
||||
}
|
||||
}
|
||||
@@ -1040,6 +1043,9 @@ public class RapVpnService extends VpnService {
|
||||
configureBackendBypass(selectedRelayUrl);
|
||||
if (PACKET_WEBSOCKET_DATAPLANE_ENABLED) {
|
||||
startPacketWebSocketRelay(selectedRelayUrl, clusterId, vpnConnectionId);
|
||||
if (activeFabricServiceChannel.enabled) {
|
||||
writeRuntimeDetail("fabric_websocket_dataplane", "fabric websocket packet stream required; HTTP batch fallback disabled", "relay", 0, 0, "", -1);
|
||||
}
|
||||
} else {
|
||||
writeRuntimeDetail("http_packet_batch", "packet websocket disabled; using confirmed HTTP batches", "relay", 0, 0, "", -1);
|
||||
}
|
||||
@@ -2481,6 +2487,18 @@ public class RapVpnService extends VpnService {
|
||||
if (sendUplinkBatchOverWebSocket(relayUrl, clusterId, vpnConnectionId, batch, workerIndex)) {
|
||||
return true;
|
||||
}
|
||||
if (activeFabricServiceChannel.enabled) {
|
||||
for (int attempt = 0; attempt <= UPLINK_TRANSIENT_RETRY_COUNT && running; attempt++) {
|
||||
lastUplinkSendErrorMessage = "fabric websocket packet stream unavailable: " + lastWebSocketRelayError();
|
||||
writeRuntimeDetail("websocket_required_wait", "fabric websocket unavailable; HTTP batch fallback disabled relay=" + relayUrl + " attempt=" + attempt + " error=" + lastUplinkSendErrorMessage, "uplink_sender", -1, -1, "WEBSOCKET_REQUIRED", workerIndex);
|
||||
sleepQuietly(Math.min(UPLINK_TRANSIENT_RETRY_MAX_SLEEP_MS, UPLINK_TRANSIENT_RETRY_SLEEP_MS * (attempt + 1L)));
|
||||
if (sendUplinkBatchOverWebSocket(relayUrl, clusterId, vpnConnectionId, batch, workerIndex)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
switchPacketRelayUrl(relayUrl, "websocket_unavailable");
|
||||
continue;
|
||||
}
|
||||
RapApiClient client = packetRelayClientForUrl(relayUrl);
|
||||
int attempt = 0;
|
||||
while (running) {
|
||||
@@ -2516,6 +2534,21 @@ public class RapVpnService extends VpnService {
|
||||
return false;
|
||||
}
|
||||
|
||||
private String lastWebSocketRelayError() {
|
||||
VpnPacketWebSocketRelay relay = packetWebSocketRelay;
|
||||
if (relay == null) {
|
||||
return "relay_not_started";
|
||||
}
|
||||
String error = relay.lastError();
|
||||
if (error == null || error.isEmpty()) {
|
||||
if (relay.isOpen()) {
|
||||
return "open_no_error";
|
||||
}
|
||||
return "not_open";
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
||||
private boolean isTransientUplinkSendError(Exception e) {
|
||||
String message = e == null ? null : e.getMessage();
|
||||
if (message == null) {
|
||||
@@ -3003,6 +3036,10 @@ public class RapVpnService extends VpnService {
|
||||
}
|
||||
writeRuntimeDetail("websocket_receive_fallback", "websocket receive fallback " + relay.lastError(), "downlink", -1, -1, "WEBSOCKET_RECEIVE", -1);
|
||||
}
|
||||
if (activeFabricServiceChannel.enabled) {
|
||||
writeRuntimeDetail("websocket_receive_required", "fabric websocket unavailable; HTTP batch receive disabled " + lastWebSocketRelayError(), "downlink", -1, -1, "WEBSOCKET_REQUIRED", -1);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return client.receiveClientPacketBatch(clusterId, vpnConnectionId, timeoutMs);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user