From 130ff117f3c1e17336ac1c91a3884af47512c038 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 10:19:16 +0300 Subject: [PATCH] Add QUIC fabric transport adapter --- agents/rap-node-agent/go.mod | 2 + agents/rap-node-agent/go.sum | 4 + .../internal/mesh/fabric_quic_transport.go | 189 ++++++++++++++++++ .../mesh/fabric_quic_transport_test.go | 175 ++++++++++++++++ .../internal/mesh/fabric_transport.go | 2 + .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 5 + 6 files changed, 377 insertions(+) create mode 100644 agents/rap-node-agent/internal/mesh/fabric_quic_transport.go create mode 100644 agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go diff --git a/agents/rap-node-agent/go.mod b/agents/rap-node-agent/go.mod index c63200c..ec8920e 100644 --- a/agents/rap-node-agent/go.mod +++ b/agents/rap-node-agent/go.mod @@ -6,6 +6,8 @@ require golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb require ( github.com/gorilla/websocket v1.5.3 // indirect + github.com/quic-go/quic-go v0.59.1 // indirect + golang.org/x/crypto v0.50.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/sys v0.43.0 // indirect golang.org/x/time v0.15.0 // indirect diff --git a/agents/rap-node-agent/go.sum b/agents/rap-node-agent/go.sum index 782c60c..2342f65 100644 --- a/agents/rap-node-agent/go.sum +++ b/agents/rap-node-agent/go.sum @@ -1,5 +1,9 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/quic-go/quic-go v0.59.1 h1:0Gmua0HW1Tv7ANR7hUYwRyD0MG5OJfgvYSZasGZzBic= +github.com/quic-go/quic-go v0.59.1/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go new file mode 100644 index 0000000..667cb3b --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport.go @@ -0,0 +1,189 @@ +package mesh + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" + "github.com/quic-go/quic-go" +) + +const fabricQUICNextProto = "rap-fabric-data-session-v1" + +type QUICFabricTransport struct { + Config *quic.Config +} + +type quicFabricSession struct { + conn *quic.Conn + stream *quic.Stream + inbound chan fabricproto.Frame + errors chan error + done chan struct{} + closeOnce sync.Once + writeMu sync.Mutex + maxPayload int + timeout time.Duration +} + +func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport { + return &QUICFabricTransport{Config: config} +} + +func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) { + if target.Endpoint == "" { + return nil, fmt.Errorf("quic fabric endpoint is required") + } + tlsConfig := target.TLSConfig + if tlsConfig == nil { + tlsConfig = &tls.Config{NextProtos: []string{fabricQUICNextProto}} + } else { + tlsConfig = tlsConfig.Clone() + if len(tlsConfig.NextProtos) == 0 { + tlsConfig.NextProtos = []string{fabricQUICNextProto} + } + } + conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config) + if err != nil { + return nil, err + } + stream, err := conn.OpenStreamSync(ctx) + if err != nil { + _ = conn.CloseWithError(1, "open stream failed") + return nil, err + } + maxPayload := target.MaxPayload + if maxPayload <= 0 { + maxPayload = fabricproto.DefaultMaxPayload + } + inboundBuffer := target.InboundBuffer + if inboundBuffer <= 0 { + inboundBuffer = 64 + } + errorBuffer := target.ErrorBuffer + if errorBuffer <= 0 { + errorBuffer = 8 + } + session := &quicFabricSession{ + conn: conn, + stream: stream, + inbound: make(chan fabricproto.Frame, inboundBuffer), + errors: make(chan error, errorBuffer), + done: make(chan struct{}), + maxPayload: maxPayload, + timeout: target.Timeout, + } + go session.readLoop(ctx) + return session, nil +} + +func (t *QUICFabricTransport) Close() error { + return nil +} + +func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error { + if s == nil || s.stream == nil { + return fmt.Errorf("quic fabric session is closed") + } + select { + case <-s.done: + return fmt.Errorf("quic fabric session is closed") + default: + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + s.applyWriteDeadline(ctx) + return fabricproto.WriteFrame(s.stream, frame) +} + +func (s *quicFabricSession) Frames() <-chan fabricproto.Frame { + if s == nil { + return nil + } + return s.inbound +} + +func (s *quicFabricSession) Errors() <-chan error { + if s == nil { + return nil + } + return s.errors +} + +func (s *quicFabricSession) Close() error { + if s == nil { + return nil + } + var err error + s.closeOnce.Do(func() { + close(s.done) + if s.stream != nil { + err = s.stream.Close() + } + if s.conn != nil { + _ = s.conn.CloseWithError(0, "closed") + } + }) + return err +} + +func (s *quicFabricSession) Closed() bool { + if s == nil { + return true + } + select { + case <-s.done: + return true + default: + return false + } +} + +func (s *quicFabricSession) readLoop(ctx context.Context) { + defer s.Close() + for { + s.applyReadDeadline(ctx) + frame, err := fabricproto.ReadFrame(s.stream, s.maxPayload) + if err != nil { + s.reportError(err) + return + } + select { + case <-ctx.Done(): + s.reportError(ctx.Err()) + return + case <-s.done: + return + case s.inbound <- frame: + } + } +} + +func (s *quicFabricSession) reportError(err error) { + if err == nil { + return + } + select { + case s.errors <- err: + default: + } +} + +func (s *quicFabricSession) applyReadDeadline(ctx context.Context) { + if deadline, ok := ctx.Deadline(); ok { + _ = s.stream.SetReadDeadline(deadline) + } else if s.timeout > 0 { + _ = s.stream.SetReadDeadline(time.Now().Add(s.timeout)) + } +} + +func (s *quicFabricSession) applyWriteDeadline(ctx context.Context) { + if deadline, ok := ctx.Deadline(); ok { + _ = s.stream.SetWriteDeadline(deadline) + } else if s.timeout > 0 { + _ = s.stream.SetWriteDeadline(time.Now().Add(s.timeout)) + } +} diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go new file mode 100644 index 0000000..17be260 --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -0,0 +1,175 @@ +package mesh + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "testing" + "time" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" + "github.com/quic-go/quic-go" +) + +func TestQUICFabricTransportPingPong(t *testing.T) { + listener := startQUICFabricEchoServer(t) + defer listener.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + transport := NewQUICFabricTransport(&quic.Config{EnableDatagrams: true}) + session, err := transport.Connect(ctx, FabricTransportTarget{ + Endpoint: listener.Addr().String(), + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{fabricQUICNextProto}, + }, + Timeout: time.Second, + InboundBuffer: 4, + ErrorBuffer: 4, + }) + if err != nil { + t.Fatalf("connect quic fabric: %v", err) + } + defer session.Close() + + if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 42, Payload: []byte("quic")}); err != nil { + t.Fatalf("send ping: %v", err) + } + select { + case frame := <-session.Frames(): + if frame.Type != fabricproto.FramePong || frame.Sequence != 42 || string(frame.Payload) != "quic" { + t.Fatalf("frame = %+v", frame) + } + case err := <-session.Errors(): + t.Fatalf("session error: %v", err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +func TestQUICFabricTransportDataAck(t *testing.T) { + listener := startQUICFabricEchoServer(t) + defer listener.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + session, err := NewQUICFabricTransport(nil).Connect(ctx, FabricTransportTarget{ + Endpoint: listener.Addr().String(), + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{fabricQUICNextProto}, + }, + Timeout: time.Second, + InboundBuffer: 4, + ErrorBuffer: 4, + }) + if err != nil { + t.Fatalf("connect quic fabric: %v", err) + } + defer session.Close() + + if err := session.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameOpenStream, + StreamID: 9, + TrafficClass: fabricproto.TrafficClassInteractive, + }); err != nil { + t.Fatalf("open stream: %v", err) + } + if err := session.Send(ctx, fabricproto.Frame{ + Type: fabricproto.FrameData, + StreamID: 9, + Sequence: 7, + TrafficClass: fabricproto.TrafficClassInteractive, + Payload: []byte("packet"), + }); err != nil { + t.Fatalf("send data: %v", err) + } + select { + case frame := <-session.Frames(): + if frame.Type != fabricproto.FrameAck || frame.StreamID != 9 || frame.Sequence != 7 { + t.Fatalf("frame = %+v", frame) + } + case err := <-session.Errors(): + t.Fatalf("session error: %v", err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +func startQUICFabricEchoServer(t *testing.T) *quic.Listener { + t.Helper() + listener, err := quic.ListenAddr("127.0.0.1:0", testQUICTLSConfig(t), &quic.Config{EnableDatagrams: true}) + if err != nil { + t.Fatalf("listen quic: %v", err) + } + go func() { + conn, err := listener.Accept(context.Background()) + if err != nil { + return + } + stream, err := conn.AcceptStream(context.Background()) + if err != nil { + _ = conn.CloseWithError(1, "accept stream failed") + return + } + session := fabricproto.NewSession(fabricproto.SessionConfig{}) + for { + frame, err := fabricproto.ReadFrame(stream, fabricproto.DefaultMaxPayload) + if err != nil { + _ = conn.CloseWithError(0, "closed") + return + } + _, responses, err := session.HandleFrame(frame) + if err != nil { + _ = conn.CloseWithError(2, err.Error()) + return + } + for _, response := range responses { + if err := fabricproto.WriteFrame(stream, response); err != nil { + _ = conn.CloseWithError(3, err.Error()) + return + } + } + } + }() + return listener +} + +func testQUICTLSConfig(t *testing.T) *tls.Config { + t.Helper() + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + t.Fatalf("generate key: %v", err) + } + template := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "rap-fabric-test"}, + NotBefore: time.Now().Add(-time.Minute), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + DNSNames: []string{"localhost"}, + } + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + t.Fatalf("create certificate: %v", err) + } + keyDER := x509.MarshalPKCS1PrivateKey(key) + cert, err := tls.X509KeyPair( + pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}), + pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: keyDER}), + ) + if err != nil { + t.Fatalf("key pair: %v", err) + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{fabricQUICNextProto}, + } +} diff --git a/agents/rap-node-agent/internal/mesh/fabric_transport.go b/agents/rap-node-agent/internal/mesh/fabric_transport.go index 71f577b..0067b1b 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_transport.go +++ b/agents/rap-node-agent/internal/mesh/fabric_transport.go @@ -2,6 +2,7 @@ package mesh import ( "context" + "crypto/tls" "net/http" "time" @@ -26,6 +27,7 @@ type FabricTransportTarget struct { Endpoint string Token string Header http.Header + TLSConfig *tls.Config Timeout time.Duration MaxPayload int OutboundBuffer int diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index bc11aa8..23d140f 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -294,6 +294,9 @@ reuses, opens, closed-pump evictions, and explicit close operations. The mesh package now exposes a service-neutral `FabricTransport` abstraction; the current WebSocket carrier implements it as `WebSocketFabricTransport`, so future QUIC/UDP transport can be added without changing VPN/RDP/HTTP services. +`QUICFabricTransport` now implements the same interface and carries the same +binary `fabricproto` frames over a QUIC stream, with local smoke coverage for +`PING`/`PONG` and DATA/ACK. Deliverables: @@ -323,6 +326,8 @@ Deliverables: ### Stage FNP-6: QUIC/UDP Transport +Status: started with `QUICFabricTransport` in `internal/mesh`. + Deliverables: - implement QUIC transport for Fabric Data Session V1;