package mesh

import (
	"context"
	"encoding/json"
	"testing"
	"time"
)

// TestPeerConnectionManagerProbesDirectAndDefersRendezvous checks that one
// ProbeOnce cycle probes node-b's direct_quic candidate against a live local
// QUIC fabric server (reachable), while node-c, whose only candidate is
// relay_required and for which no rendezvous lease is configured, is deferred
// and counted as rendezvous-required.
func TestPeerConnectionManagerProbesDirectAndDefersRendezvous(t *testing.T) {
	now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
	current := now
	tlsConfig := testQUICTLSConfig(t)
	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
		ListenAddr: "127.0.0.1:0",
		TLSConfig:  tlsConfig,
	})
	if err != nil {
		t.Fatalf("start quic fabric server: %v", err)
	}
	defer server.Close()

	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	certSHA256 := testQUICCertSHA256(t, tlsConfig)
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-direct",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://" + server.Addr().String(),
					Reachability:     "private",
					ConnectivityMode: "direct",
					PolicyTags:       []string{"corp-lan", "same-site"},
					Priority:         1,
					Metadata:         peerConnectionProbeMetadata(t, certSHA256),
				},
			},
			"node-c": {
				{
					EndpointID:       "node-c-relay",
					NodeID:           "node-c",
					Transport:        "relay_quic",
					Address:          "relay://fabric/node-c",
					Reachability:     "relay",
					ConnectivityMode: "relay_required",
					Priority:         1,
				},
			},
		},
		WarmPeerLimit: 2,
		Now:           now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:         local,
		PeerCache:     cache,
		Tracker:       tracker,
		QUICTransport: NewQUICFabricTransport(nil),
		ProbeTimeout:  time.Second,
		Now: func() time.Time {
			// Fake clock: every read advances by 10ms instead of using wall time.
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 1 || cycle.Succeeded != 1 || cycle.Deferred != 1 || cycle.RendezvousRequiredCount != 1 {
		t.Fatalf("unexpected cycle: %+v", cycle)
	}
	snapshot := tracker.Snapshot()
	if snapshot.Ready != 1 || snapshot.Waiting != 1 {
		t.Fatalf("unexpected tracker snapshot: %+v", snapshot)
	}
	// Guard the indexed checks below so a short result slice fails the test
	// with a message instead of panicking with index out of range.
	if len(cycle.Results) < 2 {
		t.Fatalf("expected results for node-b and node-c: %+v", cycle.Results)
	}
	if cycle.Results[0].NodeID != "node-b" || cycle.Results[0].LinkStatus != PeerConnectionProbeReachable {
		t.Fatalf("direct peer was not probed first: %+v", cycle.Results)
	}
	if cycle.Results[1].NodeID != "node-c" || cycle.Results[1].LinkStatus != PeerConnectionProbeDeferred {
		t.Fatalf("relay peer was not deferred: %+v", cycle.Results)
	}
}

