Add QUIC fabric transport adapter

This commit is contained in:
2026-05-16 10:19:16 +03:00
parent ba3522d966
commit 130ff117f3
6 changed files with 377 additions and 0 deletions
@@ -0,0 +1,189 @@
package mesh
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/quic-go/quic-go"
)
const fabricQUICNextProto = "rap-fabric-data-session-v1"
type QUICFabricTransport struct {
Config *quic.Config
}
type quicFabricSession struct {
conn *quic.Conn
stream *quic.Stream
inbound chan fabricproto.Frame
errors chan error
done chan struct{}
closeOnce sync.Once
writeMu sync.Mutex
maxPayload int
timeout time.Duration
}
func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
return &QUICFabricTransport{Config: config}
}
func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
if target.Endpoint == "" {
return nil, fmt.Errorf("quic fabric endpoint is required")
}
tlsConfig := target.TLSConfig
if tlsConfig == nil {
tlsConfig = &tls.Config{NextProtos: []string{fabricQUICNextProto}}
} else {
tlsConfig = tlsConfig.Clone()
if len(tlsConfig.NextProtos) == 0 {
tlsConfig.NextProtos = []string{fabricQUICNextProto}
}
}
conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
if err != nil {
return nil, err
}
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
_ = conn.CloseWithError(1, "open stream failed")
return nil, err
}
maxPayload := target.MaxPayload
if maxPayload <= 0 {
maxPayload = fabricproto.DefaultMaxPayload
}
inboundBuffer := target.InboundBuffer
if inboundBuffer <= 0 {
inboundBuffer = 64
}
errorBuffer := target.ErrorBuffer
if errorBuffer <= 0 {
errorBuffer = 8
}
session := &quicFabricSession{
conn: conn,
stream: stream,
inbound: make(chan fabricproto.Frame, inboundBuffer),
errors: make(chan error, errorBuffer),
done: make(chan struct{}),
maxPayload: maxPayload,
timeout: target.Timeout,
}
go session.readLoop(ctx)
return session, nil
}
func (t *QUICFabricTransport) Close() error {
return nil
}
func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error {
if s == nil || s.stream == nil {
return fmt.Errorf("quic fabric session is closed")
}
select {
case <-s.done:
return fmt.Errorf("quic fabric session is closed")
default:
}
s.writeMu.Lock()
defer s.writeMu.Unlock()
s.applyWriteDeadline(ctx)
return fabricproto.WriteFrame(s.stream, frame)
}
func (s *quicFabricSession) Frames() <-chan fabricproto.Frame {
if s == nil {
return nil
}
return s.inbound
}
func (s *quicFabricSession) Errors() <-chan error {
if s == nil {
return nil
}
return s.errors
}
func (s *quicFabricSession) Close() error {
if s == nil {
return nil
}
var err error
s.closeOnce.Do(func() {
close(s.done)
if s.stream != nil {
err = s.stream.Close()
}
if s.conn != nil {
_ = s.conn.CloseWithError(0, "closed")
}
})
return err
}
func (s *quicFabricSession) Closed() bool {
if s == nil {
return true
}
select {
case <-s.done:
return true
default:
return false
}
}
func (s *quicFabricSession) readLoop(ctx context.Context) {
defer s.Close()
for {
s.applyReadDeadline(ctx)
frame, err := fabricproto.ReadFrame(s.stream, s.maxPayload)
if err != nil {
s.reportError(err)
return
}
select {
case <-ctx.Done():
s.reportError(ctx.Err())
return
case <-s.done:
return
case s.inbound <- frame:
}
}
}
func (s *quicFabricSession) reportError(err error) {
if err == nil {
return
}
select {
case s.errors <- err:
default:
}
}
func (s *quicFabricSession) applyReadDeadline(ctx context.Context) {
if deadline, ok := ctx.Deadline(); ok {
_ = s.stream.SetReadDeadline(deadline)
} else if s.timeout > 0 {
_ = s.stream.SetReadDeadline(time.Now().Add(s.timeout))
}
}
func (s *quicFabricSession) applyWriteDeadline(ctx context.Context) {
if deadline, ok := ctx.Deadline(); ok {
_ = s.stream.SetWriteDeadline(deadline)
} else if s.timeout > 0 {
_ = s.stream.SetWriteDeadline(time.Now().Add(s.timeout))
}
}
@@ -0,0 +1,175 @@
package mesh
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"testing"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/quic-go/quic-go"
)
func TestQUICFabricTransportPingPong(t *testing.T) {
listener := startQUICFabricEchoServer(t)
defer listener.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
transport := NewQUICFabricTransport(&quic.Config{EnableDatagrams: true})
session, err := transport.Connect(ctx, FabricTransportTarget{
Endpoint: listener.Addr().String(),
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{fabricQUICNextProto},
},
Timeout: time.Second,
InboundBuffer: 4,
ErrorBuffer: 4,
})
if err != nil {
t.Fatalf("connect quic fabric: %v", err)
}
defer session.Close()
if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 42, Payload: []byte("quic")}); err != nil {
t.Fatalf("send ping: %v", err)
}
select {
case frame := <-session.Frames():
if frame.Type != fabricproto.FramePong || frame.Sequence != 42 || string(frame.Payload) != "quic" {
t.Fatalf("frame = %+v", frame)
}
case err := <-session.Errors():
t.Fatalf("session error: %v", err)
case <-ctx.Done():
t.Fatal(ctx.Err())
}
}
func TestQUICFabricTransportDataAck(t *testing.T) {
listener := startQUICFabricEchoServer(t)
defer listener.Close()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
session, err := NewQUICFabricTransport(nil).Connect(ctx, FabricTransportTarget{
Endpoint: listener.Addr().String(),
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{fabricQUICNextProto},
},
Timeout: time.Second,
InboundBuffer: 4,
ErrorBuffer: 4,
})
if err != nil {
t.Fatalf("connect quic fabric: %v", err)
}
defer session.Close()
if err := session.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameOpenStream,
StreamID: 9,
TrafficClass: fabricproto.TrafficClassInteractive,
}); err != nil {
t.Fatalf("open stream: %v", err)
}
if err := session.Send(ctx, fabricproto.Frame{
Type: fabricproto.FrameData,
StreamID: 9,
Sequence: 7,
TrafficClass: fabricproto.TrafficClassInteractive,
Payload: []byte("packet"),
}); err != nil {
t.Fatalf("send data: %v", err)
}
select {
case frame := <-session.Frames():
if frame.Type != fabricproto.FrameAck || frame.StreamID != 9 || frame.Sequence != 7 {
t.Fatalf("frame = %+v", frame)
}
case err := <-session.Errors():
t.Fatalf("session error: %v", err)
case <-ctx.Done():
t.Fatal(ctx.Err())
}
}
func startQUICFabricEchoServer(t *testing.T) *quic.Listener {
t.Helper()
listener, err := quic.ListenAddr("127.0.0.1:0", testQUICTLSConfig(t), &quic.Config{EnableDatagrams: true})
if err != nil {
t.Fatalf("listen quic: %v", err)
}
go func() {
conn, err := listener.Accept(context.Background())
if err != nil {
return
}
stream, err := conn.AcceptStream(context.Background())
if err != nil {
_ = conn.CloseWithError(1, "accept stream failed")
return
}
session := fabricproto.NewSession(fabricproto.SessionConfig{})
for {
frame, err := fabricproto.ReadFrame(stream, fabricproto.DefaultMaxPayload)
if err != nil {
_ = conn.CloseWithError(0, "closed")
return
}
_, responses, err := session.HandleFrame(frame)
if err != nil {
_ = conn.CloseWithError(2, err.Error())
return
}
for _, response := range responses {
if err := fabricproto.WriteFrame(stream, response); err != nil {
_ = conn.CloseWithError(3, err.Error())
return
}
}
}
}()
return listener
}
func testQUICTLSConfig(t *testing.T) *tls.Config {
t.Helper()
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatalf("generate key: %v", err)
}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "rap-fabric-test"},
NotBefore: time.Now().Add(-time.Minute),
NotAfter: time.Now().Add(time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
DNSNames: []string{"localhost"},
}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
t.Fatalf("create certificate: %v", err)
}
keyDER := x509.MarshalPKCS1PrivateKey(key)
cert, err := tls.X509KeyPair(
pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}),
pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: keyDER}),
)
if err != nil {
t.Fatalf("key pair: %v", err)
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: []string{fabricQUICNextProto},
}
}
@@ -2,6 +2,7 @@ package mesh
import (
"context"
"crypto/tls"
"net/http"
"time"
@@ -26,6 +27,7 @@ type FabricTransportTarget struct {
Endpoint string
Token string
Header http.Header
TLSConfig *tls.Config
Timeout time.Duration
MaxPayload int
OutboundBuffer int