package mesh

import (
	"context"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)

// TestFabricChannelRuntimeMigratesSlowAckToStandbyRoute sends three payloads
// over a route set whose primary endpoint acks after 60ms while
// MaxAckLatencyMs is 30, and expects exactly one migration onto the
// zero-delay standby route, one connect per endpoint, three acks, and
// balanced route-pressure acquire/release counters.
func TestFabricChannelRuntimeMigratesSlowAckToStandbyRoute(t *testing.T) {
	transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
		"quic://slow.example.test:19443": 60 * time.Millisecond,
		"quic://fast.example.test:19443": 0,
	})
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:     9,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-b",
		Primary:    testRuntimeRoute("route-slow", "node-b", "quic://slow.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
		},
	}
	result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{
		[]byte("one"),
		[]byte("two"),
		[]byte("three"),
	})
	if err != nil {
		t.Fatalf("send reliable: %v", err)
	}
	if result.MigrationEvents != 1 {
		t.Fatalf("migration events = %d, want 1: %+v", result.MigrationEvents, result.RouteEvents)
	}
	if result.Channel.RouteID != "route-fast" || result.Channel.RerouteCount != 1 {
		t.Fatalf("channel = %+v", result.Channel)
	}
	if result.BytesSent != uint64(len("one")+len("two")+len("three")) || result.AcksReceived != 3 {
		t.Fatalf("result = %+v", result)
	}
	if got := transport.connectCount("quic://slow.example.test:19443"); got != 1 {
		t.Fatalf("slow connect count = %d, want 1", got)
	}
	if got := transport.connectCount("quic://fast.example.test:19443"); got != 1 {
		t.Fatalf("fast connect count = %d, want 1", got)
	}
	if result.RoutePressure.AcquiredTotal != 2 || result.RoutePressure.ReleasedTotal != 2 || result.RoutePressure.MaxActiveTotal == 0 {
		t.Fatalf("route pressure = %+v", result.RoutePressure)
	}
}

// TestFabricChannelRuntimeReroutesOnConnectFailure makes the primary endpoint
// fail on Connect and expects SendReliable to succeed with one migration onto
// the standby route and the full payload counted in BytesSent.
func TestFabricChannelRuntimeReroutesOnConnectFailure(t *testing.T) {
	transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
		"quic://fast.example.test:19443": 0,
	})
	transport.failConnect["quic://dead.example.test:19443"] = true
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:     9,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-b",
		Primary:    testRuntimeRoute("route-dead", "node-b", "quic://dead.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
		},
	}
	result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("payload")})
	if err != nil {
		t.Fatalf("send reliable: %v", err)
	}
	if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || result.BytesSent != uint64(len("payload")) {
		t.Fatalf("result = %+v", result)
	}
}

// TestFabricChannelRuntimeQuarantinesFailedRouteAcrossChannels runs two
// consecutive sends through one runtime configured with a one-minute
// RouteHealthTTL. The first send must record a single failure for
// "route-dead" in RouteHealth.Quarantined; the second must go straight to the
// healthy route, leaving the dead endpoint's connect count at one.
func TestFabricChannelRuntimeQuarantinesFailedRouteAcrossChannels(t *testing.T) {
	transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
		"quic://fast.example.test:19443": 0,
	})
	transport.failConnect["quic://dead.example.test:19443"] = true
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig:   FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:       9,
		RouteHealthTTL: time.Minute,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-b",
		Primary:    testRuntimeRoute("route-dead", "node-b", "quic://dead.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
		},
	}
	first, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("first")})
	if err != nil {
		t.Fatalf("first send reliable: %v", err)
	}
	if first.Channel.RouteID != "route-fast" || first.RouteHealth.Quarantined["route-dead"].Failures != 1 {
		t.Fatalf("first result = %+v", first)
	}
	second, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("second")})
	if err != nil {
		t.Fatalf("second send reliable: %v", err)
	}
	if second.Channel.RouteID != "route-fast" {
		t.Fatalf("second route = %s, want route-fast", second.Channel.RouteID)
	}
	if got := transport.connectCount("quic://dead.example.test:19443"); got != 1 {
		t.Fatalf("dead connect count = %d, want one attempt before quarantine", got)
	}
	if got := transport.connectCount("quic://fast.example.test:19443"); got != 2 {
		t.Fatalf("fast connect count = %d, want both channels on healthy route", got)
	}
}