// TestPeerConnectionManagerRecordsFailureAndSuppressesActiveBackoff probes
// node-b at quic://127.0.0.1:1 (presumed to have nothing listening) three
// times with a 20ms probe timeout, expects the tracker to then report one
// peer in backoff, and expects the next cycle to attempt nothing.
func TestPeerConnectionManagerRecordsFailureAndSuppressesActiveBackoff(t *testing.T) {
	now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
	current := now
	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpoints: map[string]string{
			"node-b": "quic://127.0.0.1:1",
		},
		WarmPeerLimit: 1,
		Now:           now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:        local,
		PeerCache:    cache,
		Tracker:      tracker,
		ProbeTimeout: 20 * time.Millisecond,
		Now: func() time.Time {
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	// Accumulate repeated probe failures; per-cycle results are not inspected here.
	for i := 0; i < 3; i++ {
		manager.ProbeOnce(context.Background())
	}
	backoff := tracker.Snapshot()
	if backoff.Backoff != 1 {
		t.Fatalf("expected backoff after repeated failures: %+v", backoff)
	}
	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 0 || len(cycle.Results) != 0 {
		t.Fatalf("active backoff peer should not be attempted: %+v", cycle)
	}
}

// TestPeerConnectionManagerProbesRelayQUICLease checks that node-b, whose only
// endpoint candidate is relay_required, is probed through a rendezvous lease
// whose RelayEndpoint is a live local QUIC fabric server. It expects the
// lease to resolve, the result to name node-r as relay and be relay-ready,
// and the tracker to hold no waiting peers.
func TestPeerConnectionManagerProbesRelayQUICLease(t *testing.T) {
	now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
	current := now
	tlsConfig := testQUICTLSConfig(t)
	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
		ListenAddr: "127.0.0.1:0",
		TLSConfig:  tlsConfig,
	})
	if err != nil {
		t.Fatalf("start quic fabric server: %v", err)
	}
	defer server.Close()

	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	certSHA256 := testQUICCertSHA256(t, tlsConfig)
	leases := []PeerRendezvousLease{
		{
			LeaseID:          "lease-node-b-via-node-r",
			PeerNodeID:       "node-b",
			RelayNodeID:      "node-r",
			RelayEndpoint:    "quic://" + server.Addr().String(),
			Transport:        "relay_quic",
			ConnectivityMode: "relay_required",
			Priority:         10,
			ControlPlaneOnly: true,
			IssuedAt:         now.Add(-time.Minute),
			ExpiresAt:        now.Add(time.Minute),
			Metadata:         peerConnectionProbeMetadata(t, certSHA256),
		},
	}
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-relay",
					NodeID:           "node-b",
					Transport:        "relay_quic",
					Address:          "relay://fabric/node-b",
					Reachability:     "relay",
					ConnectivityMode: "relay_required",
					Priority:         10,
				},
			},
		},
		RendezvousLeases: leases,
		WarmPeerLimit:    1,
		Now:              now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:            local,
		PeerCache:        cache,
		Tracker:          tracker,
		RendezvousLeases: leases,
		QUICTransport:    NewQUICFabricTransport(nil),
		ProbeTimeout:     time.Second,
		Now: func() time.Time {
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 1 || cycle.Succeeded != 1 || cycle.Deferred != 0 ||
		cycle.RelayQUICCount != 1 || cycle.RendezvousResolvedCount != 1 || cycle.RendezvousRequiredCount != 0 {
		t.Fatalf("unexpected relay-control cycle: %+v", cycle)
	}
	if len(cycle.Results) != 1 || cycle.Results[0].NodeID != "node-b" ||
		cycle.Results[0].RelayNodeID != "node-r" || cycle.Results[0].ConnectionState.State != PeerConnectionRelayReady {
		t.Fatalf("unexpected relay-control result: %+v", cycle.Results)
	}
	snapshot := tracker.Snapshot()
	if snapshot.RelayReady != 1 || snapshot.Waiting != 0 {
		t.Fatalf("unexpected tracker snapshot: %+v", snapshot)
	}
}

// TestPeerConnectionProbeTargetKeepsPeerForLocalRelayReverseQUIC checks the
// target node of a relay-candidate probe. When the relay is the local node
// ("node-a") over reverse_quic, the probe targets the peer itself. When the
// relay is another node ("node-r"), the probe targets that relay.
func TestPeerConnectionProbeTargetKeepsPeerForLocalRelayReverseQUIC(t *testing.T) {
	intent := PeerConnectionIntent{
		NodeID:         "node-b",
		RelayCandidate: true,
		RelayNodeID:    "node-a",
		Transport:      "reverse_quic",
	}
	if got := peerConnectionProbeTargetNodeID(intent, "node-a"); got != "node-b" {
		t.Fatalf("local relay reverse probe target = %q, want peer node-b", got)
	}
	intent.RelayNodeID = "node-r"
	if got := peerConnectionProbeTargetNodeID(intent, "node-a"); got != "node-r" {
		t.Fatalf("remote relay probe target = %q, want relay node-r", got)
	}
}

