Tune VPN fabric batching and flow windows

This commit is contained in:
2026-05-15 23:19:15 +03:00
parent fdf176bc5d
commit bf78af07a6
5 changed files with 22 additions and 16 deletions
@@ -7,7 +7,7 @@ import (
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/state" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/state"
) )
const Version = "0.2.278-vpnpreempt" const Version = "0.2.279-vpnperf"
func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest { func EnrollmentPayload(clusterID, joinToken string, identity state.Identity) client.EnrollRequest {
return client.EnrollRequest{ return client.EnrollRequest{
@@ -19,9 +19,9 @@ const (
FabricDirectionClientToGateway = "client_to_gateway" FabricDirectionClientToGateway = "client_to_gateway"
FabricDirectionGatewayToClient = "gateway_to_client" FabricDirectionGatewayToClient = "gateway_to_client"
defaultFabricFlowShardCount = 32 defaultFabricFlowShardCount = 8
defaultFabricFlowQueueCapacity = 1024 defaultFabricFlowQueueCapacity = 1024
defaultFabricFlowParallelSendWindow = 4 defaultFabricFlowParallelSendWindow = 8
defaultFabricFlowQualityWindowCapacity = 32 defaultFabricFlowQualityWindowCapacity = 32
defaultFabricFlowFailureThreshold = 2 defaultFabricFlowFailureThreshold = 2
defaultFabricFlowSlowSendThreshold = 2 * time.Second defaultFabricFlowSlowSendThreshold = 2 * time.Second
@@ -332,8 +332,8 @@ func defaultFabricServiceChannelAdaptivePolicy() FabricServiceChannelAdaptivePol
ClassWindows: map[string]int{ ClassWindows: map[string]int{
FabricTrafficClassControl: defaultFabricFlowParallelSendWindow, FabricTrafficClassControl: defaultFabricFlowParallelSendWindow,
FabricTrafficClassInteractive: defaultFabricFlowParallelSendWindow, FabricTrafficClassInteractive: defaultFabricFlowParallelSendWindow,
FabricTrafficClassReliable: 3, FabricTrafficClassReliable: 6,
FabricTrafficClassBulk: 1, FabricTrafficClassBulk: 4,
FabricTrafficClassDroppable: 1, FabricTrafficClassDroppable: 1,
}, },
}) })
@@ -361,8 +361,8 @@ func normalizeFabricServiceChannelAdaptivePolicy(policy FabricServiceChannelAdap
defaults := map[string]int{ defaults := map[string]int{
FabricTrafficClassControl: policy.MaxParallelWindow, FabricTrafficClassControl: policy.MaxParallelWindow,
FabricTrafficClassInteractive: policy.MaxParallelWindow, FabricTrafficClassInteractive: policy.MaxParallelWindow,
FabricTrafficClassReliable: minPositive(policy.MaxParallelWindow, 3), FabricTrafficClassReliable: minPositive(policy.MaxParallelWindow, 6),
FabricTrafficClassBulk: 1, FabricTrafficClassBulk: minPositive(policy.MaxParallelWindow, 4),
FabricTrafficClassDroppable: 1, FabricTrafficClassDroppable: 1,
} }
next := map[string]int{} next := map[string]int{}
@@ -538,12 +538,15 @@ func (s *FabricFlowScheduler) RecommendedParallelSendWindowForTrafficClass(traff
return maxWindow return maxWindow
} }
if classPressure.hasDrops { if classPressure.hasDrops {
return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) return classWindowLimit(s.adaptivePolicy, trafficClass, 1)
} }
if global.hasDrops { if global.hasDrops {
return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) return classWindowLimit(s.adaptivePolicy, trafficClass, 1)
} }
if global.highPressure && global.interactiveOrControlQueues > 0 { if global.highPressure && global.interactiveOrControlQueues > 0 {
if trafficClass == FabricTrafficClassBulk || trafficClass == FabricTrafficClassDroppable {
return classWindowLimit(s.adaptivePolicy, trafficClass, 1)
}
return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2))
} }
if global.highPressure { if global.highPressure {
@@ -821,9 +824,12 @@ func (s *FabricFlowScheduler) recommendedParallelSendWindowForTrafficClassLocked
return maxWindow return maxWindow
} }
if classPressure.hasDrops || globalPressure.hasDrops { if classPressure.hasDrops || globalPressure.hasDrops {
return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) return classWindowLimit(s.adaptivePolicy, trafficClass, 1)
} }
if globalPressure.highPressure && globalPressure.interactiveOrControlQueues > 0 { if globalPressure.highPressure && globalPressure.interactiveOrControlQueues > 0 {
if trafficClass == FabricTrafficClassBulk || trafficClass == FabricTrafficClassDroppable {
return classWindowLimit(s.adaptivePolicy, trafficClass, 1)
}
return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2)) return classWindowLimit(s.adaptivePolicy, trafficClass, boundedParallelWindow(maxWindow/2))
} }
if globalPressure.highPressure { if globalPressure.highPressure {
@@ -1457,14 +1457,14 @@ func TestFabricFlowSchedulerProtectsInteractiveWindowDuringBulkPressure(t *testi
if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, 4); got != 1 { if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassBulk, 4); got != 1 {
t.Fatalf("bulk adaptive window = %d, want 1", got) t.Fatalf("bulk adaptive window = %d, want 1", got)
} }
if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassInteractive, 4); got != 4 { if got := scheduler.RecommendedParallelSendWindowForTrafficClass(FabricTrafficClassInteractive, 8); got != 8 {
t.Fatalf("interactive adaptive window = %d, want 4", got) t.Fatalf("interactive adaptive window = %d, want 8", got)
} }
snapshot := scheduler.Snapshot() snapshot := scheduler.Snapshot()
if !snapshot.AdaptiveBackpressureActive || snapshot.AdaptiveBackpressureReason != "bulk_window_reduced_to_protect_interactive" { if !snapshot.AdaptiveBackpressureActive || snapshot.AdaptiveBackpressureReason != "bulk_window_reduced_to_protect_interactive" {
t.Fatalf("adaptive snapshot = %+v", snapshot) t.Fatalf("adaptive snapshot = %+v", snapshot)
} }
if snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] != 1 || snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] != 4 { if snapshot.RecommendedParallelWindows[FabricTrafficClassBulk] != 1 || snapshot.RecommendedParallelWindows[FabricTrafficClassInteractive] != 8 {
t.Fatalf("recommended class windows = %+v", snapshot.RecommendedParallelWindows) t.Fatalf("recommended class windows = %+v", snapshot.RecommendedParallelWindows)
} }
if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 { if snapshot.TrafficClassCounts[FabricTrafficClassBulk] != 16 || snapshot.TrafficClassCounts[FabricTrafficClassInteractive] != 1 {
+2 -2
View File
@@ -30,8 +30,8 @@ android {
applicationId "su.cin.rapvpn" applicationId "su.cin.rapvpn"
minSdk 26 minSdk 26
targetSdk 35 targetSdk 35
versionCode 209 versionCode 210
versionName "0.2.209" versionName "0.2.210"
buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\"" buildConfigField "String", "DEFAULT_BACKEND_URL", "\"${normalizeGradleString(defaultBackendUrl)}\""
buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\"" buildConfigField "String", "DEFAULT_CLUSTER_ID", "\"${normalizeGradleString(defaultClusterId)}\""
buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\"" buildConfigField "String", "DEFAULT_ORGANIZATION_ID", "\"${normalizeGradleString(defaultOrganizationId)}\""
@@ -72,7 +72,7 @@ public class RapVpnService extends VpnService {
private static final int DOWNLINK_POLL_MS_MIN = 2; private static final int DOWNLINK_POLL_MS_MIN = 2;
private static final int DOWNLINK_POLL_MS_MAX = 40; private static final int DOWNLINK_POLL_MS_MAX = 40;
private static final int DOWNLINK_POLL_MS_STEP = 4; private static final int DOWNLINK_POLL_MS_STEP = 4;
private static final int UPLINK_BATCH_GATHER_MS = 2; private static final int UPLINK_BATCH_GATHER_MS = 12;
private static final int TUN_WRITE_MAX_RETRIES = 1000; private static final int TUN_WRITE_MAX_RETRIES = 1000;
private static final int TUN_EAGAIN_SLEEP_MS = 1; private static final int TUN_EAGAIN_SLEEP_MS = 1;
private static final int RUNTIME_DETAIL_INTERVAL_MS = 250; private static final int RUNTIME_DETAIL_INTERVAL_MS = 250;