package mesh import ( "context" "encoding/json" "errors" "testing" "time" ) type syntheticTestTransport struct { nodes map[string]*SyntheticRuntime } func (t syntheticTestTransport) SendSynthetic(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error) { next := t.nodes[nextNodeID] if next == nil { return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable } return next.Receive(ctx, envelope) } func TestSyntheticRuntimeDirectProbe(t *testing.T) { route := testRoute("route-direct", []string{"node-a", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeB := testRuntime("node-b", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-b"] = nodeB ack, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-direct") if err != nil { t.Fatalf("send probe: %v", err) } if ack.MessageType != SyntheticMessageProbeAck { t.Fatalf("MessageType = %q, want %q", ack.MessageType, SyntheticMessageProbeAck) } if ack.From.NodeID != "node-b" || ack.To.NodeID != "node-a" { t.Fatalf("unexpected ack peers: from=%+v to=%+v", ack.From, ack.To) } payload := decodeAckPayload(t, ack) if len(payload.Path) != 2 || payload.Path[0] != "node-a" || payload.Path[1] != "node-b" { t.Fatalf("Path = %#v, want node-a -> node-b", payload.Path) } if nodeB.SnapshotMetrics().ProbeAcksCreated != 1 { t.Fatalf("ProbeAcksCreated = %d, want 1", nodeB.SnapshotMetrics().ProbeAcksCreated) } } func TestSyntheticRuntimeSingleRelayProbe(t *testing.T) { route := testRoute("route-relay", []string{"node-a", "node-r", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeR := testRuntime("node-r", transport, route) nodeB := testRuntime("node-b", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-r"] = nodeR transport.nodes["node-b"] = nodeB ack, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-relay") if err != nil { t.Fatalf("send probe: %v", err) } payload := decodeAckPayload(t, ack) if len(payload.Path) != 3 || payload.Path[0] != "node-a" || payload.Path[1] != "node-r" || payload.Path[2] != "node-b" { t.Fatalf("Path = %#v, want node-a -> node-r -> node-b", payload.Path) } if nodeR.SnapshotMetrics().ProbesForwarded != 1 { t.Fatalf("ProbesForwarded = %d, want 1", nodeR.SnapshotMetrics().ProbesForwarded) } } func TestSyntheticRuntimeDisabledRejectsProbe(t *testing.T) { route := testRoute("route-disabled", []string{"node-a", "node-b"}) nodeA := NewSyntheticRuntime(SyntheticRuntimeConfig{ Enabled: false, Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, Routes: []SyntheticRoute{route}, }) _, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-disabled") if !errors.Is(err, ErrMeshRuntimeDisabled) { t.Fatalf("err = %v, want ErrMeshRuntimeDisabled", err) } } func TestSyntheticRuntimeRejectsWrongCluster(t *testing.T) { route := testRoute("route-wrong-cluster", []string{"node-a", "node-b"}) nodeB := testRuntime("node-b", syntheticTestTransport{}, route) envelope := testEnvelope(route, "node-a", "node-b") envelope.ClusterID = "cluster-2" _, err := nodeB.Receive(context.Background(), envelope) if !errors.Is(err, ErrClusterMismatch) { t.Fatalf("err = %v, want ErrClusterMismatch", err) } } func TestSyntheticRuntimeRejectsWrongNode(t *testing.T) { route := testRoute("route-wrong-node", []string{"node-a", "node-b"}) nodeB := testRuntime("node-b", syntheticTestTransport{}, route) envelope := testEnvelope(route, "node-a", "node-c") _, err := nodeB.Receive(context.Background(), envelope) if !errors.Is(err, ErrNodeMismatch) { t.Fatalf("err = %v, want ErrNodeMismatch", err) } } func TestSyntheticRuntimeRejectsUnauthorizedChannel(t *testing.T) { route := testRoute("route-unauthorized", []string{"node-a", "node-b"}) nodeA := testRuntime("node-a", syntheticTestTransport{}, route) _, err := nodeA.SendProbe(context.Background(), route.RouteID, "rdp_render", "probe-unauthorized") if !errors.Is(err, ErrUnauthorizedChannel) { t.Fatalf("err = %v, want ErrUnauthorizedChannel", err) } } func TestSyntheticRuntimeRejectsExpiredRoute(t *testing.T) { route := testRoute("route-expired", []string{"node-a", "node-b"}) route.ExpiresAt = time.Now().UTC().Add(-time.Minute) nodeA := testRuntime("node-a", syntheticTestTransport{}, route) _, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-expired") if !errors.Is(err, ErrRouteExpired) { t.Fatalf("err = %v, want ErrRouteExpired", err) } } func TestSyntheticRuntimeRejectsTTLExhaustion(t *testing.T) { route := testRoute("route-ttl", []string{"node-a", "node-r", "node-b"}) route.MaxTTL = 1 transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeR := testRuntime("node-r", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-r"] = nodeR _, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-ttl") if !errors.Is(err, ErrTTLExhausted) { t.Fatalf("err = %v, want ErrTTLExhausted", err) } } func TestSyntheticRuntimeRejectsLoop(t *testing.T) { route := testRoute("route-loop", []string{"node-a", "node-b"}) nodeB := testRuntime("node-b", syntheticTestTransport{}, route) envelope := testEnvelope(route, "node-a", "node-b") envelope.Visited = []string{"node-a", "node-b"} _, err := nodeB.Receive(context.Background(), envelope) if !errors.Is(err, ErrLoopDetected) { t.Fatalf("err = %v, want ErrLoopDetected", err) } } func TestSyntheticRuntimeRejectsUnavailablePeer(t *testing.T) { route := testRoute("route-missing-peer", []string{"node-a", "node-b"}) nodeA := testRuntime("node-a", syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}}, route) _, err := nodeA.SendProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-missing-peer") if !errors.Is(err, ErrSyntheticPeerUnavailable) { t.Fatalf("err = %v, want ErrSyntheticPeerUnavailable", err) } } func TestSyntheticRuntimeRouteHealthProbeRecordsSuccess(t *testing.T) { route := testRoute("route-health", []string{"node-a", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeB := testRuntime("node-b", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-b"] = nodeB result, err := nodeA.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-health") if err != nil { t.Fatalf("send route health probe: %v", err) } if result.Ack.MessageType != SyntheticMessageRouteHealthAck { t.Fatalf("MessageType = %q, want %q", result.Ack.MessageType, SyntheticMessageRouteHealthAck) } if result.FallbackUsed { t.Fatal("FallbackUsed = true, want false") } observation, ok := nodeA.SnapshotRouteObservation(route.RouteID) if !ok { t.Fatal("route observation missing") } if observation.State != SyntheticRouteStateHealthy || observation.SuccessCount != 1 { t.Fatalf("observation = %+v, want healthy success", observation) } if observation.PolicyVersion != "policy-v1" || observation.PeerDirectoryVersion != "peers-v1" || observation.RouteVersion != "route-v1" { t.Fatalf("observation versions = %+v", observation) } metrics := nodeA.SnapshotMetrics() if metrics.RouteHealthProbesSent != 1 || metrics.RouteDeliveriesSucceeded != 1 { t.Fatalf("metrics = %+v, want health probe success", metrics) } } func TestSyntheticRuntimeRouteHealthUsesDedicatedRouteConfig(t *testing.T) { base := testRoute("route-effective-health", []string{"node-a", "node-old", "node-b"}) effective := testRoute("route-effective-health", []string{"node-a", "node-new", "node-b"}) effective.RouteVersion = "decision-v1" transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntimeWithRouteHealth("node-a", transport, []SyntheticRoute{base}, []SyntheticRoute{effective}) nodeOld := testRuntimeWithRouteHealth("node-old", transport, []SyntheticRoute{base}, []SyntheticRoute{effective}) nodeNew := testRuntimeWithRouteHealth("node-new", transport, []SyntheticRoute{base}, []SyntheticRoute{effective}) nodeB := testRuntimeWithRouteHealth("node-b", transport, []SyntheticRoute{base}, []SyntheticRoute{effective}) transport.nodes["node-a"] = nodeA transport.nodes["node-old"] = nodeOld transport.nodes["node-new"] = nodeNew transport.nodes["node-b"] = nodeB health, err := nodeA.SendRouteHealthProbe(context.Background(), base.RouteID, SyntheticChannelFabricControl, "probe-health-effective") if err != nil { t.Fatalf("send route health probe: %v", err) } healthPayload := decodeAckPayload(t, health.Ack) if got, want := healthPayload.Path, []string{"node-a", "node-new", "node-b"}; !sameStrings(got, want) { t.Fatalf("route health path = %v, want %v", got, want) } if nodeNew.SnapshotMetrics().ProbesForwarded != 1 { t.Fatalf("node-new forwarded = %d, want 1", nodeNew.SnapshotMetrics().ProbesForwarded) } if nodeOld.SnapshotMetrics().ProbesForwarded != 0 { t.Fatalf("node-old forwarded = %d, want 0 before regular probe", nodeOld.SnapshotMetrics().ProbesForwarded) } observation, ok := nodeA.SnapshotRouteObservation(base.RouteID) if !ok || observation.RouteVersion != "decision-v1" { t.Fatalf("route health observation = %+v, want decision route version", observation) } probe, err := nodeA.SendProbe(context.Background(), base.RouteID, SyntheticChannelFabricControl, "probe-regular") if err != nil { t.Fatalf("send regular probe: %v", err) } probePayload := decodeAckPayload(t, probe) if got, want := probePayload.Path, []string{"node-a", "node-old", "node-b"}; !sameStrings(got, want) { t.Fatalf("regular probe path = %v, want %v", got, want) } if nodeOld.SnapshotMetrics().ProbesForwarded != 1 { t.Fatalf("node-old forwarded = %d, want 1 after regular probe", nodeOld.SnapshotMetrics().ProbesForwarded) } } func TestSyntheticRuntimeRouteHealthUsesFallbackWhenPreferredUnavailable(t *testing.T) { preferred := testRoute("route-preferred", []string{"node-a", "node-r", "node-b"}) fallback := testRoute("route-fallback", []string{"node-a", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, preferred, fallback) nodeB := testRuntime("node-b", transport, preferred, fallback) transport.nodes["node-a"] = nodeA transport.nodes["node-b"] = nodeB result, err := nodeA.SendRouteHealthProbeWithFallback( context.Background(), preferred.RouteID, []string{fallback.RouteID}, SyntheticChannelFabricControl, "probe-fallback", ) if err != nil { t.Fatalf("send route health probe with fallback: %v", err) } if !result.FallbackUsed { t.Fatal("FallbackUsed = false, want true") } if result.SelectedRouteID != fallback.RouteID { t.Fatalf("SelectedRouteID = %q, want %q", result.SelectedRouteID, fallback.RouteID) } preferredObservation, ok := nodeA.SnapshotRouteObservation(preferred.RouteID) if !ok { t.Fatal("preferred route observation missing") } if preferredObservation.State != SyntheticRouteStateFailed || preferredObservation.FailureCount != 1 { t.Fatalf("preferred observation = %+v, want failed", preferredObservation) } fallbackObservation, ok := nodeA.SnapshotRouteObservation(fallback.RouteID) if !ok { t.Fatal("fallback route observation missing") } if fallbackObservation.State != SyntheticRouteStateHealthy || fallbackObservation.SuccessCount != 1 { t.Fatalf("fallback observation = %+v, want healthy", fallbackObservation) } metrics := nodeA.SnapshotMetrics() if metrics.FallbackRoutesUsed != 1 || metrics.WarmRoutesPromoted != 1 || metrics.RouteDeliveriesFailed != 1 { t.Fatalf("metrics = %+v, want fallback promotion and one failed delivery", metrics) } } func TestSyntheticRuntimeRouteCacheInvalidatesOnVersionChange(t *testing.T) { route := testRoute("route-cache", []string{"node-a", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeB := testRuntime("node-b", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-b"] = nodeB if _, err := nodeA.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-cache"); err != nil { t.Fatalf("send route health probe: %v", err) } if _, ok := nodeA.SnapshotRouteObservation(route.RouteID); !ok { t.Fatal("route observation missing before invalidation") } invalidated := nodeA.InvalidateRouteCache("policy_changed", SyntheticRouteCacheVersion{PolicyVersion: "policy-v2"}) if invalidated != 1 { t.Fatalf("invalidated = %d, want 1", invalidated) } if _, ok := nodeA.SnapshotRouteObservation(route.RouteID); ok { t.Fatal("route observation still present after invalidation") } if nodeA.SnapshotMetrics().RouteCacheInvalidations != 1 { t.Fatalf("RouteCacheInvalidations = %d, want 1", nodeA.SnapshotMetrics().RouteCacheInvalidations) } } func TestSyntheticRuntimeRouteCacheKeepsCurrentVersion(t *testing.T) { route := testRoute("route-cache-current", []string{"node-a", "node-b"}) transport := syntheticTestTransport{nodes: map[string]*SyntheticRuntime{}} nodeA := testRuntime("node-a", transport, route) nodeB := testRuntime("node-b", transport, route) transport.nodes["node-a"] = nodeA transport.nodes["node-b"] = nodeB if _, err := nodeA.SendRouteHealthProbe(context.Background(), route.RouteID, SyntheticChannelFabricControl, "probe-cache-current"); err != nil { t.Fatalf("send route health probe: %v", err) } invalidated := nodeA.InvalidateRouteCache("same_versions", SyntheticRouteCacheVersion{ RouteVersion: "route-v1", PolicyVersion: "policy-v1", PeerDirectoryVersion: "peers-v1", }) if invalidated != 0 { t.Fatalf("invalidated = %d, want 0", invalidated) } if _, ok := nodeA.SnapshotRouteObservation(route.RouteID); !ok { t.Fatal("route observation missing after same-version invalidation") } } func TestSyntheticRuntimeRouteHealthDisabledRejects(t *testing.T) { route := testRoute("route-health-disabled", []string{"node-a", "node-b"}) nodeA := NewSyntheticRuntime(SyntheticRuntimeConfig{ Enabled: false, Local: PeerIdentity{ClusterID: "cluster-1", NodeID: "node-a"}, Routes: []SyntheticRoute{route}, }) _, err := nodeA.SendRouteHealthProbeWithFallback( context.Background(), route.RouteID, []string{"route-fallback"}, SyntheticChannelFabricControl, "probe-disabled-health", ) if !errors.Is(err, ErrMeshRuntimeDisabled) { t.Fatalf("err = %v, want ErrMeshRuntimeDisabled", err) } } func testRuntime(nodeID string, transport SyntheticTransport, routes ...SyntheticRoute) *SyntheticRuntime { return NewSyntheticRuntime(SyntheticRuntimeConfig{ Enabled: true, Local: PeerIdentity{ClusterID: "cluster-1", NodeID: nodeID}, Routes: routes, Transport: transport, MaxTTL: 8, MaxHops: 8, }) } func testRuntimeWithRouteHealth(nodeID string, transport SyntheticTransport, routes []SyntheticRoute, routeHealthRoutes []SyntheticRoute) *SyntheticRuntime { return NewSyntheticRuntime(SyntheticRuntimeConfig{ Enabled: true, Local: PeerIdentity{ClusterID: "cluster-1", NodeID: nodeID}, Routes: routes, RouteHealthRoutes: routeHealthRoutes, Transport: transport, MaxTTL: 8, MaxHops: 8, }) } func testRoute(routeID string, hops []string) SyntheticRoute { return SyntheticRoute{ RouteID: routeID, ClusterID: "cluster-1", SourceNodeID: hops[0], DestinationNodeID: hops[len(hops)-1], Hops: hops, AllowedChannels: []string{SyntheticChannelFabricControl}, ExpiresAt: time.Now().UTC().Add(time.Hour), MaxTTL: 8, MaxHops: 8, RouteVersion: "route-v1", PolicyVersion: "policy-v1", PeerDirectoryVersion: "peers-v1", } } func testEnvelope(route SyntheticRoute, fromNodeID string, toNodeID string) SyntheticEnvelope { payload, _ := json.Marshal(SyntheticProbePayload{ ProbeID: "probe-test", SentAt: time.Now().UTC(), }) return SyntheticEnvelope{ ProtocolVersion: ProtocolVersion, RouteID: route.RouteID, ClusterID: route.ClusterID, From: PeerIdentity{ClusterID: route.ClusterID, NodeID: fromNodeID}, To: PeerIdentity{ClusterID: route.ClusterID, NodeID: toNodeID}, Channel: SyntheticChannelFabricControl, MessageType: SyntheticMessageProbe, TTL: 8, HopCount: 1, Visited: []string{fromNodeID}, Sequence: 1, SentAt: time.Now().UTC(), Payload: payload, } } func decodeAckPayload(t *testing.T, envelope SyntheticEnvelope) SyntheticProbeAckPayload { t.Helper() var payload SyntheticProbeAckPayload if err := json.Unmarshal(envelope.Payload, &payload); err != nil { t.Fatalf("decode ack payload: %v", err) } return payload }