// TestPeerConnectionProbeTargetsFallsBackToBestPeerCertSHA256 checks that a
// candidate with no peer_cert_sha256 metadata receives the cache entry's
// BestPeerCertSHA256 ("cache-cert") rather than the intent's ("intent-cert").
func TestPeerConnectionProbeTargetsFallsBackToBestPeerCertSHA256(t *testing.T) {
	intent := PeerConnectionIntent{
		NodeID:             "node-b",
		BestPeerCertSHA256: "intent-cert",
	}
	cacheEntry := PeerCacheEntry{
		NodeID:             "node-b",
		BestPeerCertSHA256: "cache-cert",
		BestCandidateID:    "node-b-best",
		BestTransport:      "direct_quic",
		Endpoint:           "quic://94.141.118.222:19199",
		EndpointCandidates: []PeerEndpointCandidate{
			{
				EndpointID:       "node-b-public",
				NodeID:           "node-b",
				Transport:        "direct_quic",
				Address:          "quic://94.141.118.222:19199",
				Reachability:     "public",
				ConnectivityMode: "direct",
				Priority:         1,
			},
		},
	}

	targets := peerConnectionProbeTargets(intent, cacheEntry)
	if len(targets) != 1 {
		t.Fatalf("target count = %d, want 1", len(targets))
	}
	for _, target := range targets {
		if target.Endpoint != "quic://94.141.118.222:19199" {
			continue
		}
		if target.PeerCertSHA256 != "cache-cert" {
			t.Fatalf("peer cert = %q, want cache-cert", target.PeerCertSHA256)
		}
		return
	}
	// Without this, a target list missing the expected endpoint would pass silently.
	t.Fatalf("public endpoint target not found: %+v", targets)
}

// TestPeerConnectionProbeTargetsUsesRelayLeaseCertForRelayEndpoint checks a
// relay-candidate intent in backoff that carries a relay_quic endpoint and
// "relay-cert", together with a cache entry that has a private direct
// candidate pinned to "direct-cert". It expects two probe targets, and the
// relay endpoint target must use the intent's "relay-cert".
func TestPeerConnectionProbeTargetsUsesRelayLeaseCertForRelayEndpoint(t *testing.T) {
	intent := PeerConnectionIntent{
		NodeID:             "node-b",
		BestCandidateID:    "lease-node-b-via-node-r",
		Endpoint:           "quic://195.123.240.88:19131",
		Transport:          "relay_quic",
		BestPeerCertSHA256: "relay-cert",
		RelayCandidate:     true,
		ConnectionState:    PeerConnectionBackoff,
	}
	cacheEntry := PeerCacheEntry{
		NodeID:             "node-b",
		BestPeerCertSHA256: "direct-cert",
		EndpointCandidates: []PeerEndpointCandidate{
			{
				EndpointID:       "node-b-private",
				NodeID:           "node-b",
				Transport:        "direct_quic",
				Address:          "quic://192.168.200.61:19132",
				Reachability:     "private",
				ConnectivityMode: "private_lan",
				Priority:         1,
				Metadata:         peerConnectionProbeMetadata(t, "direct-cert"),
			},
		},
	}

	targets := peerConnectionProbeTargets(intent, cacheEntry)
	if len(targets) != 2 {
		t.Fatalf("target count = %d, want 2", len(targets))
	}
	for _, target := range targets {
		if target.Endpoint != "quic://195.123.240.88:19131" {
			continue
		}
		if target.PeerCertSHA256 != "relay-cert" {
			t.Fatalf("relay endpoint cert = %q, want relay-cert", target.PeerCertSHA256)
		}
		return
	}
	t.Fatalf("relay endpoint target not found: %+v", targets)
}

