224 lines
6.5 KiB
Go
224 lines
6.5 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
)
|
|
|
|
func TestQUICSyntheticTransportReroutesOnConnectFailure(t *testing.T) {
|
|
transport := newFakeSyntheticFabricTransport()
|
|
transport.failConnect["quic://dead.example.test:19443"] = true
|
|
transport.responses["quic://fast.example.test:19443"] = testSyntheticAckEnvelope("route-1", 1)
|
|
forward := NewQUICSyntheticTransportFromRouteSets(map[string]FabricRouteSet{
|
|
"node-b": FabricRouteSetForTransportTargets("cluster-a", "node-a", "node-b", []FabricTransportTarget{
|
|
{EndpointID: "dead", PeerID: "node-b", Endpoint: "quic://dead.example.test:19443", Transport: "quic"},
|
|
{EndpointID: "fast", PeerID: "node-b", Endpoint: "quic://fast.example.test:19443", Transport: "quic"},
|
|
}),
|
|
}, transport)
|
|
forward.Timeout = time.Second
|
|
|
|
ack, err := forward.SendSynthetic(context.Background(), "node-b", testSyntheticEnvelope("route-1", 1))
|
|
if err != nil {
|
|
t.Fatalf("send synthetic: %v", err)
|
|
}
|
|
if ack.RouteID != "route-1" || ack.MessageType != SyntheticMessageRouteHealthAck {
|
|
t.Fatalf("ack = %+v", ack)
|
|
}
|
|
if got := transport.connectCount("quic://dead.example.test:19443"); got != 1 {
|
|
t.Fatalf("dead connect count = %d, want 1", got)
|
|
}
|
|
if got := transport.connectCount("quic://fast.example.test:19443"); got != 1 {
|
|
t.Fatalf("fast connect count = %d, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestQUICFabricServerHandlesSyntheticFrames(t *testing.T) {
|
|
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
|
|
ListenAddr: "127.0.0.1:0",
|
|
TLSConfig: testQUICTLSConfig(t),
|
|
SyntheticForwardHandler: func(_ context.Context, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
|
|
return testSyntheticAckEnvelope(envelope.RouteID, envelope.Sequence), nil
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("start quic fabric server: %v", err)
|
|
}
|
|
defer server.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
session, err := NewQUICFabricTransport(nil).Connect(ctx, FabricTransportTarget{
|
|
Endpoint: server.Addr().String(),
|
|
TLSConfig: &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
NextProtos: []string{fabricQUICNextProto},
|
|
},
|
|
Timeout: time.Second,
|
|
InboundBuffer: 4,
|
|
ErrorBuffer: 4,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("connect: %v", err)
|
|
}
|
|
defer session.Close()
|
|
|
|
payload, err := json.Marshal(testSyntheticEnvelope("route-1", 7))
|
|
if err != nil {
|
|
t.Fatalf("marshal envelope: %v", err)
|
|
}
|
|
if err := session.Send(ctx, fabricproto.Frame{
|
|
Type: fabricproto.FrameData,
|
|
TrafficClass: fabricproto.TrafficClassReliable,
|
|
StreamID: SyntheticForwardQUICStreamID,
|
|
Sequence: 42,
|
|
Payload: payload,
|
|
}); err != nil {
|
|
t.Fatalf("send synthetic frame: %v", err)
|
|
}
|
|
select {
|
|
case frame := <-session.Frames():
|
|
if frame.StreamID != SyntheticForwardQUICStreamID || frame.Sequence != 42 {
|
|
t.Fatalf("frame = %+v", frame)
|
|
}
|
|
ack, err := decodeQUICSyntheticForwardResponse(frame.Payload)
|
|
if err != nil {
|
|
t.Fatalf("decode response: %v", err)
|
|
}
|
|
if ack.RouteID != "route-1" || ack.MessageType != SyntheticMessageRouteHealthAck || ack.Sequence != 7 {
|
|
t.Fatalf("ack = %+v", ack)
|
|
}
|
|
case err := <-session.Errors():
|
|
t.Fatalf("session error: %v", err)
|
|
case <-ctx.Done():
|
|
t.Fatal(ctx.Err())
|
|
}
|
|
}
|
|
|
|
type fakeSyntheticFabricTransport struct {
|
|
mu sync.Mutex
|
|
failConnect map[string]bool
|
|
responses map[string]SyntheticEnvelope
|
|
connects map[string]int
|
|
}
|
|
|
|
func newFakeSyntheticFabricTransport() *fakeSyntheticFabricTransport {
|
|
return &fakeSyntheticFabricTransport{
|
|
failConnect: map[string]bool{},
|
|
responses: map[string]SyntheticEnvelope{},
|
|
connects: map[string]int{},
|
|
}
|
|
}
|
|
|
|
func (t *fakeSyntheticFabricTransport) Connect(_ context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
|
|
endpoint := target.Endpoint
|
|
t.mu.Lock()
|
|
t.connects[endpoint]++
|
|
fail := t.failConnect[endpoint]
|
|
response := t.responses[endpoint]
|
|
t.mu.Unlock()
|
|
if fail {
|
|
return nil, ErrSyntheticPeerUnavailable
|
|
}
|
|
return &fakeSyntheticFabricSession{
|
|
response: response,
|
|
frames: make(chan fabricproto.Frame, 16),
|
|
errors: make(chan error, 1),
|
|
done: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
func (t *fakeSyntheticFabricTransport) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func (t *fakeSyntheticFabricTransport) connectCount(endpoint string) int {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.connects[endpoint]
|
|
}
|
|
|
|
type fakeSyntheticFabricSession struct {
|
|
response SyntheticEnvelope
|
|
frames chan fabricproto.Frame
|
|
errors chan error
|
|
done chan struct{}
|
|
once sync.Once
|
|
}
|
|
|
|
func (s *fakeSyntheticFabricSession) Send(_ context.Context, frame fabricproto.Frame) error {
|
|
if frame.Type != fabricproto.FrameData {
|
|
return nil
|
|
}
|
|
responsePayload, _ := json.Marshal(quicSyntheticForwardResponse{Envelope: s.response})
|
|
go func() {
|
|
select {
|
|
case <-s.done:
|
|
case s.frames <- fabricproto.Frame{
|
|
Type: fabricproto.FrameData,
|
|
TrafficClass: frame.TrafficClass,
|
|
StreamID: frame.StreamID,
|
|
Sequence: frame.Sequence,
|
|
Payload: responsePayload,
|
|
}:
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (s *fakeSyntheticFabricSession) Frames() <-chan fabricproto.Frame {
|
|
return s.frames
|
|
}
|
|
|
|
func (s *fakeSyntheticFabricSession) Errors() <-chan error {
|
|
return s.errors
|
|
}
|
|
|
|
func (s *fakeSyntheticFabricSession) Close() error {
|
|
s.once.Do(func() {
|
|
close(s.done)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (s *fakeSyntheticFabricSession) Closed() bool {
|
|
select {
|
|
case <-s.done:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func testSyntheticEnvelope(routeID string, sequence uint64) SyntheticEnvelope {
|
|
now := time.Now().UTC()
|
|
return SyntheticEnvelope{
|
|
ProtocolVersion: ProtocolVersion,
|
|
RouteID: routeID,
|
|
ClusterID: "cluster-a",
|
|
From: PeerIdentity{ClusterID: "cluster-a", NodeID: "node-a"},
|
|
To: PeerIdentity{ClusterID: "cluster-a", NodeID: "node-b"},
|
|
Channel: SyntheticChannelFabricControl,
|
|
MessageType: SyntheticMessageRouteHealth,
|
|
TTL: 8,
|
|
HopCount: 1,
|
|
Visited: []string{"node-a"},
|
|
Sequence: sequence,
|
|
SentAt: now,
|
|
}
|
|
}
|
|
|
|
func testSyntheticAckEnvelope(routeID string, sequence uint64) SyntheticEnvelope {
|
|
ack := testSyntheticEnvelope(routeID, sequence)
|
|
ack.From = PeerIdentity{ClusterID: "cluster-a", NodeID: "node-b"}
|
|
ack.To = PeerIdentity{ClusterID: "cluster-a", NodeID: "node-a"}
|
|
ack.MessageType = SyntheticMessageRouteHealthAck
|
|
ack.Visited = []string{"node-a", "node-b"}
|
|
return ack
|
|
}
|