package cluster

import (
	"context"
	"testing"
	"time"
)

// TestVPNPacketHubPopBatchAndStatsKeys pushes two packets, drains them with a
// single PopBatch call, and then verifies that the per-direction snapshot
// exposes every expected stat key and that the pushed/popped counters and the
// queue depth high watermark reflect the two packets.
func TestVPNPacketHubPopBatchAndStatsKeys(t *testing.T) {
	hub := newVPNPacketHub()
	key := vpnPacketKey{
		ClusterID:       "cluster-1",
		VPNConnectionID: "vpn-1",
		Direction:       vpnDirectionClientToGateway,
	}

	// Bytes are laid out like a minimal IPv4 header (protocol 17) followed by
	// two 16-bit port values; how the hub interprets them is not visible here.
	packetA := []byte{
		0x45, 0x00, 0x00, 20,
		0x00, 0x01, 0x00, 0x00,
		64, 17, 0, 0,
		192, 168, 0, 1,
		192, 168, 0, 2,
		0x00, 0x50, 0x01, 0xBB,
	}
	// packetB is identical to packetA except for byte 19.
	packetB := make([]byte, len(packetA))
	copy(packetB, packetA)
	packetB[19] = 0xBA

	// Push returns an error; a failed push must fail the test here rather than
	// surface later as a confusing batch-size mismatch.
	for i, packet := range [][]byte{packetA, packetB} {
		if err := hub.Push(key, packet); err != nil {
			t.Fatalf("push packet %d: %v", i, err)
		}
	}

	packets := hub.PopBatch(context.Background(), key, 0, vpnPacketBatchMaxPackets, vpnPacketBatchMaxBytes)
	if len(packets) != 2 {
		t.Fatalf("expected 2 packets in batch, got %d", len(packets))
	}

	statsAny := hub.Snapshot("cluster-1", "vpn-1")[vpnDirectionClientToGateway]
	stats, ok := statsAny.(map[string]any)
	if !ok {
		t.Fatalf("unexpected stats payload type: %T", statsAny)
	}

	expectedKeys := []string{
		"pushed",
		"pushed_bytes",
		"popped",
		"popped_bytes",
		"window_push_rate_pps",
		"window_pop_rate_pps",
		"window_push_rate_mbps",
		"window_pop_rate_mbps",
		"window_push_packets",
		"window_pop_packets",
		"queue_depth",
		"queue_depths",
		"queue_depth_max",
		"queue_depth_high_watermark",
		"queue_depth_high_at",
		"shard_depth_high_watermark",
		"shard_depth_high_at",
		"queue_capacity",
		"queue_shard_capacity",
		"queue_full_drops",
		"requeue_drops",
		"cleared_stale_packets",
		"flow_shard_count",
		"flow_isolation",
	}
	// Errorf (not Fatalf) so a single run reports every missing key.
	for _, keyName := range expectedKeys {
		if _, found := stats[keyName]; !found {
			t.Errorf("missing vpn packet stat key %s", keyName)
		}
	}

	if got, ok := stats["popped"].(uint64); !ok || got != 2 {
		t.Fatalf("expected popped=2, got %v (ok=%v)", stats["popped"], ok)
	}
	if got, ok := stats["pushed"].(uint64); !ok || got != 2 {
		t.Fatalf("expected pushed=2, got %v (ok=%v)", stats["pushed"], ok)
	}
	if got, ok := stats["queue_depth_high_watermark"].(int); !ok || got < 1 {
		t.Fatalf("expected queue depth high watermark, got %v (ok=%v)", stats["queue_depth_high_watermark"], ok)
	}
}

// TestVPNPacketHubGatherBehavior queues three copies of one packet, takes one
// with Pop, and then checks that PopBatch called with a packet limit of 1
// returns exactly one packet.
func TestVPNPacketHubGatherBehavior(t *testing.T) {
	hub := newVPNPacketHub()
	key := vpnPacketKey{
		ClusterID:       "cluster-1",
		VPNConnectionID: "vpn-1",
		Direction:       vpnDirectionGatewayToClient,
	}
	packet := []byte{
		0x45, 0x00, 0x00, 20,
		0x00, 0x01, 0x00, 0x00,
		64, 6, 0, 0,
		10, 0, 0, 1,
		10, 0, 0, 2,
		0x12, 0x34, 0x56, 0x78,
	}

	for i := 0; i < 3; i++ {
		if err := hub.Push(key, packet); err != nil {
			t.Fatalf("push packet %d: %v", i, err)
		}
	}

	// Only the success flag matters; the popped packet itself is not inspected.
	if _, ok := hub.Pop(context.Background(), key, 0); !ok {
		t.Fatal("expected packet from queue")
	}

	batch := hub.PopBatch(context.Background(), key, 0, 1, 1024)
	if len(batch) != 1 {
		t.Fatalf("expected 1 packet because batch limit 1, got %d", len(batch))
	}
}