// TestPeerConnectionProbeTargetsUpgradeRelayReadyPeerToDirectQUIC configures
// node-b with a rendezvous lease whose relay endpoint is quic://127.0.0.1:1
// (presumed unreachable) and a public direct_quic candidate that points at a
// live local QUIC fabric server. It expects the direct candidate to be
// selected with a ready (not relay-ready) connection state, and expects the
// first candidate result to be a reachable direct_quic probe.
func TestPeerConnectionProbeTargetsUpgradeRelayReadyPeerToDirectQUIC(t *testing.T) {
	now := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC)
	current := now
	tlsConfig := testQUICTLSConfig(t)
	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
		ListenAddr: "127.0.0.1:0",
		TLSConfig:  tlsConfig,
	})
	if err != nil {
		t.Fatalf("start quic fabric server: %v", err)
	}
	defer server.Close()

	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	certSHA256 := testQUICCertSHA256(t, tlsConfig)
	leases := []PeerRendezvousLease{{
		LeaseID:          "lease-node-b-via-node-r",
		PeerNodeID:       "node-b",
		RelayNodeID:      "node-r",
		RelayEndpoint:    "quic://127.0.0.1:1",
		Transport:        "relay_quic",
		ConnectivityMode: "relay_required",
		Priority:         10,
		ControlPlaneOnly: true,
		IssuedAt:         now.Add(-time.Minute),
		ExpiresAt:        now.Add(time.Minute),
	}}
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-direct",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://" + server.Addr().String(),
					Reachability:     "public",
					ConnectivityMode: "direct",
					Priority:         1,
					Metadata:         peerConnectionProbeMetadata(t, certSHA256),
				},
			},
		},
		RendezvousLeases: leases,
		WarmPeerLimit:    1,
		Now:              now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:            local,
		PeerCache:        cache,
		Tracker:          tracker,
		RendezvousLeases: leases,
		QUICTransport:    NewQUICFabricTransport(nil),
		ProbeTimeout:     time.Second,
		Now: func() time.Time {
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 1 || cycle.Succeeded != 1 || len(cycle.Results) != 1 {
		t.Fatalf("unexpected cycle: %+v", cycle)
	}
	result := cycle.Results[0]
	if result.SelectedCandidateID != "node-b-direct" || result.SelectedEndpoint != "quic://"+server.Addr().String() {
		t.Fatalf("relay-ready peer did not upgrade to direct candidate: %+v", result)
	}
	if result.ConnectionState.State != PeerConnectionReady {
		t.Fatalf("connection state = %q, want ready", result.ConnectionState.State)
	}
	if len(result.CandidateResults) == 0 || result.CandidateResults[0].Transport != "direct_quic" ||
		result.CandidateResults[0].LinkStatus != PeerConnectionProbeReachable {
		t.Fatalf("candidate trail missing direct probe success: %+v", result.CandidateResults)
	}
	snapshot := tracker.Snapshot()
	if snapshot.Ready != 1 || snapshot.RelayReady != 0 {
		t.Fatalf("unexpected tracker snapshot after direct upgrade: %+v", snapshot)
	}
}

// TestPeerConnectionManagerFallsBackAcrossEndpointCandidates gives node-b two
// lan_quic candidates. Priority 1 points at quic://127.0.0.1:1 (presumed
// dead); priority 2 points at a live local QUIC fabric server. With a 100ms
// probe timeout, the test expects the first probe to be unreachable, the
// second to be reachable and selected, and the tracker entry to keep the
// selected candidate and endpoint.
func TestPeerConnectionManagerFallsBackAcrossEndpointCandidates(t *testing.T) {
	now := time.Date(2026, 4, 30, 12, 0, 0, 0, time.UTC)
	current := now
	tlsConfig := testQUICTLSConfig(t)
	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
		ListenAddr: "127.0.0.1:0",
		TLSConfig:  tlsConfig,
	})
	if err != nil {
		t.Fatalf("start quic fabric server: %v", err)
	}
	defer server.Close()

	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	certSHA256 := testQUICCertSHA256(t, tlsConfig)
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-dead",
					NodeID:           "node-b",
					Transport:        "lan_quic",
					Address:          "quic://127.0.0.1:1",
					Reachability:     "private",
					ConnectivityMode: "private_lan",
					Priority:         1,
				},
				{
					EndpointID:       "node-b-live",
					NodeID:           "node-b",
					Transport:        "lan_quic",
					Address:          "quic://" + server.Addr().String(),
					Reachability:     "private",
					ConnectivityMode: "private_lan",
					Priority:         2,
					Metadata:         peerConnectionProbeMetadata(t, certSHA256),
				},
			},
		},
		WarmPeerLimit: 1,
		Now:           now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:         local,
		PeerCache:     cache,
		Tracker:       tracker,
		QUICTransport: NewQUICFabricTransport(nil),
		ProbeTimeout:  100 * time.Millisecond,
		Now: func() time.Time {
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 1 || cycle.Succeeded != 1 || cycle.Failed != 0 || len(cycle.Results) != 1 {
		t.Fatalf("unexpected cycle: %+v", cycle)
	}
	result := cycle.Results[0]
	if result.LinkStatus != PeerConnectionProbeReachable || result.SelectedCandidateID != "node-b-live" ||
		result.SelectedEndpoint != "quic://"+server.Addr().String() {
		t.Fatalf("fallback did not select live candidate: %+v", result)
	}
	if len(result.CandidateResults) != 2 || result.CandidateResults[0].LinkStatus != PeerConnectionProbeUnreachable ||
		result.CandidateResults[1].LinkStatus != PeerConnectionProbeReachable {
		t.Fatalf("candidate probe trail mismatch: %+v", result.CandidateResults)
	}
	snapshot := tracker.Snapshot()
	// The len check short-circuits before Entries[0] is indexed.
	if snapshot.Ready != 1 || len(snapshot.Entries) != 1 || snapshot.Entries[0].BestCandidateID != "node-b-live" ||
		snapshot.Entries[0].Endpoint != "quic://"+server.Addr().String() {
		t.Fatalf("tracker did not retain selected candidate: %+v", snapshot)
	}
}

