Add QUIC fabric listener
This commit is contained in:
@@ -0,0 +1,147 @@
|
||||
package mesh
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
||||
"github.com/quic-go/quic-go"
|
||||
)
|
||||
|
||||
type QUICFabricServer struct {
|
||||
listener *quic.Listener
|
||||
logger FabricSessionEventLogger
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
type QUICFabricServerConfig struct {
|
||||
ListenAddr string
|
||||
TLSConfig *tls.Config
|
||||
QUICConfig *quic.Config
|
||||
Logger FabricSessionEventLogger
|
||||
}
|
||||
|
||||
func StartQUICFabricServer(ctx context.Context, cfg QUICFabricServerConfig) (*QUICFabricServer, error) {
|
||||
if cfg.ListenAddr == "" {
|
||||
return nil, fmt.Errorf("quic fabric listen addr is required")
|
||||
}
|
||||
tlsConfig := cfg.TLSConfig
|
||||
if tlsConfig == nil {
|
||||
return nil, fmt.Errorf("quic fabric tls config is required")
|
||||
}
|
||||
tlsConfig = tlsConfig.Clone()
|
||||
if len(tlsConfig.NextProtos) == 0 {
|
||||
tlsConfig.NextProtos = []string{fabricQUICNextProto}
|
||||
}
|
||||
listener, err := quic.ListenAddr(cfg.ListenAddr, tlsConfig, cfg.QUICConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
server := &QUICFabricServer{
|
||||
listener: listener,
|
||||
logger: cfg.Logger,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go server.acceptLoop(ctx)
|
||||
return server, nil
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) Addr() net.Addr {
|
||||
if s == nil || s.listener == nil {
|
||||
return nil
|
||||
}
|
||||
return s.listener.Addr()
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) Close() error {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
s.closeOnce.Do(func() {
|
||||
close(s.done)
|
||||
if s.listener != nil {
|
||||
err = s.listener.Close()
|
||||
}
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) acceptLoop(ctx context.Context) {
|
||||
defer s.Close()
|
||||
for {
|
||||
conn, err := s.listener.Accept(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go s.handleConn(ctx, conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) handleConn(ctx context.Context, conn *quic.Conn) {
|
||||
for {
|
||||
stream, err := conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
_ = conn.CloseWithError(0, "accept stream stopped")
|
||||
return
|
||||
}
|
||||
go s.handleStream(ctx, conn, stream)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) handleStream(ctx context.Context, conn *quic.Conn, stream *quic.Stream) {
|
||||
session := fabricproto.NewSession(fabricproto.SessionConfig{})
|
||||
s.logFabricSession(FabricSessionEventLogEntry{
|
||||
Event: "fabric_session_quic_stream_opened",
|
||||
AcceptedBy: "quic",
|
||||
RemoteAddr: conn.RemoteAddr().String(),
|
||||
})
|
||||
defer s.logFabricSession(FabricSessionEventLogEntry{
|
||||
Event: "fabric_session_quic_stream_closed",
|
||||
AcceptedBy: "quic",
|
||||
RemoteAddr: conn.RemoteAddr().String(),
|
||||
})
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = stream.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
frame, err := fabricproto.ReadFrame(stream, fabricproto.DefaultMaxPayload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
event, responses, err := session.HandleFrame(frame)
|
||||
if err != nil {
|
||||
_ = conn.CloseWithError(2, err.Error())
|
||||
return
|
||||
}
|
||||
if event.Type != fabricproto.SessionEventNone {
|
||||
s.logFabricSession(FabricSessionEventLogEntry{
|
||||
Event: "fabric_session_event",
|
||||
SessionEvent: event.Type,
|
||||
StreamID: event.StreamID,
|
||||
Sequence: event.Sequence,
|
||||
TrafficClass: event.TrafficClass,
|
||||
AcceptedBy: "quic",
|
||||
RemoteAddr: conn.RemoteAddr().String(),
|
||||
})
|
||||
}
|
||||
for _, response := range responses {
|
||||
if err := fabricproto.WriteFrame(stream, response); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *QUICFabricServer) logFabricSession(entry FabricSessionEventLogEntry) {
|
||||
if s != nil && s.logger != nil {
|
||||
s.logger(entry)
|
||||
}
|
||||
}
|
||||
@@ -102,6 +102,54 @@ func TestQUICFabricTransportDataAck(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestQUICFabricServerHandlesFabricFrames(t *testing.T) {
|
||||
var events []FabricSessionEventLogEntry
|
||||
server, err := StartQUICFabricServer(context.Background(), QUICFabricServerConfig{
|
||||
ListenAddr: "127.0.0.1:0",
|
||||
TLSConfig: testQUICTLSConfig(t),
|
||||
Logger: func(entry FabricSessionEventLogEntry) {
|
||||
events = append(events, entry)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("start quic fabric server: %v", err)
|
||||
}
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
session, err := NewQUICFabricTransport(nil).Connect(ctx, FabricTransportTarget{
|
||||
Endpoint: server.Addr().String(),
|
||||
TLSConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
NextProtos: []string{fabricQUICNextProto},
|
||||
},
|
||||
Timeout: time.Second,
|
||||
InboundBuffer: 4,
|
||||
ErrorBuffer: 4,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("connect quic fabric: %v", err)
|
||||
}
|
||||
defer session.Close()
|
||||
if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 77, Payload: []byte("server")}); err != nil {
|
||||
t.Fatalf("send ping: %v", err)
|
||||
}
|
||||
select {
|
||||
case frame := <-session.Frames():
|
||||
if frame.Type != fabricproto.FramePong || frame.Sequence != 77 || string(frame.Payload) != "server" {
|
||||
t.Fatalf("frame = %+v", frame)
|
||||
}
|
||||
case err := <-session.Errors():
|
||||
t.Fatalf("session error: %v", err)
|
||||
case <-ctx.Done():
|
||||
t.Fatal(ctx.Err())
|
||||
}
|
||||
if len(events) < 2 || events[0].Event != "fabric_session_quic_stream_opened" {
|
||||
t.Fatalf("events = %+v", events)
|
||||
}
|
||||
}
|
||||
|
||||
func startQUICFabricEchoServer(t *testing.T) *quic.Listener {
|
||||
t.Helper()
|
||||
listener, err := quic.ListenAddr("127.0.0.1:0", testQUICTLSConfig(t), &quic.Config{EnableDatagrams: true})
|
||||
|
||||
Reference in New Issue
Block a user