// TestFabricChannelRuntimeReroutesOnAckTimeout configures a 10ms runtime
// Timeout against a primary endpoint that acks after 100ms and expects the
// send to complete on the standby route with one migration event.
func TestFabricChannelRuntimeReroutesOnAckTimeout(t *testing.T) {
	transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
		"quic://slow.example.test:19443": 100 * time.Millisecond,
		"quic://fast.example.test:19443": 0,
	})
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:     9,
		Timeout:      10 * time.Millisecond,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-b",
		Primary:    testRuntimeRoute("route-slow", "node-b", "quic://slow.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
		},
	}
	result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("payload")})
	if err != nil {
		t.Fatalf("send reliable: %v", err)
	}
	if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || result.BytesSent != uint64(len("payload")) {
		t.Fatalf("result = %+v", result)
	}
}

// TestFabricChannelRuntimeSpreadsConcurrentChannelsBySharedPressure starts one
// send in a goroutine, waits until it has connected to route-a (whose ack is
// delayed by 80ms), then issues a second send on the same runtime and expects
// that second channel to be placed on route-b.
func TestFabricChannelRuntimeSpreadsConcurrentChannelsBySharedPressure(t *testing.T) {
	transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
		"quic://route-a.example.test:19443": 80 * time.Millisecond,
		"quic://route-b.example.test:19443": 0,
	})
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{StreamID: 9})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-b",
		Primary:    testRuntimeRoute("route-a", "node-b", "quic://route-a.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-b", "node-b", "quic://route-b.example.test:19443", 11),
		},
	}
	// Buffered so the background sender can finish even if the test bails out
	// before receiving from the channel.
	firstDone := make(chan error, 1)
	go func() {
		_, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("one")})
		firstDone <- err
	}()
	transport.waitForConnect(t, "quic://route-a.example.test:19443", 1)
	result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("two")})
	if err != nil {
		t.Fatalf("second send reliable: %v", err)
	}
	if result.Channel.RouteID != "route-b" {
		t.Fatalf("second route = %s, want route-b", result.Channel.RouteID)
	}
	if got := transport.connectCount("quic://route-b.example.test:19443"); got != 1 {
		t.Fatalf("route-b connect count = %d, want 1", got)
	}
	if err := <-firstDone; err != nil {
		t.Fatalf("first send reliable: %v", err)
	}
}

// TestFabricChannelRuntimeRequestResponseReturnsPayload sends one request to a
// pool-targeted route whose fake endpoint answers with a canned JSON body and
// checks the returned payload plus the byte, frame, and ack counters.
func TestFabricChannelRuntimeRequestResponseReturnsPayload(t *testing.T) {
	transport := newFakeFabricRequestResponseTransport(map[string][]byte{
		"quic://runtime.example.test:19443": []byte(`{"status":"ok"}`),
	})
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:     9,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetPool,
		TargetID:   "pool-admin-runtime",
		Primary:    testRuntimePoolRoute("route-runtime", "pool-admin-runtime", "node-runtime", "quic://runtime.example.test:19443", 10),
	}
	result, err := runtime.SendRequestResponse(context.Background(), FabricChannelSpec{
		ChannelID:    "channel-web-1",
		ClusterID:    "cluster-1",
		SourceNodeID: "node-a",
		TargetKind:   FabricChannelTargetPool,
		TargetID:     "pool-admin-runtime",
		TrafficClass: "control",
		CreatedAt:    time.Now().UTC(),
	}, routeSet, []byte(`{"request":true}`))
	if err != nil {
		t.Fatalf("request response: %v", err)
	}
	if string(result.ResponsePayload) != `{"status":"ok"}` {
		t.Fatalf("response payload = %s", string(result.ResponsePayload))
	}
	if result.Channel.RouteID != "route-runtime" || result.BytesSent != uint64(len(`{"request":true}`)) || result.BytesRecv != uint64(len(`{"status":"ok"}`)) || result.FramesSent != 1 || result.FramesRecv != 1 || result.AcksReceived != 1 {
		t.Fatalf("result = %+v", result)
	}
}

// TestFabricChannelRuntimeRequestResponseReroutesOnResponseFailure marks the
// primary endpoint as never responding, sets a 10ms Timeout, and expects the
// request to be answered via the standby route with one migration event.
func TestFabricChannelRuntimeRequestResponseReroutesOnResponseFailure(t *testing.T) {
	transport := newFakeFabricRequestResponseTransport(map[string][]byte{
		"quic://fast.example.test:19443": []byte(`{"status":"ok"}`),
	})
	transport.failResponse["quic://slow.example.test:19443"] = true
	runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
		RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
		StreamID:     9,
		Timeout:      10 * time.Millisecond,
	})
	routeSet := FabricRouteSet{
		TargetKind: FabricChannelTargetNode,
		TargetID:   "node-runtime",
		Primary:    testRuntimeRoute("route-slow", "node-runtime", "quic://slow.example.test:19443", 10),
		WarmStandby: []FabricRoute{
			testRuntimeRoute("route-fast", "node-runtime", "quic://fast.example.test:19443", 20),
		},
	}
	result, err := runtime.SendRequestResponse(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-runtime"), routeSet, []byte(`{"request":true}`))
	if err != nil {
		t.Fatalf("request response: %v", err)
	}
	if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || string(result.ResponsePayload) != `{"status":"ok"}` {
		t.Fatalf("result = %+v", result)
	}
}

// TestFabricTransportTargetForRouteUsesLastAddressedHop checks that, for a
// route with RelayCount left at zero, the resolved transport target carries
// the final hop's node ID, endpoint ID, address, and direct mode.
func TestFabricTransportTargetForRouteUsesLastAddressedHop(t *testing.T) {
	target, err := FabricTransportTargetForRoute(FabricRoute{
		RouteID: "route-1",
		Hops: []FabricRouteHop{
			{NodeID: "node-a"},
			{NodeID: "node-r", Mode: FabricRouteRelay, EndpointID: "relay-1", Address: "quic://relay.example.test:19443"},
			{NodeID: "node-b", Mode: FabricRouteDirect, EndpointID: "node-b-quic", Address: "quic://node-b.example.test:19443"},
		},
	})
	if err != nil {
		t.Fatalf("target for route: %v", err)
	}
	if target.PeerID != "node-b" || target.EndpointID != "node-b-quic" || target.Endpoint != "quic://node-b.example.test:19443" || target.Transport != string(FabricRouteDirect) {
		t.Fatalf("target = %+v", target)
	}
}

// fakeFabricRequestResponseTransport is a test transport whose sessions reply
// to data frames with a canned per-endpoint payload. mu guards all three maps.
type fakeFabricRequestResponseTransport struct {
	mu           sync.Mutex
	responses    map[string][]byte // endpoint -> payload echoed back on each data frame
	failResponse map[string]bool   // endpoint -> true to swallow requests without replying
	connects     map[string]int    // endpoint -> number of Connect calls observed
}

// newFakeFabricRequestResponseTransport returns a transport that serves the
// given endpoint-to-payload table, with empty failResponse and connects maps.
func newFakeFabricRequestResponseTransport(responses map[string][]byte) *fakeFabricRequestResponseTransport {
	return &fakeFabricRequestResponseTransport{
		responses:    responses,
		failResponse: map[string]bool{},
		connects:     map[string]int{},
	}
}

// Connect counts the attempt for target.Endpoint and returns a new session
// holding a private copy of that endpoint's response and its failResponse
// flag as they were at connect time. It never returns an error.
func (t *fakeFabricRequestResponseTransport) Connect(_ context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
	endpoint := target.Endpoint
	t.mu.Lock()
	t.connects[endpoint]++
	response := append([]byte(nil), t.responses[endpoint]...)
	failResponse := t.failResponse[endpoint]
	t.mu.Unlock()
	return &fakeFabricRequestResponseSession{
		response:     response,
		failResponse: failResponse,
		frames:       make(chan fabricproto.Frame, 16),
		errors:       make(chan error, 1),
		done:         make(chan struct{}),
	}, nil
}

// Close is a no-op; the fake transport holds no resources.
func (t *fakeFabricRequestResponseTransport) Close() error {
	return nil
}

// fakeFabricRequestResponseSession is the session handed out by
// fakeFabricRequestResponseTransport.
type fakeFabricRequestResponseSession struct {
	response     []byte
	failResponse bool
	frames       chan fabricproto.Frame
	errors       chan error
	done         chan struct{} // closed exactly once by Close
	once         sync.Once
}