// TestPeerConnectionManagerSkipsUnspecifiedQUICCandidates checks that a
// candidate addressed at the unspecified IPv6 address (quic://[::]:19131) is
// skipped without being probed. The lower-priority live candidate must be
// selected and must be the only entry in the candidate trail.
func TestPeerConnectionManagerSkipsUnspecifiedQUICCandidates(t *testing.T) {
	now := time.Date(2026, 5, 17, 6, 0, 0, 0, time.UTC)
	current := now
	tlsConfig := testQUICTLSConfig(t)
	server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
		ListenAddr: "127.0.0.1:0",
		TLSConfig:  tlsConfig,
	})
	if err != nil {
		t.Fatalf("start quic fabric server: %v", err)
	}
	defer server.Close()

	local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
	certSHA256 := testQUICCertSHA256(t, tlsConfig)
	cache := NewPeerCache(PeerCacheConfig{
		Local: local,
		PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
			"node-b": {
				{
					EndpointID:       "node-b-unspecified-v6",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://[::]:19131",
					Reachability:     "public",
					ConnectivityMode: "direct",
					Priority:         1,
				},
				{
					EndpointID:       "node-b-live",
					NodeID:           "node-b",
					Transport:        "direct_quic",
					Address:          "quic://" + server.Addr().String(),
					Reachability:     "public",
					ConnectivityMode: "direct",
					Priority:         2,
					Metadata:         peerConnectionProbeMetadata(t, certSHA256),
				},
			},
		},
		WarmPeerLimit: 1,
		Now:           now,
	})
	tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
	manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
		Local:         local,
		PeerCache:     cache,
		Tracker:       tracker,
		QUICTransport: NewQUICFabricTransport(nil),
		ProbeTimeout:  time.Second,
		Now: func() time.Time {
			current = current.Add(10 * time.Millisecond)
			return current
		},
	})

	cycle := manager.ProbeOnce(context.Background())
	if cycle.Attempted != 1 || cycle.Succeeded != 1 || len(cycle.Results) != 1 {
		t.Fatalf("unexpected cycle: %+v", cycle)
	}
	result := cycle.Results[0]
	if result.SelectedCandidateID != "node-b-live" || result.SelectedEndpoint != "quic://"+server.Addr().String() {
		t.Fatalf("manager did not skip unspecified endpoint: %+v", result)
	}
	if len(result.CandidateResults) != 1 || result.CandidateResults[0].CandidateID != "node-b-live" {
		t.Fatalf("unspecified endpoint should not be probed: %+v", result.CandidateResults)
	}
}

// peerConnectionProbeMetadata returns endpoint metadata encoded as the JSON
// object {"peer_cert_sha256": certSHA256}. It fails the test if encoding
// fails.
func peerConnectionProbeMetadata(t *testing.T, certSHA256 string) json.RawMessage {
	t.Helper()
	payload, err := json.Marshal(map[string]string{"peer_cert_sha256": certSHA256})
	if err != nil {
		t.Fatalf("marshal probe metadata: %v", err)
	}
	return payload
}