Files
2026-04-28 22:29:50 +03:00

433 lines
17 KiB
Go

package mesh
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
)
// syntheticTestTransport is an in-memory transport used by the tests in this
// file. Its SendSynthetic method hands an envelope directly to the runtime
// registered under the target node ID.
type syntheticTestTransport struct {
// nodes maps a node ID to the runtime that receives envelopes sent to it.
nodes map[string]*SyntheticRuntime
}
// SendSynthetic delivers envelope to the runtime registered for nextNodeID by
// calling its Receive method and returns whatever Receive returns. When no
// runtime is registered under that ID it returns an empty envelope together
// with ErrSyntheticPeerUnavailable.
func (t syntheticTestTransport) SendSynthetic(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
	if target := t.nodes[nextNodeID]; target != nil {
		return target.Receive(ctx, envelope)
	}
	return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable
}
// TestSyntheticRuntimeDirectProbe sends a probe from node-a over the two-hop
// route node-a -> node-b and checks the returned ack: its message type, its
// From/To node IDs, the path carried in its payload, and that node-b counted
// exactly one created probe ack.
func TestSyntheticRuntimeDirectProbe(t *testing.T) {
	route := testRoute("route-direct", []string{"node-a", "node-b"})
	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	sender := testRuntime("node-a", transport, route)
	receiver := testRuntime("node-b", transport, route)
	// The runtimes were handed a transport that refers to this same map, so
	// registering them afterwards is enough.
	registry["node-a"] = sender
	registry["node-b"] = receiver

	ack, err := sender.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-direct")
	if err != nil {
		t.Fatalf("send probe: %v", err)
	}
	if ack.MessageType != SyntheticMessageProbeAck {
		t.Fatalf("MessageType = %q, want %q", ack.MessageType, SyntheticMessageProbeAck)
	}

	// The ack travels back, so it must originate at node-b and target node-a.
	peersOK := ack.From.NodeID == "node-b" && ack.To.NodeID == "node-a"
	if !peersOK {
		t.Fatalf("unexpected ack peers: from=%+v to=%+v", ack.From, ack.To)
	}

	path := decodeAckPayload(t, ack).Path
	pathOK := len(path) == 2 && path[0] == "node-a" && path[1] == "node-b"
	if !pathOK {
		t.Fatalf("Path = %#v, want node-a -> node-b", path)
	}

	if created := receiver.SnapshotMetrics().ProbeAcksCreated; created != 1 {
		t.Fatalf("ProbeAcksCreated = %d, want 1", created)
	}
}
// TestSyntheticRuntimeSingleRelayProbe sends a probe from node-a over the
// three-hop route node-a -> node-r -> node-b and checks that the ack payload
// records all three hops in order and that node-r counted one forwarded probe.
func TestSyntheticRuntimeSingleRelayProbe(t *testing.T) {
	route := testRoute("route-relay", []string{"node-a", "node-r", "node-b"})
	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	origin := testRuntime("node-a", transport, route)
	relay := testRuntime("node-r", transport, route)
	target := testRuntime("node-b", transport, route)
	registry["node-a"] = origin
	registry["node-r"] = relay
	registry["node-b"] = target

	ack, err := origin.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-relay")
	if err != nil {
		t.Fatalf("send probe: %v", err)
	}

	path := decodeAckPayload(t, ack).Path
	pathOK := len(path) == 3 && path[0] == "node-a" && path[1] == "node-r" && path[2] == "node-b"
	if !pathOK {
		t.Fatalf("Path = %#v, want node-a -> node-r -> node-b", path)
	}

	if forwarded := relay.SnapshotMetrics().ProbesForwarded; forwarded != 1 {
		t.Fatalf("ProbesForwarded = %d, want 1", forwarded)
	}
}
// TestSyntheticRuntimeDisabledRejectsProbe builds a runtime with Enabled set
// to false and checks that SendProbe fails with ErrMeshRuntimeDisabled.
func TestSyntheticRuntimeDisabledRejectsProbe(t *testing.T) {
	route := testRoute("route-disabled", []string{"node-a", "node-b"})
	cfg := SyntheticRuntimeConfig{
		Enabled: false,
		Local:   PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
		Routes:  []SyntheticRoute{route},
	}
	disabled := NewSyntheticRuntime(cfg)

	if _, err := disabled.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-disabled"); !errors.Is(err, ErrMeshRuntimeDisabled) {
		t.Fatalf("err = %v, want ErrMeshRuntimeDisabled", err)
	}
}
// TestSyntheticRuntimeRejectsWrongCluster hands node-b an envelope whose
// ClusterID has been overwritten with "cluster-2" and checks that Receive
// fails with ErrClusterMismatch.
func TestSyntheticRuntimeRejectsWrongCluster(t *testing.T) {
	route := testRoute("route-wrong-cluster", []string{"node-a", "node-b"})
	receiver := testRuntime("node-b", syntheticTestTransport{}, route)

	foreign := testEnvelope(route, "node-a", "node-b")
	foreign.ClusterID = "cluster-2"

	if _, err := receiver.Receive(context.Background(), foreign); !errors.Is(err, ErrClusterMismatch) {
		t.Fatalf("err = %v, want ErrClusterMismatch", err)
	}
}
// TestSyntheticRuntimeRejectsWrongNode hands node-b an envelope addressed to
// node-c and checks that Receive fails with ErrNodeMismatch.
func TestSyntheticRuntimeRejectsWrongNode(t *testing.T) {
	route := testRoute("route-wrong-node", []string{"node-a", "node-b"})
	receiver := testRuntime("node-b", syntheticTestTransport{}, route)
	misaddressed := testEnvelope(route, "node-a", "node-c")

	if _, err := receiver.Receive(context.Background(), misaddressed); !errors.Is(err, ErrNodeMismatch) {
		t.Fatalf("err = %v, want ErrNodeMismatch", err)
	}
}
// TestSyntheticRuntimeRejectsUnauthorizedChannel sends a probe on the
// "rdp_render" channel, which is not in the AllowedChannels that testRoute
// sets, and checks that SendProbe fails with ErrUnauthorizedChannel.
func TestSyntheticRuntimeRejectsUnauthorizedChannel(t *testing.T) {
	// Untyped constant so it is passed exactly like the literal would be.
	const disallowedChannel = "rdp_render"

	route := testRoute("route-unauthorized", []string{"node-a", "node-b"})
	sender := testRuntime("node-a", syntheticTestTransport{}, route)

	if _, err := sender.SendProbe(context.Background(), route.RouteID, disallowedChannel, "probe-unauthorized"); !errors.Is(err, ErrUnauthorizedChannel) {
		t.Fatalf("err = %v, want ErrUnauthorizedChannel", err)
	}
}
// TestSyntheticRuntimeRejectsExpiredRoute sets the route's ExpiresAt one
// minute in the past and checks that SendProbe fails with ErrRouteExpired.
func TestSyntheticRuntimeRejectsExpiredRoute(t *testing.T) {
	route := testRoute("route-expired", []string{"node-a", "node-b"})
	// Override the one-hour-ahead expiry that testRoute assigns.
	route.ExpiresAt = time.Now().UTC().Add(-time.Minute)
	sender := testRuntime("node-a", syntheticTestTransport{}, route)

	if _, err := sender.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-expired"); !errors.Is(err, ErrRouteExpired) {
		t.Fatalf("err = %v, want ErrRouteExpired", err)
	}
}
// TestSyntheticRuntimeRejectsTTLExhaustion lowers the route's MaxTTL to 1 on a
// three-hop route and checks that a probe from node-a fails with
// ErrTTLExhausted. Only node-a and node-r are registered with the transport.
func TestSyntheticRuntimeRejectsTTLExhaustion(t *testing.T) {
	route := testRoute("route-ttl", []string{"node-a", "node-r", "node-b"})
	route.MaxTTL = 1

	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	origin := testRuntime("node-a", transport, route)
	relay := testRuntime("node-r", transport, route)
	registry["node-a"] = origin
	registry["node-r"] = relay

	if _, err := origin.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-ttl"); !errors.Is(err, ErrTTLExhausted) {
		t.Fatalf("err = %v, want ErrTTLExhausted", err)
	}
}
// TestSyntheticRuntimeRejectsLoop hands node-b an envelope whose Visited list
// already contains node-b and checks that Receive fails with ErrLoopDetected.
func TestSyntheticRuntimeRejectsLoop(t *testing.T) {
	route := testRoute("route-loop", []string{"node-a", "node-b"})
	receiver := testRuntime("node-b", syntheticTestTransport{}, route)

	looped := testEnvelope(route, "node-a", "node-b")
	looped.Visited = []string{"node-a", "node-b"}

	if _, err := receiver.Receive(context.Background(), looped); !errors.Is(err, ErrLoopDetected) {
		t.Fatalf("err = %v, want ErrLoopDetected", err)
	}
}
// TestSyntheticRuntimeRejectsUnavailablePeer sends a probe through a transport
// with no registered runtimes and checks that SendProbe fails with
// ErrSyntheticPeerUnavailable.
func TestSyntheticRuntimeRejectsUnavailablePeer(t *testing.T) {
	route := testRoute("route-missing-peer", []string{"node-a", "node-b"})
	emptyTransport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}}
	sender := testRuntime("node-a", emptyTransport, route)

	if _, err := sender.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-missing-peer"); !errors.Is(err, ErrSyntheticPeerUnavailable) {
		t.Fatalf("err = %v, want ErrSyntheticPeerUnavailable", err)
	}
}
// TestSyntheticRuntimeRouteHealthProbeRecordsSuccess sends a route health probe
// over a two-hop route and checks the ack message type, that no fallback was
// used, that node-a recorded a healthy observation carrying the route's
// version strings, and that node-a's metrics count one probe sent and one
// successful delivery.
func TestSyntheticRuntimeRouteHealthProbeRecordsSuccess(t *testing.T) {
	route := testRoute("route-health", []string{"node-a", "node-b"})
	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	sender := testRuntime("node-a", transport, route)
	receiver := testRuntime("node-b", transport, route)
	registry["node-a"] = sender
	registry["node-b"] = receiver

	result, err := sender.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-health")
	if err != nil {
		t.Fatalf("send route health probe: %v", err)
	}
	if result.Ack.MessageType != SyntheticMessageRouteHealthAck {
		t.Fatalf("MessageType = %q, want %q", result.Ack.MessageType, SyntheticMessageRouteHealthAck)
	}
	if result.FallbackUsed {
		t.Fatal("FallbackUsed = true, want false")
	}

	observation, found := sender.SnapshotRouteObservation(route.RouteID)
	if !found {
		t.Fatal("route observation missing")
	}
	healthy := observation.State == SyntheticRouteStateHealthy && observation.SuccessCount == 1
	if !healthy {
		t.Fatalf("observation = %+v, want healthy success", observation)
	}
	// These are the version strings testRoute stamps on every route.
	versionsOK := observation.PolicyVersion == "policy-v1" &&
		observation.PeerDirectoryVersion == "peers-v1" &&
		observation.RouteVersion == "route-v1"
	if !versionsOK {
		t.Fatalf("observation versions = %+v", observation)
	}

	metrics := sender.SnapshotMetrics()
	metricsOK := metrics.RouteHealthProbesSent == 1 && metrics.RouteDeliveriesSucceeded == 1
	if !metricsOK {
		t.Fatalf("metrics = %+v, want health probe success", metrics)
	}
}
// TestSyntheticRuntimeRouteHealthUsesDedicatedRouteConfig configures two
// routes under the same route ID: one in Routes (via node-old) and one in
// RouteHealthRoutes (via node-new, RouteVersion "decision-v1"). It checks that
// a route health probe travels through node-new and records the
// RouteHealthRoutes version, while a regular probe sent afterwards still
// travels through node-old.
func TestSyntheticRuntimeRouteHealthUsesDedicatedRouteConfig(t *testing.T) {
	base := testRoute("route-effective-health", []string{"node-a", "node-old", "node-b"})
	effective := testRoute("route-effective-health", []string{"node-a", "node-new", "node-b"})
	effective.RouteVersion = "decision-v1"

	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	origin := testRuntimeWithRouteHealth("node-a", transport, []SyntheticRoute{base}, []SyntheticRoute{effective})
	oldRelay := testRuntimeWithRouteHealth("node-old", transport, []SyntheticRoute{base}, []SyntheticRoute{effective})
	newRelay := testRuntimeWithRouteHealth("node-new", transport, []SyntheticRoute{base}, []SyntheticRoute{effective})
	target := testRuntimeWithRouteHealth("node-b", transport, []SyntheticRoute{base}, []SyntheticRoute{effective})
	registry["node-a"] = origin
	registry["node-old"] = oldRelay
	registry["node-new"] = newRelay
	registry["node-b"] = target

	// Phase 1: the route health probe.
	health, healthErr := origin.SendRouteHealthProbe(context.Background(), base.RouteID, SyntheticChannelFabricControl, "probe-health-effective")
	if healthErr != nil {
		t.Fatalf("send route health probe: %v", healthErr)
	}
	wantHealthPath := []string{"node-a", "node-new", "node-b"}
	if gotHealthPath := decodeAckPayload(t, health.Ack).Path; !sameStrings(gotHealthPath, wantHealthPath) {
		t.Fatalf("route health path = %v, want %v", gotHealthPath, wantHealthPath)
	}
	if forwarded := newRelay.SnapshotMetrics().ProbesForwarded; forwarded != 1 {
		t.Fatalf("node-new forwarded = %d, want 1", forwarded)
	}
	if forwarded := oldRelay.SnapshotMetrics().ProbesForwarded; forwarded != 0 {
		t.Fatalf("node-old forwarded = %d, want 0 before regular probe", forwarded)
	}
	if observation, found := origin.SnapshotRouteObservation(base.RouteID); !(found && observation.RouteVersion == "decision-v1") {
		t.Fatalf("route health observation = %+v, want decision route version", observation)
	}

	// Phase 2: a regular probe on the same route ID.
	probe, probeErr := origin.SendProbe(context.Background(), base.RouteID, SyntheticChannelFabricControl, "probe-regular")
	if probeErr != nil {
		t.Fatalf("send regular probe: %v", probeErr)
	}
	wantProbePath := []string{"node-a", "node-old", "node-b"}
	if gotProbePath := decodeAckPayload(t, probe).Path; !sameStrings(gotProbePath, wantProbePath) {
		t.Fatalf("regular probe path = %v, want %v", gotProbePath, wantProbePath)
	}
	if forwarded := oldRelay.SnapshotMetrics().ProbesForwarded; forwarded != 1 {
		t.Fatalf("node-old forwarded = %d, want 1 after regular probe", forwarded)
	}
}
// TestSyntheticRuntimeRouteHealthUsesFallbackWhenPreferredUnavailable sets up a
// preferred route through node-r, which is never registered with the
// transport, and a direct fallback route. It checks that
// SendRouteHealthProbeWithFallback reports the fallback as selected, that the
// preferred route is observed as failed and the fallback as healthy, and that
// node-a's metrics count one fallback, one warm-route promotion, and one
// failed delivery.
func TestSyntheticRuntimeRouteHealthUsesFallbackWhenPreferredUnavailable(t *testing.T) {
	preferred := testRoute("route-preferred", []string{"node-a", "node-r", "node-b"})
	fallback := testRoute("route-fallback", []string{"node-a", "node-b"})

	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	sender := testRuntime("node-a", transport, preferred, fallback)
	receiver := testRuntime("node-b", transport, preferred, fallback)
	registry["node-a"] = sender
	registry["node-b"] = receiver

	fallbackIDs := []string{fallback.RouteID}
	result, err := sender.SendRouteHealthProbeWithFallback(context.Background(), preferred.RouteID, fallbackIDs, SyntheticChannelFabricControl, "probe-fallback")
	if err != nil {
		t.Fatalf("send route health probe with fallback: %v", err)
	}
	if !result.FallbackUsed {
		t.Fatal("FallbackUsed = false, want true")
	}
	if got, want := result.SelectedRouteID, fallback.RouteID; got != want {
		t.Fatalf("SelectedRouteID = %q, want %q", got, want)
	}

	preferredObservation, found := sender.SnapshotRouteObservation(preferred.RouteID)
	if !found {
		t.Fatal("preferred route observation missing")
	}
	preferredFailed := preferredObservation.State == SyntheticRouteStateFailed && preferredObservation.FailureCount == 1
	if !preferredFailed {
		t.Fatalf("preferred observation = %+v, want failed", preferredObservation)
	}

	fallbackObservation, found := sender.SnapshotRouteObservation(fallback.RouteID)
	if !found {
		t.Fatal("fallback route observation missing")
	}
	fallbackHealthy := fallbackObservation.State == SyntheticRouteStateHealthy && fallbackObservation.SuccessCount == 1
	if !fallbackHealthy {
		t.Fatalf("fallback observation = %+v, want healthy", fallbackObservation)
	}

	metrics := sender.SnapshotMetrics()
	metricsOK := metrics.FallbackRoutesUsed == 1 &&
		metrics.WarmRoutesPromoted == 1 &&
		metrics.RouteDeliveriesFailed == 1
	if !metricsOK {
		t.Fatalf("metrics = %+v, want fallback promotion and one failed delivery", metrics)
	}
}
// TestSyntheticRuntimeRouteCacheInvalidatesOnVersionChange records a route
// observation with a health probe, then calls InvalidateRouteCache with
// PolicyVersion "policy-v2". It checks that one entry is reported invalidated,
// that the observation is gone afterwards, and that the RouteCacheInvalidations
// metric is 1.
func TestSyntheticRuntimeRouteCacheInvalidatesOnVersionChange(t *testing.T) {
	route := testRoute("route-cache", []string{"node-a", "node-b"})
	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	sender := testRuntime("node-a", transport, route)
	receiver := testRuntime("node-b", transport, route)
	registry["node-a"] = sender
	registry["node-b"] = receiver

	_, err := sender.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-cache")
	if err != nil {
		t.Fatalf("send route health probe: %v", err)
	}
	_, present := sender.SnapshotRouteObservation(route.RouteID)
	if !present {
		t.Fatal("route observation missing before invalidation")
	}

	newVersion := SyntheticRouteCacheVersion{PolicyVersion: "policy-v2"}
	if dropped := sender.InvalidateRouteCache("policy_changed", newVersion); dropped != 1 {
		t.Fatalf("invalidated = %d, want 1", dropped)
	}

	_, present = sender.SnapshotRouteObservation(route.RouteID)
	if present {
		t.Fatal("route observation still present after invalidation")
	}
	if count := sender.SnapshotMetrics().RouteCacheInvalidations; count != 1 {
		t.Fatalf("RouteCacheInvalidations = %d, want 1", count)
	}
}
// TestSyntheticRuntimeRouteCacheKeepsCurrentVersion records a route observation
// with a health probe, then calls InvalidateRouteCache with the same three
// version strings testRoute assigns. It checks that nothing is reported
// invalidated and that the observation is still present.
func TestSyntheticRuntimeRouteCacheKeepsCurrentVersion(t *testing.T) {
	route := testRoute("route-cache-current", []string{"node-a", "node-b"})
	registry := map[string]*SyntheticRuntime{}
	transport := syntheticTestTransport{nodes: registry}
	sender := testRuntime("node-a", transport, route)
	receiver := testRuntime("node-b", transport, route)
	registry["node-a"] = sender
	registry["node-b"] = receiver

	_, err := sender.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-cache-current")
	if err != nil {
		t.Fatalf("send route health probe: %v", err)
	}

	unchanged := SyntheticRouteCacheVersion{
		RouteVersion:         "route-v1",
		PolicyVersion:        "policy-v1",
		PeerDirectoryVersion: "peers-v1",
	}
	if dropped := sender.InvalidateRouteCache("same_versions", unchanged); dropped != 0 {
		t.Fatalf("invalidated = %d, want 0", dropped)
	}

	_, present := sender.SnapshotRouteObservation(route.RouteID)
	if !present {
		t.Fatal("route observation missing after same-version invalidation")
	}
}
// TestSyntheticRuntimeRouteHealthDisabledRejects builds a runtime with Enabled
// set to false and checks that SendRouteHealthProbeWithFallback fails with
// ErrMeshRuntimeDisabled.
func TestSyntheticRuntimeRouteHealthDisabledRejects(t *testing.T) {
	route := testRoute("route-health-disabled", []string{"node-a", "node-b"})
	cfg := SyntheticRuntimeConfig{
		Enabled: false,
		Local:   PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"},
		Routes:  []SyntheticRoute{route},
	}
	disabled := NewSyntheticRuntime(cfg)

	fallbackIDs := []string{"route-fallback"}
	_, err := disabled.SendRouteHealthProbeWithFallback(context.Background(), route.RouteID, fallbackIDs, SyntheticChannelFabricControl, "probe-disabled-health")
	if errors.Is(err, ErrMeshRuntimeDisabled) {
		return
	}
	t.Fatalf("err = %v, want ErrMeshRuntimeDisabled", err)
}
// testRuntime builds an enabled SyntheticRuntime for nodeID in cluster
// "cluster-1" that uses the given transport and routes, with MaxTTL and
// MaxHops both set to 8.
func testRuntime(nodeID string, transport SyntheticTransport, routes ...SyntheticRoute) *SyntheticRuntime {
	local := PeerIdentity{ClusterID: "cluster-1", NodeID: nodeID}
	cfg := SyntheticRuntimeConfig{
		Enabled:   true,
		Local:     local,
		Routes:    routes,
		Transport: transport,
		MaxTTL:    8,
		MaxHops:   8,
	}
	return NewSyntheticRuntime(cfg)
}
// testRuntimeWithRouteHealth builds an enabled SyntheticRuntime for nodeID in
// cluster "cluster-1" with separate Routes and RouteHealthRoutes lists, the
// given transport, and MaxTTL and MaxHops both set to 8.
func testRuntimeWithRouteHealth(nodeID string, transport SyntheticTransport, routes []SyntheticRoute, routeHealthRoutes []SyntheticRoute) *SyntheticRuntime {
	local := PeerIdentity{ClusterID: "cluster-1", NodeID: nodeID}
	cfg := SyntheticRuntimeConfig{
		Enabled:           true,
		Local:             local,
		Routes:            routes,
		RouteHealthRoutes: routeHealthRoutes,
		Transport:         transport,
		MaxTTL:            8,
		MaxHops:           8,
	}
	return NewSyntheticRuntime(cfg)
}
// testRoute builds a SyntheticRoute in cluster "cluster-1" that runs along
// hops: the first hop is the source and the last hop is the destination. The
// route allows only SyntheticChannelFabricControl, expires one hour from now,
// caps TTL and hop count at 8, and carries the versions "route-v1",
// "policy-v1" and "peers-v1". hops must be non-empty; indexing an empty slice
// panics.
func testRoute(routeID string, hops []string) SyntheticRoute {
	source := hops[0]
	destination := hops[len(hops)-1]
	expiry := time.Now().UTC().Add(time.Hour)

	return SyntheticRoute{
		RouteID:              routeID,
		ClusterID:            "cluster-1",
		SourceNodeID:         source,
		DestinationNodeID:    destination,
		Hops:                 hops,
		AllowedChannels:      []string{SyntheticChannelFabricControl},
		ExpiresAt:            expiry,
		MaxTTL:               8,
		MaxHops:              8,
		RouteVersion:         "route-v1",
		PolicyVersion:        "policy-v1",
		PeerDirectoryVersion: "peers-v1",
	}
}
// testEnvelope builds a probe envelope on route's ID and cluster, sent from
// fromNodeID to toNodeID on SyntheticChannelFabricControl. The envelope starts
// with TTL 8, HopCount 1, Sequence 1, a Visited list holding only fromNodeID,
// and a JSON-encoded SyntheticProbePayload with ProbeID "probe-test".
//
// It panics if the payload cannot be marshalled. The signature has no
// *testing.T to report through, and continuing with an empty payload would
// surface later as an unrelated-looking failure.
func testEnvelope(route SyntheticRoute, fromNodeID string, toNodeID string) SyntheticEnvelope {
	payload, err := json.Marshal(SyntheticProbePayload{
		ProbeID: "probe-test",
		SentAt:  time.Now().UTC(),
	})
	if err != nil {
		panic("marshal synthetic probe payload: " + err.Error())
	}
	return SyntheticEnvelope{
		ProtocolVersion: ProtocolVersion,
		RouteID:         route.RouteID,
		ClusterID:       route.ClusterID,
		From:            PeerIdentity{ClusterID: route.ClusterID, NodeID: fromNodeID},
		To:              PeerIdentity{ClusterID: route.ClusterID, NodeID: toNodeID},
		Channel:         SyntheticChannelFabricControl,
		MessageType:     SyntheticMessageProbe,
		TTL:             8,
		HopCount:        1,
		Visited:         []string{fromNodeID},
		Sequence:        1,
		SentAt:          time.Now().UTC(),
		Payload:         payload,
	}
}
// decodeAckPayload unmarshals envelope.Payload as JSON into a
// SyntheticProbeAckPayload and returns it. A decode error fails the calling
// test immediately; t.Helper attributes that failure to the caller's line.
func decodeAckPayload(t *testing.T, envelope SyntheticEnvelope) SyntheticProbeAckPayload {
	t.Helper()

	var decoded SyntheticProbeAckPayload
	err := json.Unmarshal(envelope.Payload, &decoded)
	if err != nil {
		t.Fatalf("decode ack payload: %v", err)
	}
	return decoded
}