// TestVPNPacketHubFlowShardsReportDepths pushes eight packets whose address
// and port bytes all differ, then checks that the snapshot reports a total
// queue depth of 8, one depth entry per flow shard, and at least two
// non-empty shards.
func TestVPNPacketHubFlowShardsReportDepths(t *testing.T) {
	hub := newVPNPacketHub()
	key := vpnPacketKey{
		ClusterID:       "cluster-1",
		VPNConnectionID: "vpn-1",
		Direction:       vpnDirectionGatewayToClient,
	}

	for i := byte(1); i <= 8; i++ {
		// i is mixed into several header and trailing bytes so each packet
		// differs from the others.
		packet := []byte{
			0x45, 0x00, 0x00, 24,
			0x00, i, 0x00, 0x00,
			64, 6, 0, 0,
			10, 0, 0, i,
			192, 168, 200, i,
			0x12, i, 0x56, i,
		}
		if err := hub.Push(key, packet); err != nil {
			t.Fatalf("push packet %d: %v", i, err)
		}
	}

	statsAny := hub.Snapshot("cluster-1", "vpn-1")[vpnDirectionGatewayToClient]
	stats, ok := statsAny.(map[string]any)
	if !ok {
		t.Fatalf("unexpected stats payload type: %T", statsAny)
	}

	if got, ok := stats["queue_depth"].(int); !ok || got != 8 {
		t.Fatalf("expected queue_depth=8, got %v (ok=%v)", stats["queue_depth"], ok)
	}

	depths, ok := stats["queue_depths"].([]int)
	if !ok {
		t.Fatalf("unexpected queue_depths payload type: %T", stats["queue_depths"])
	}
	if len(depths) != vpnPacketFlowShardCount {
		t.Fatalf("expected %d queue shards, got %d", vpnPacketFlowShardCount, len(depths))
	}

	nonEmpty := 0
	for _, depth := range depths {
		if depth > 0 {
			nonEmpty++
		}
	}
	if nonEmpty < 2 {
		t.Fatalf("expected packets to be distributed across at least 2 shards, got depths=%v", depths)
	}
}

// TestVPNPacketHubClearDoesNotCountAsDrop pushes one packet, clears the queue,
// and checks that the cleared packet is counted under "cleared_stale_packets"
// while "dropped" stays at zero.
func TestVPNPacketHubClearDoesNotCountAsDrop(t *testing.T) {
	hub := newVPNPacketHub()
	key := vpnPacketKey{
		ClusterID:       "cluster-1",
		VPNConnectionID: "vpn-1",
		Direction:       vpnDirectionClientToGateway,
	}
	packet := []byte{
		0x45, 0x00, 0x00, 20,
		0x00, 0x01, 0x00, 0x00,
		64, 6, 0, 0,
		10, 0, 0, 1,
		10, 0, 0, 2,
		0x12, 0x34, 0x56, 0x78,
	}

	if err := hub.Push(key, packet); err != nil {
		t.Fatalf("push packet: %v", err)
	}
	if cleared := hub.Clear(key); cleared != 1 {
		t.Fatalf("expected cleared=1, got %d", cleared)
	}

	statsAny := hub.Snapshot("cluster-1", "vpn-1")[vpnDirectionClientToGateway]
	stats, ok := statsAny.(map[string]any)
	if !ok {
		t.Fatalf("unexpected stats payload type: %T", statsAny)
	}

	if got, ok := stats["dropped"].(uint64); !ok || got != 0 {
		t.Fatalf("expected dropped=0 for stale clear, got %v (ok=%v)", stats["dropped"], ok)
	}
	if got, ok := stats["cleared_stale_packets"].(uint64); !ok || got != 1 {
		t.Fatalf("expected cleared_stale_packets=1, got %v (ok=%v)", stats["cleared_stale_packets"], ok)
	}
}

// TestVPNClientDiagnosticStopCommandDrainsPendingWork enqueues two diagnostic
// commands followed by a "stop_vpn" command for the same device, and checks
// that the first Pop returns the stop command and a second Pop finds nothing
// left.
func TestVPNClientDiagnosticStopCommandDrainsPendingWork(t *testing.T) {
	hub := newVPNClientDiagnosticHub()
	hub.Enqueue("cluster-1", "device-1", map[string]any{"type": "vpn_page_probe", "url": "https://speedtest.rt.ru/"})
	hub.Enqueue("cluster-1", "device-1", map[string]any{"type": "vpn_tcp_connect", "host": "192.168.200.95"})
	hub.Enqueue("cluster-1", "device-1", map[string]any{"type": "stop_vpn"})

	item, ok := hub.Pop(context.Background(), "cluster-1", "device-1", time.Millisecond)
	if !ok {
		t.Fatal("expected priority stop command")
	}
	if got, _ := item.Payload["type"].(string); got != "stop_vpn" {
		t.Fatalf("first command = %q, want stop_vpn", got)
	}

	if leftover, ok := hub.Pop(context.Background(), "cluster-1", "device-1", 0); ok {
		t.Fatalf("expected old commands to be drained, got %#v", leftover.Payload)
	}
}