From f84b088580f8810de1d67488606e1d7dd935ea53 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Sat, 16 May 2026 10:24:37 +0300 Subject: [PATCH] Add QUIC fabric listener --- .../internal/mesh/fabric_quic_server.go | 147 ++++++++++++++++++ .../mesh/fabric_quic_transport_test.go | 48 ++++++ .../DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md | 2 + 3 files changed, 197 insertions(+) create mode 100644 agents/rap-node-agent/internal/mesh/fabric_quic_server.go diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_server.go b/agents/rap-node-agent/internal/mesh/fabric_quic_server.go new file mode 100644 index 0000000..36d75bf --- /dev/null +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_server.go @@ -0,0 +1,147 @@ +package mesh + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "sync" + + "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" + "github.com/quic-go/quic-go" +) + +type QUICFabricServer struct { + listener *quic.Listener + logger FabricSessionEventLogger + done chan struct{} + closeOnce sync.Once +} + +type QUICFabricServerConfig struct { + ListenAddr string + TLSConfig *tls.Config + QUICConfig *quic.Config + Logger FabricSessionEventLogger +} + +func StartQUICFabricServer(ctx context.Context, cfg QUICFabricServerConfig) (*QUICFabricServer, error) { + if cfg.ListenAddr == "" { + return nil, fmt.Errorf("quic fabric listen addr is required") + } + tlsConfig := cfg.TLSConfig + if tlsConfig == nil { + return nil, fmt.Errorf("quic fabric tls config is required") + } + tlsConfig = tlsConfig.Clone() + if len(tlsConfig.NextProtos) == 0 { + tlsConfig.NextProtos = []string{fabricQUICNextProto} + } + listener, err := quic.ListenAddr(cfg.ListenAddr, tlsConfig, cfg.QUICConfig) + if err != nil { + return nil, err + } + server := &QUICFabricServer{ + listener: listener, + logger: cfg.Logger, + done: make(chan struct{}), + } + go server.acceptLoop(ctx) + return server, nil +} + +func (s *QUICFabricServer) Addr() net.Addr { + if s == nil || s.listener == nil { + return nil + } + return s.listener.Addr() +} + +func (s *QUICFabricServer) Close() error { + if s == nil { + return nil + } + var err error + s.closeOnce.Do(func() { + close(s.done) + if s.listener != nil { + err = s.listener.Close() + } + }) + return err +} + +func (s *QUICFabricServer) acceptLoop(ctx context.Context) { + defer s.Close() + for { + conn, err := s.listener.Accept(ctx) + if err != nil { + return + } + go s.handleConn(ctx, conn) + } +} + +func (s *QUICFabricServer) handleConn(ctx context.Context, conn *quic.Conn) { + for { + stream, err := conn.AcceptStream(ctx) + if err != nil { + _ = conn.CloseWithError(0, "accept stream stopped") + return + } + go s.handleStream(ctx, conn, stream) + } +} + +func (s *QUICFabricServer) handleStream(ctx context.Context, conn *quic.Conn, stream *quic.Stream) { + session := fabricproto.NewSession(fabricproto.SessionConfig{}) + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_quic_stream_opened", + AcceptedBy: "quic", + RemoteAddr: conn.RemoteAddr().String(), + }) + defer s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_quic_stream_closed", + AcceptedBy: "quic", + RemoteAddr: conn.RemoteAddr().String(), + }) + for { + select { + case <-ctx.Done(): + _ = stream.Close() + return + default: + } + frame, err := fabricproto.ReadFrame(stream, fabricproto.DefaultMaxPayload) + if err != nil { + return + } + event, responses, err := session.HandleFrame(frame) + if err != nil { + _ = conn.CloseWithError(2, err.Error()) + return + } + if event.Type != fabricproto.SessionEventNone { + s.logFabricSession(FabricSessionEventLogEntry{ + Event: "fabric_session_event", + SessionEvent: event.Type, + StreamID: event.StreamID, + Sequence: event.Sequence, + TrafficClass: event.TrafficClass, + AcceptedBy: "quic", + RemoteAddr: conn.RemoteAddr().String(), + }) + } + for _, response := range responses { + if err := fabricproto.WriteFrame(stream, response); err != nil { + return + } + } + } +} + +func (s *QUICFabricServer) logFabricSession(entry FabricSessionEventLogEntry) { + if s != nil && s.logger != nil { + s.logger(entry) + } +} diff --git a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go index 17be260..7d40513 100644 --- a/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go +++ b/agents/rap-node-agent/internal/mesh/fabric_quic_transport_test.go @@ -102,6 +102,54 @@ func TestQUICFabricTransportDataAck(t *testing.T) { } } +func TestQUICFabricServerHandlesFabricFrames(t *testing.T) { + var events []FabricSessionEventLogEntry + server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{ + ListenAddr: "127.0.0.1:0", + TLSConfig: testQUICTLSConfig(t), + Logger: func(entry FabricSessionEventLogEntry) { + events = append(events, entry) + }, + }) + if err != nil { + t.Fatalf("start quic fabric server: %v", err) + } + defer server.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + session, err := NewQUICFabricTransport(nil).Connect(ctx, FabricTransportTarget{ + Endpoint: server.Addr().String(), + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{fabricQUICNextProto}, + }, + Timeout: time.Second, + InboundBuffer: 4, + ErrorBuffer: 4, + }) + if err != nil { + t.Fatalf("connect quic fabric: %v", err) + } + defer session.Close() + if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 77, Payload: []byte("server")}); err != nil { + t.Fatalf("send ping: %v", err) + } + select { + case frame := <-session.Frames(): + if frame.Type != fabricproto.FramePong || frame.Sequence != 77 || string(frame.Payload) != "server" { + t.Fatalf("frame = %+v", frame) + } + case err := <-session.Errors(): + t.Fatalf("session error: %v", err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + if len(events) < 2 || events[0].Event != "fabric_session_quic_stream_opened" { + t.Fatalf("events = %+v", events) + } +} + func startQUICFabricEchoServer(t *testing.T) *quic.Listener { t.Helper() listener, err := quic.ListenAddr("127.0.0.1:0", testQUICTLSConfig(t), &quic.Config{EnableDatagrams: true}) diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index 30f3118..f10f5a5 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -299,6 +299,8 @@ binary `fabricproto` frames over a QUIC stream, with local smoke coverage for `PING`/`PONG` and DATA/ACK. Carrier selection understands QUIC transport labels and `quic://host:port` endpoints while preserving WebSocket as the default fallback. +`QUICFabricServer` provides the matching node-side QUIC listener for accepting +fabric streams and running the same session frame handler as other carriers. Deliverables: