Files
rdp-proxy/agents/rap-node-agent/internal/mesh/fabric_channel_runtime_test.go
T

496 lines
17 KiB
Go

package mesh
import (
"context"
"strings"
"sync"
"testing"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
func TestFabricChannelRuntimeMigratesSlowAckToStandbyRoute(t *testing.T) {
transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
"quic://slow.example.test:19443": 60 * time.Millisecond,
"quic://fast.example.test:19443": 0,
})
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-b",
Primary: testRuntimeRoute("route-slow", "node-b", "quic://slow.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
},
}
result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{
[]byte("one"),
[]byte("two"),
[]byte("three"),
})
if err != nil {
t.Fatalf("send reliable: %v", err)
}
if result.MigrationEvents != 1 {
t.Fatalf("migration events = %d, want 1: %+v", result.MigrationEvents, result.RouteEvents)
}
if result.Channel.RouteID != "route-fast" || result.Channel.RerouteCount != 1 {
t.Fatalf("channel = %+v", result.Channel)
}
if result.BytesSent != uint64(len("one")+len("two")+len("three")) || result.AcksReceived != 3 {
t.Fatalf("result = %+v", result)
}
if got := transport.connectCount("quic://slow.example.test:19443"); got != 1 {
t.Fatalf("slow connect count = %d, want 1", got)
}
if got := transport.connectCount("quic://fast.example.test:19443"); got != 1 {
t.Fatalf("fast connect count = %d, want 1", got)
}
if result.RoutePressure.AcquiredTotal != 2 || result.RoutePressure.ReleasedTotal != 2 || result.RoutePressure.MaxActiveTotal == 0 {
t.Fatalf("route pressure = %+v", result.RoutePressure)
}
}
func TestFabricChannelRuntimeReroutesOnConnectFailure(t *testing.T) {
transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
"quic://fast.example.test:19443": 0,
})
transport.failConnect["quic://dead.example.test:19443"] = true
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-b",
Primary: testRuntimeRoute("route-dead", "node-b", "quic://dead.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
},
}
result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("payload")})
if err != nil {
t.Fatalf("send reliable: %v", err)
}
if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || result.BytesSent != uint64(len("payload")) {
t.Fatalf("result = %+v", result)
}
}
func TestFabricChannelRuntimeQuarantinesFailedRouteAcrossChannels(t *testing.T) {
transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
"quic://fast.example.test:19443": 0,
})
transport.failConnect["quic://dead.example.test:19443"] = true
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
RouteHealthTTL: time.Minute,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-b",
Primary: testRuntimeRoute("route-dead", "node-b", "quic://dead.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
},
}
first, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("first")})
if err != nil {
t.Fatalf("first send reliable: %v", err)
}
if first.Channel.RouteID != "route-fast" || first.RouteHealth.Quarantined["route-dead"].Failures != 1 {
t.Fatalf("first result = %+v", first)
}
second, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("second")})
if err != nil {
t.Fatalf("second send reliable: %v", err)
}
if second.Channel.RouteID != "route-fast" {
t.Fatalf("second route = %s, want route-fast", second.Channel.RouteID)
}
if got := transport.connectCount("quic://dead.example.test:19443"); got != 1 {
t.Fatalf("dead connect count = %d, want one attempt before quarantine", got)
}
if got := transport.connectCount("quic://fast.example.test:19443"); got != 2 {
t.Fatalf("fast connect count = %d, want both channels on healthy route", got)
}
}
func TestFabricChannelRuntimeReroutesOnAckTimeout(t *testing.T) {
transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
"quic://slow.example.test:19443": 100 * time.Millisecond,
"quic://fast.example.test:19443": 0,
})
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
Timeout: 10 * time.Millisecond,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-b",
Primary: testRuntimeRoute("route-slow", "node-b", "quic://slow.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-fast", "node-b", "quic://fast.example.test:19443", 20),
},
}
result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("payload")})
if err != nil {
t.Fatalf("send reliable: %v", err)
}
if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || result.BytesSent != uint64(len("payload")) {
t.Fatalf("result = %+v", result)
}
}
func TestFabricChannelRuntimeSpreadsConcurrentChannelsBySharedPressure(t *testing.T) {
transport := newFakeFabricRuntimeTransport(map[string]time.Duration{
"quic://route-a.example.test:19443": 80 * time.Millisecond,
"quic://route-b.example.test:19443": 0,
})
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{StreamID: 9})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-b",
Primary: testRuntimeRoute("route-a", "node-b", "quic://route-a.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-b", "node-b", "quic://route-b.example.test:19443", 11),
},
}
firstDone := make(chan error, 1)
go func() {
_, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("one")})
firstDone <- err
}()
transport.waitForConnect(t, "quic://route-a.example.test:19443", 1)
result, err := runtime.SendReliable(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-b"), routeSet, [][]byte{[]byte("two")})
if err != nil {
t.Fatalf("second send reliable: %v", err)
}
if result.Channel.RouteID != "route-b" {
t.Fatalf("second route = %s, want route-b", result.Channel.RouteID)
}
if got := transport.connectCount("quic://route-b.example.test:19443"); got != 1 {
t.Fatalf("route-b connect count = %d, want 1", got)
}
if err := <-firstDone; err != nil {
t.Fatalf("first send reliable: %v", err)
}
}
func TestFabricChannelRuntimeRequestResponseReturnsPayload(t *testing.T) {
transport := newFakeFabricRequestResponseTransport(map[string][]byte{
"quic://runtime.example.test:19443": []byte(`{"status":"ok"}`),
})
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetPool,
TargetID: "pool-admin-runtime",
Primary: testRuntimePoolRoute("route-runtime", "pool-admin-runtime", "node-runtime", "quic://runtime.example.test:19443", 10),
}
result, err := runtime.SendRequestResponse(context.Background(), FabricChannelSpec{
ChannelID: "channel-web-1",
ClusterID: "cluster-1",
SourceNodeID: "node-a",
TargetKind: FabricChannelTargetPool,
TargetID: "pool-admin-runtime",
TrafficClass: "control",
CreatedAt: time.Now().UTC(),
}, routeSet, []byte(`{"request":true}`))
if err != nil {
t.Fatalf("request response: %v", err)
}
if string(result.ResponsePayload) != `{"status":"ok"}` {
t.Fatalf("response payload = %s", string(result.ResponsePayload))
}
if result.Channel.RouteID != "route-runtime" ||
result.BytesSent != uint64(len(`{"request":true}`)) ||
result.BytesRecv != uint64(len(`{"status":"ok"}`)) ||
result.FramesSent != 1 ||
result.FramesRecv != 1 ||
result.AcksReceived != 1 {
t.Fatalf("result = %+v", result)
}
}
func TestFabricChannelRuntimeRequestResponseReroutesOnResponseFailure(t *testing.T) {
transport := newFakeFabricRequestResponseTransport(map[string][]byte{
"quic://fast.example.test:19443": []byte(`{"status":"ok"}`),
})
transport.failResponse["quic://slow.example.test:19443"] = true
runtime := NewFabricChannelRuntime(transport, FabricChannelRuntimeConfig{
RouterConfig: FabricChannelRouterConfig{MaxAckLatencyMs: 30},
StreamID: 9,
Timeout: 10 * time.Millisecond,
})
routeSet := FabricRouteSet{
TargetKind: FabricChannelTargetNode,
TargetID: "node-runtime",
Primary: testRuntimeRoute("route-slow", "node-runtime", "quic://slow.example.test:19443", 10),
WarmStandby: []FabricRoute{
testRuntimeRoute("route-fast", "node-runtime", "quic://fast.example.test:19443", 20),
},
}
result, err := runtime.SendRequestResponse(context.Background(), testFabricChannelSpec(FabricChannelTargetNode, "node-runtime"), routeSet, []byte(`{"request":true}`))
if err != nil {
t.Fatalf("request response: %v", err)
}
if result.MigrationEvents != 1 || result.Channel.RouteID != "route-fast" || string(result.ResponsePayload) != `{"status":"ok"}` {
t.Fatalf("result = %+v", result)
}
}
func TestFabricTransportTargetForRouteUsesLastAddressedHop(t *testing.T) {
target, err := FabricTransportTargetForRoute(FabricRoute{
RouteID: "route-1",
Hops: []FabricRouteHop{
{NodeID: "node-a"},
{NodeID: "node-r", Mode: FabricRouteRelay, EndpointID: "relay-1", Address: "quic://relay.example.test:19443"},
{NodeID: "node-b", Mode: FabricRouteDirect, EndpointID: "node-b-quic", Address: "quic://node-b.example.test:19443"},
},
})
if err != nil {
t.Fatalf("target for route: %v", err)
}
if target.PeerID != "node-b" || target.EndpointID != "node-b-quic" || target.Endpoint != "quic://node-b.example.test:19443" || target.Transport != string(FabricRouteDirect) {
t.Fatalf("target = %+v", target)
}
}
type fakeFabricRequestResponseTransport struct {
mu sync.Mutex
responses map[string][]byte
failResponse map[string]bool
connects map[string]int
}
func newFakeFabricRequestResponseTransport(responses map[string][]byte) *fakeFabricRequestResponseTransport {
return &fakeFabricRequestResponseTransport{
responses: responses,
failResponse: map[string]bool{},
connects: map[string]int{},
}
}
func (t *fakeFabricRequestResponseTransport) Connect(_ context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
endpoint := target.Endpoint
t.mu.Lock()
t.connects[endpoint]++
response := append([]byte(nil), t.responses[endpoint]...)
failResponse := t.failResponse[endpoint]
t.mu.Unlock()
return &fakeFabricRequestResponseSession{
response: response,
failResponse: failResponse,
frames: make(chan fabricproto.Frame, 16),
errors: make(chan error, 1),
done: make(chan struct{}),
}, nil
}
func (t *fakeFabricRequestResponseTransport) Close() error {
return nil
}
type fakeFabricRequestResponseSession struct {
response []byte
failResponse bool
frames chan fabricproto.Frame
errors chan error
done chan struct{}
once sync.Once
}
func (s *fakeFabricRequestResponseSession) Send(_ context.Context, frame fabricproto.Frame) error {
if frame.Type != fabricproto.FrameData || s.failResponse {
return nil
}
response := append([]byte(nil), s.response...)
go func() {
select {
case <-s.done:
case s.frames <- fabricproto.Frame{Type: fabricproto.FrameData, TrafficClass: frame.TrafficClass, StreamID: frame.StreamID, Sequence: frame.Sequence, Payload: response}:
}
}()
return nil
}
func (s *fakeFabricRequestResponseSession) Frames() <-chan fabricproto.Frame {
return s.frames
}
func (s *fakeFabricRequestResponseSession) Errors() <-chan error {
return s.errors
}
func (s *fakeFabricRequestResponseSession) Close() error {
s.once.Do(func() {
close(s.done)
})
return nil
}
func (s *fakeFabricRequestResponseSession) Closed() bool {
select {
case <-s.done:
return true
default:
return false
}
}
func TestFabricTransportTargetForRouteUsesRelayHopForRelayRoute(t *testing.T) {
target, err := FabricTransportTargetForRoute(FabricRoute{
RouteID: "route-relay",
RelayCount: 1,
Hops: []FabricRouteHop{
{NodeID: "node-a"},
{NodeID: "node-r", Mode: FabricRouteRelay, EndpointID: "relay-1", Address: "quic://relay.example.test:19443", PeerCertSHA256: "relay-cert"},
{NodeID: "node-b", Mode: FabricRouteRelay, EndpointID: "node-b-private", Address: "quic://10.0.0.2:19443", PeerCertSHA256: "node-b-cert"},
},
})
if err != nil {
t.Fatalf("target for relay route: %v", err)
}
if target.PeerID != "node-r" || target.EndpointID != "relay-1" || target.Endpoint != "quic://relay.example.test:19443" || target.PeerCertSHA256 != "relay-cert" {
t.Fatalf("target = %+v", target)
}
}
type fakeFabricRuntimeTransport struct {
mu sync.Mutex
delays map[string]time.Duration
failConnect map[string]bool
connects map[string]int
}
func newFakeFabricRuntimeTransport(delays map[string]time.Duration) *fakeFabricRuntimeTransport {
return &fakeFabricRuntimeTransport{
delays: delays,
failConnect: map[string]bool{},
connects: map[string]int{},
}
}
func (t *fakeFabricRuntimeTransport) Connect(_ context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
endpoint := target.Endpoint
t.mu.Lock()
t.connects[endpoint]++
fail := t.failConnect[endpoint]
delay := t.delays[endpoint]
t.mu.Unlock()
if fail {
return nil, ErrForwardPeerUnavailable
}
return &fakeFabricRuntimeSession{
endpoint: endpoint,
delay: delay,
frames: make(chan fabricproto.Frame, 64),
errors: make(chan error, 1),
done: make(chan struct{}),
}, nil
}
func (t *fakeFabricRuntimeTransport) Close() error {
return nil
}
func (t *fakeFabricRuntimeTransport) connectCount(endpoint string) int {
t.mu.Lock()
defer t.mu.Unlock()
return t.connects[endpoint]
}
func (t *fakeFabricRuntimeTransport) waitForConnect(tb testing.TB, endpoint string, count int) {
tb.Helper()
deadline := time.Now().Add(time.Second)
for {
t.mu.Lock()
got := t.connects[endpoint]
t.mu.Unlock()
if got >= count {
return
}
if time.Now().After(deadline) {
tb.Fatalf("timed out waiting for %s connect count %d, got %d", endpoint, count, got)
}
time.Sleep(time.Millisecond)
}
}
type fakeFabricRuntimeSession struct {
endpoint string
delay time.Duration
frames chan fabricproto.Frame
errors chan error
done chan struct{}
once sync.Once
}
func (s *fakeFabricRuntimeSession) Send(_ context.Context, frame fabricproto.Frame) error {
if frame.Type != fabricproto.FrameData {
return nil
}
delay := s.delay
go func() {
if delay > 0 {
time.Sleep(delay)
}
select {
case <-s.done:
case s.frames <- fabricproto.Frame{Type: fabricproto.FrameAck, TrafficClass: frame.TrafficClass, StreamID: frame.StreamID, Sequence: frame.Sequence}:
}
}()
return nil
}
func (s *fakeFabricRuntimeSession) Frames() <-chan fabricproto.Frame {
return s.frames
}
func (s *fakeFabricRuntimeSession) Errors() <-chan error {
return s.errors
}
func (s *fakeFabricRuntimeSession) Close() error {
s.once.Do(func() {
close(s.done)
})
return nil
}
func (s *fakeFabricRuntimeSession) Closed() bool {
select {
case <-s.done:
return true
default:
return false
}
}
func testRuntimeRoute(routeID string, destination string, endpoint string, latency int) FabricRoute {
route := testFabricRoute(routeID, destination, latency, 100, 0, true)
route.Hops[len(route.Hops)-1].Address = endpoint
route.Hops[len(route.Hops)-1].EndpointID = strings.TrimPrefix(routeID, "route-")
route.Hops[len(route.Hops)-1].Mode = FabricRouteDirect
return route
}
func testRuntimePoolRoute(routeID string, poolID string, destination string, endpoint string, latency int) FabricRoute {
route := testRuntimeRoute(routeID, destination, endpoint, latency)
route.PoolID = poolID
return route
}