// Send answers every FrameData frame with a FrameData reply that mirrors the
// request's traffic class, stream ID, and sequence and carries a copy of the
// canned response. Non-data frames, and all frames when failResponse is set,
// are accepted and silently dropped. Send always returns nil.
func (s *fakeFabricRequestResponseSession) Send(_ context.Context, frame fabricproto.Frame) error {
	if frame.Type != fabricproto.FrameData || s.failResponse {
		return nil
	}
	response := append([]byte(nil), s.response...)
	// Deliver asynchronously so Send never blocks on a full frames buffer;
	// the goroutine gives up once the session is closed.
	go func() {
		select {
		case <-s.done:
		case s.frames <- fabricproto.Frame{Type: fabricproto.FrameData, TrafficClass: frame.TrafficClass, StreamID: frame.StreamID, Sequence: frame.Sequence, Payload: response}:
		}
	}()
	return nil
}

// Frames returns the receive side of the session's frame channel.
func (s *fakeFabricRequestResponseSession) Frames() <-chan fabricproto.Frame {
	return s.frames
}

// Errors returns the session's error channel; this fake never writes to it.
func (s *fakeFabricRequestResponseSession) Errors() <-chan error {
	return s.errors
}

// Close marks the session closed. It is safe to call more than once.
func (s *fakeFabricRequestResponseSession) Close() error {
	s.once.Do(func() { close(s.done) })
	return nil
}

// Closed reports whether Close has been called, without blocking.
func (s *fakeFabricRequestResponseSession) Closed() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// TestFabricTransportTargetForRouteUsesRelayHopForRelayRoute checks that a
// route with RelayCount 1 resolves to the relay hop (node-r) rather than the
// final hop, including the relay's PeerCertSHA256.
func TestFabricTransportTargetForRouteUsesRelayHopForRelayRoute(t *testing.T) {
	target, err := FabricTransportTargetForRoute(FabricRoute{
		RouteID:    "route-relay",
		RelayCount: 1,
		Hops: []FabricRouteHop{
			{NodeID: "node-a"},
			{NodeID: "node-r", Mode: FabricRouteRelay, EndpointID: "relay-1", Address: "quic://relay.example.test:19443", PeerCertSHA256: "relay-cert"},
			{NodeID: "node-b", Mode: FabricRouteRelay, EndpointID: "node-b-private", Address: "quic://10.0.0.2:19443", PeerCertSHA256: "node-b-cert"},
		},
	})
	if err != nil {
		t.Fatalf("target for relay route: %v", err)
	}
	if target.PeerID != "node-r" || target.EndpointID != "relay-1" || target.Endpoint != "quic://relay.example.test:19443" || target.PeerCertSHA256 != "relay-cert" {
		t.Fatalf("target = %+v", target)
	}
}

// fakeFabricRuntimeTransport is a test transport whose sessions acknowledge
// data frames after a configurable per-endpoint delay and whose Connect can be
// made to fail per endpoint. mu guards the maps inside the transport's own
// methods; tests populate failConnect directly before starting any sends.
type fakeFabricRuntimeTransport struct {
	mu          sync.Mutex
	delays      map[string]time.Duration // endpoint -> delay before each ack is emitted
	failConnect map[string]bool          // endpoint -> true to reject Connect
	connects    map[string]int           // endpoint -> number of Connect calls observed
}

// newFakeFabricRuntimeTransport returns a transport using the given
// endpoint-to-ack-delay table, with empty failConnect and connects maps.
func newFakeFabricRuntimeTransport(delays map[string]time.Duration) *fakeFabricRuntimeTransport {
	return &fakeFabricRuntimeTransport{
		delays:      delays,
		failConnect: map[string]bool{},
		connects:    map[string]int{},
	}
}

