Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скороть 10 МБит
2026-05-22 21:46:49 +03:00

536 lines
17 KiB
Go

package mesh
import (
"context"
"encoding/json"
"testing"
"time"
)
func TestPeerConnectionManagerProbesDirectAndDefersRendezvous(t *testing.T) {
now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
current := now
tlsConfig := testQUICTLSConfig(t)
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: tlsConfig,
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
certSHA256 := testQUICCertSHA256(t, tlsConfig)
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-direct",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://" + server.Addr().String(),
Reachability: "private",
ConnectivityMode: "direct",
PolicyTags: []string{"corp-lan", "same-site"},
Priority: 1,
Metadata: peerConnectionProbeMetadata(t, certSHA256),
},
},
"node-c": {
{
EndpointID: "node-c-relay",
NodeID: "node-c",
Transport: "relay_quic",
Address: "relay://fabric/node-c",
Reachability: "relay",
ConnectivityMode: "relay_required",
Priority: 1,
},
},
},
WarmPeerLimit: 2,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
QUICTransport: NewQUICFabricTransport(nil),
ProbeTimeout: time.Second,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 1 || cycle.Succeeded != 1 || cycle.Deferred != 1 || cycle.RendezvousRequiredCount != 1 {
t.Fatalf("unexpected cycle: %+v", cycle)
}
snapshot := tracker.Snapshot()
if snapshot.Ready != 1 || snapshot.Waiting != 1 {
t.Fatalf("unexpected tracker snapshot: %+v", snapshot)
}
if cycle.Results[0].NodeID != "node-b" || cycle.Results[0].LinkStatus != PeerConnectionProbeReachable {
t.Fatalf("direct peer was not probed first: %+v", cycle.Results)
}
if cycle.Results[1].NodeID != "node-c" || cycle.Results[1].LinkStatus != PeerConnectionProbeDeferred {
t.Fatalf("relay peer was not deferred: %+v", cycle.Results)
}
}
func TestPeerConnectionManagerRecordsFailureAndSuppressesActiveBackoff(t *testing.T) {
now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
current := now
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpoints: map[string]string{
"node-b": "quic://127.0.0.1:1",
},
WarmPeerLimit: 1,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
ProbeTimeout: 20 * time.Millisecond,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
for i := 0; i < 3; i++ {
manager.ProbeOnce(context.Background())
}
backoff := tracker.Snapshot()
if backoff.Backoff != 1 {
t.Fatalf("expected backoff after repeated failures: %+v", backoff)
}
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 0 || len(cycle.Results) != 0 {
t.Fatalf("active backoff peer should not be attempted: %+v", cycle)
}
}
func TestPeerConnectionManagerProbesRelayQUICLease(t *testing.T) {
now := time.Date(2026, 4, 28, 12, 0, 0, 0, time.UTC)
current := now
tlsConfig := testQUICTLSConfig(t)
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: tlsConfig,
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
certSHA256 := testQUICCertSHA256(t, tlsConfig)
leases := []PeerRendezvousLease{
{
LeaseID: "lease-node-b-via-node-r",
PeerNodeID: "node-b",
RelayNodeID: "node-r",
RelayEndpoint: "quic://" + server.Addr().String(),
Transport: "relay_quic",
ConnectivityMode: "relay_required",
Priority: 10,
ControlPlaneOnly: true,
IssuedAt: now.Add(-time.Minute),
ExpiresAt: now.Add(time.Minute),
Metadata: peerConnectionProbeMetadata(t, certSHA256),
},
}
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-relay",
NodeID: "node-b",
Transport: "relay_quic",
Address: "relay://fabric/node-b",
Reachability: "relay",
ConnectivityMode: "relay_required",
Priority: 10,
},
},
},
RendezvousLeases: leases,
WarmPeerLimit: 1,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
RendezvousLeases: leases,
QUICTransport: NewQUICFabricTransport(nil),
ProbeTimeout: time.Second,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 1 ||
cycle.Succeeded != 1 ||
cycle.Deferred != 0 ||
cycle.RelayQUICCount != 1 ||
cycle.RendezvousResolvedCount != 1 ||
cycle.RendezvousRequiredCount != 0 {
t.Fatalf("unexpected relay-control cycle: %+v", cycle)
}
if len(cycle.Results) != 1 ||
cycle.Results[0].NodeID != "node-b" ||
cycle.Results[0].RelayNodeID != "node-r" ||
cycle.Results[0].ConnectionState.State != PeerConnectionRelayReady {
t.Fatalf("unexpected relay-control result: %+v", cycle.Results)
}
snapshot := tracker.Snapshot()
if snapshot.RelayReady != 1 || snapshot.Waiting != 0 {
t.Fatalf("unexpected tracker snapshot: %+v", snapshot)
}
}
func TestPeerConnectionProbeTargetKeepsPeerForLocalRelayReverseQUIC(t *testing.T) {
intent := PeerConnectionIntent{
NodeID: "node-b",
RelayCandidate: true,
RelayNodeID: "node-a",
Transport: "reverse_quic",
}
if got := peerConnectionProbeTargetNodeID(intent, "node-a"); got != "node-b" {
t.Fatalf("local relay reverse probe target = %q, want peer node-b", got)
}
intent.RelayNodeID = "node-r"
if got := peerConnectionProbeTargetNodeID(intent, "node-a"); got != "node-r" {
t.Fatalf("remote relay probe target = %q, want relay node-r", got)
}
}
func TestPeerConnectionProbeTargetsFallsBackToBestPeerCertSHA256(t *testing.T) {
intent := PeerConnectionIntent{
NodeID: "node-b",
BestPeerCertSHA256: "intent-cert",
}
cacheEntry := PeerCacheEntry{
NodeID: "node-b",
BestPeerCertSHA256: "cache-cert",
BestCandidateID: "node-b-best",
BestTransport: "direct_quic",
Endpoint: "quic://94.141.118.222:19199",
EndpointCandidates: []PeerEndpointCandidate{
{
EndpointID: "node-b-public",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://94.141.118.222:19199",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 1,
},
},
}
targets := peerConnectionProbeTargets(intent, cacheEntry)
if len(targets) != 1 {
t.Fatalf("target count = %d, want 1", len(targets))
}
for _, target := range targets {
if target.Endpoint != "quic://94.141.118.222:19199" {
continue
}
if target.PeerCertSHA256 != "cache-cert" {
t.Fatalf("peer cert = %q, want cache-cert", target.PeerCertSHA256)
}
}
}
func TestPeerConnectionProbeTargetsUsesRelayLeaseCertForRelayEndpoint(t *testing.T) {
intent := PeerConnectionIntent{
NodeID: "node-b",
BestCandidateID: "lease-node-b-via-node-r",
Endpoint: "quic://195.123.240.88:19131",
Transport: "relay_quic",
BestPeerCertSHA256: "relay-cert",
RelayCandidate: true,
ConnectionState: PeerConnectionBackoff,
}
cacheEntry := PeerCacheEntry{
NodeID: "node-b",
BestPeerCertSHA256: "direct-cert",
EndpointCandidates: []PeerEndpointCandidate{
{
EndpointID: "node-b-private",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://192.168.200.61:19132",
Reachability: "private",
ConnectivityMode: "private_lan",
Priority: 1,
Metadata: peerConnectionProbeMetadata(t, "direct-cert"),
},
},
}
targets := peerConnectionProbeTargets(intent, cacheEntry)
if len(targets) != 2 {
t.Fatalf("target count = %d, want 2", len(targets))
}
for _, target := range targets {
if target.Endpoint != "quic://195.123.240.88:19131" {
continue
}
if target.PeerCertSHA256 != "relay-cert" {
t.Fatalf("relay endpoint cert = %q, want relay-cert", target.PeerCertSHA256)
}
return
}
t.Fatalf("relay endpoint target not found: %+v", targets)
}
func TestPeerConnectionProbeTargetsUpgradeRelayReadyPeerToDirectQUIC(t *testing.T) {
now := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC)
current := now
tlsConfig := testQUICTLSConfig(t)
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: tlsConfig,
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
certSHA256 := testQUICCertSHA256(t, tlsConfig)
leases := []PeerRendezvousLease{{
LeaseID: "lease-node-b-via-node-r",
PeerNodeID: "node-b",
RelayNodeID: "node-r",
RelayEndpoint: "quic://127.0.0.1:1",
Transport: "relay_quic",
ConnectivityMode: "relay_required",
Priority: 10,
ControlPlaneOnly: true,
IssuedAt: now.Add(-time.Minute),
ExpiresAt: now.Add(time.Minute),
}}
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-direct",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://" + server.Addr().String(),
Reachability: "public",
ConnectivityMode: "direct",
Priority: 1,
Metadata: peerConnectionProbeMetadata(t, certSHA256),
},
},
},
RendezvousLeases: leases,
WarmPeerLimit: 1,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
RendezvousLeases: leases,
QUICTransport: NewQUICFabricTransport(nil),
ProbeTimeout: time.Second,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 1 || cycle.Succeeded != 1 || len(cycle.Results) != 1 {
t.Fatalf("unexpected cycle: %+v", cycle)
}
result := cycle.Results[0]
if result.SelectedCandidateID != "node-b-direct" || result.SelectedEndpoint != "quic://"+server.Addr().String() {
t.Fatalf("relay-ready peer did not upgrade to direct candidate: %+v", result)
}
if result.ConnectionState.State != PeerConnectionReady {
t.Fatalf("connection state = %q, want ready", result.ConnectionState.State)
}
if len(result.CandidateResults) == 0 || result.CandidateResults[0].Transport != "direct_quic" || result.CandidateResults[0].LinkStatus != PeerConnectionProbeReachable {
t.Fatalf("candidate trail missing direct probe success: %+v", result.CandidateResults)
}
snapshot := tracker.Snapshot()
if snapshot.Ready != 1 || snapshot.RelayReady != 0 {
t.Fatalf("unexpected tracker snapshot after direct upgrade: %+v", snapshot)
}
}
func TestPeerConnectionManagerFallsBackAcrossEndpointCandidates(t *testing.T) {
now := time.Date(2026, 4, 30, 12, 0, 0, 0, time.UTC)
current := now
tlsConfig := testQUICTLSConfig(t)
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: tlsConfig,
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
certSHA256 := testQUICCertSHA256(t, tlsConfig)
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-dead",
NodeID: "node-b",
Transport: "lan_quic",
Address: "quic://127.0.0.1:1",
Reachability: "private",
ConnectivityMode: "private_lan",
Priority: 1,
},
{
EndpointID: "node-b-live",
NodeID: "node-b",
Transport: "lan_quic",
Address: "quic://" + server.Addr().String(),
Reachability: "private",
ConnectivityMode: "private_lan",
Priority: 2,
Metadata: peerConnectionProbeMetadata(t, certSHA256),
},
},
},
WarmPeerLimit: 1,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
QUICTransport: NewQUICFabricTransport(nil),
ProbeTimeout: 100 * time.Millisecond,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 1 || cycle.Succeeded != 1 || cycle.Failed != 0 || len(cycle.Results) != 1 {
t.Fatalf("unexpected cycle: %+v", cycle)
}
result := cycle.Results[0]
if result.LinkStatus != PeerConnectionProbeReachable || result.SelectedCandidateID != "node-b-live" || result.SelectedEndpoint != "quic://"+server.Addr().String() {
t.Fatalf("fallback did not select live candidate: %+v", result)
}
if len(result.CandidateResults) != 2 ||
result.CandidateResults[0].LinkStatus != PeerConnectionProbeUnreachable ||
result.CandidateResults[1].LinkStatus != PeerConnectionProbeReachable {
t.Fatalf("candidate probe trail mismatch: %+v", result.CandidateResults)
}
snapshot := tracker.Snapshot()
if snapshot.Ready != 1 || len(snapshot.Entries) != 1 || snapshot.Entries[0].BestCandidateID != "node-b-live" || snapshot.Entries[0].Endpoint != "quic://"+server.Addr().String() {
t.Fatalf("tracker did not retain selected candidate: %+v", snapshot)
}
}
func TestPeerConnectionManagerSkipsUnspecifiedQUICCandidates(t *testing.T) {
now := time.Date(2026, 5, 17, 6, 0, 0, 0, time.UTC)
current := now
tlsConfig := testQUICTLSConfig(t)
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
ListenAddr: "127.0.0.1:0",
TLSConfig: tlsConfig,
})
if err != nil {
t.Fatalf("start quic fabric server: %v", err)
}
defer server.Close()
local := PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}
certSHA256 := testQUICCertSHA256(t, tlsConfig)
cache := NewPeerCache(PeerCacheConfig{
Local: local,
PeerEndpointCandidates: map[string][]PeerEndpointCandidate{
"node-b": {
{
EndpointID: "node-b-unspecified-v6",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://[::]:19131",
Reachability: "public",
ConnectivityMode: "direct",
Priority: 1,
},
{
EndpointID: "node-b-live",
NodeID: "node-b",
Transport: "direct_quic",
Address: "quic://" + server.Addr().String(),
Reachability: "public",
ConnectivityMode: "direct",
Priority: 2,
Metadata: peerConnectionProbeMetadata(t, certSHA256),
},
},
},
WarmPeerLimit: 1,
Now: now,
})
tracker := NewPeerConnectionTracker(cache.Snapshot(), now)
manager := NewPeerConnectionManager(PeerConnectionManagerConfig{
Local: local,
PeerCache: cache,
Tracker: tracker,
QUICTransport: NewQUICFabricTransport(nil),
ProbeTimeout: time.Second,
Now: func() time.Time {
current = current.Add(10 * time.Millisecond)
return current
},
})
cycle := manager.ProbeOnce(context.Background())
if cycle.Attempted != 1 || cycle.Succeeded != 1 || len(cycle.Results) != 1 {
t.Fatalf("unexpected cycle: %+v", cycle)
}
result := cycle.Results[0]
if result.SelectedCandidateID != "node-b-live" || result.SelectedEndpoint != "quic://"+server.Addr().String() {
t.Fatalf("manager did not skip unspecified endpoint: %+v", result)
}
if len(result.CandidateResults) != 1 || result.CandidateResults[0].CandidateID != "node-b-live" {
t.Fatalf("unspecified endpoint should not be probed: %+v", result.CandidateResults)
}
}
func peerConnectionProbeMetadata(t *testing.T, certSHA256 string) json.RawMessage {
t.Helper()
payload, err := json.Marshal(map[string]string{"peer_cert_sha256": certSHA256})
if err != nil {
t.Fatalf("marshal probe metadata: %v", err)
}
return payload
}