// Connect counts the attempt for target.Endpoint, then either returns
// ErrForwardPeerUnavailable (when the endpoint is flagged in failConnect) or
// a new session configured with that endpoint's ack delay. Failed attempts
// are counted too.
func (t *fakeFabricRuntimeTransport) Connect(_ context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
	endpoint := target.Endpoint
	t.mu.Lock()
	t.connects[endpoint]++
	fail := t.failConnect[endpoint]
	delay := t.delays[endpoint]
	t.mu.Unlock()
	if fail {
		return nil, ErrForwardPeerUnavailable
	}
	return &fakeFabricRuntimeSession{
		endpoint: endpoint,
		delay:    delay,
		frames:   make(chan fabricproto.Frame, 64),
		errors:   make(chan error, 1),
		done:     make(chan struct{}),
	}, nil
}

// Close is a no-op; the fake transport holds no resources.
func (t *fakeFabricRuntimeTransport) Close() error {
	return nil
}

// connectCount returns how many times Connect has been called for endpoint.
func (t *fakeFabricRuntimeTransport) connectCount(endpoint string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.connects[endpoint]
}

// waitForConnect polls once per millisecond until endpoint has been connected
// to at least count times, failing the test via tb.Fatalf after one second.
func (t *fakeFabricRuntimeTransport) waitForConnect(tb testing.TB, endpoint string, count int) {
	tb.Helper()
	deadline := time.Now().Add(time.Second)
	for {
		got := t.connectCount(endpoint)
		if got >= count {
			return
		}
		if time.Now().After(deadline) {
			tb.Fatalf("timed out waiting for %s connect count %d, got %d", endpoint, count, got)
		}
		time.Sleep(time.Millisecond)
	}
}

// fakeFabricRuntimeSession is the session handed out by
// fakeFabricRuntimeTransport.
type fakeFabricRuntimeSession struct {
	endpoint string
	delay    time.Duration
	frames   chan fabricproto.Frame
	errors   chan error
	done     chan struct{} // closed exactly once by Close
	once     sync.Once
}

// Send acknowledges every FrameData frame with a FrameAck that mirrors the
// frame's traffic class, stream ID, and sequence, emitted after the session's
// configured delay. Non-data frames are accepted and ignored. Send itself
// never blocks and always returns nil.
func (s *fakeFabricRuntimeSession) Send(_ context.Context, frame fabricproto.Frame) error {
	if frame.Type != fabricproto.FrameData {
		return nil
	}
	ack := fabricproto.Frame{Type: fabricproto.FrameAck, TrafficClass: frame.TrafficClass, StreamID: frame.StreamID, Sequence: frame.Sequence}
	delay := s.delay
	go func() {
		if delay > 0 {
			// Race the delay against done instead of calling time.Sleep so the
			// goroutine exits as soon as the session is closed rather than
			// lingering for the full delay and then acking a closed session.
			timer := time.NewTimer(delay)
			defer timer.Stop()
			select {
			case <-s.done:
				return
			case <-timer.C:
			}
		}
		select {
		case <-s.done:
		case s.frames <- ack:
		}
	}()
	return nil
}

// Frames returns the receive side of the session's frame channel.
func (s *fakeFabricRuntimeSession) Frames() <-chan fabricproto.Frame {
	return s.frames
}

// Errors returns the session's error channel; this fake never writes to it.
func (s *fakeFabricRuntimeSession) Errors() <-chan error {
	return s.errors
}

// Close marks the session closed. It is safe to call more than once.
func (s *fakeFabricRuntimeSession) Close() error {
	s.once.Do(func() { close(s.done) })
	return nil
}

// Closed reports whether Close has been called, without blocking.
func (s *fakeFabricRuntimeSession) Closed() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// testRuntimeRoute builds a route with testFabricRoute and then rewrites its
// final hop to be a direct hop at endpoint, using routeID with any "route-"
// prefix removed as the hop's EndpointID.
func testRuntimeRoute(routeID string, destination string, endpoint string, latency int) FabricRoute {
	route := testFabricRoute(routeID, destination, latency, 100, 0, true)
	route.Hops[len(route.Hops)-1].Address = endpoint
	route.Hops[len(route.Hops)-1].EndpointID = strings.TrimPrefix(routeID, "route-")
	route.Hops[len(route.Hops)-1].Mode = FabricRouteDirect
	return route
}

// testRuntimePoolRoute is testRuntimeRoute with the route's PoolID set to
// poolID.
func testRuntimePoolRoute(routeID string, poolID string, destination string, endpoint string, latency int) FabricRoute {
	route := testRuntimeRoute(routeID, destination, endpoint, latency)
	route.PoolID = poolID
	return route
}