148 lines
3.4 KiB
Go
148 lines
3.4 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
"github.com/quic-go/quic-go"
|
|
)
|
|
|
|
type QUICFabricServer struct {
|
|
listener *quic.Listener
|
|
logger FabricSessionEventLogger
|
|
done chan struct{}
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
type QUICFabricServerConfig struct {
|
|
ListenAddr string
|
|
TLSConfig *tls.Config
|
|
QUICConfig *quic.Config
|
|
Logger FabricSessionEventLogger
|
|
}
|
|
|
|
func StartQUICFabricServer(ctx context.Context, cfg QUICFabricServerConfig) (*QUICFabricServer, error) {
|
|
if cfg.ListenAddr == "" {
|
|
return nil, fmt.Errorf("quic fabric listen addr is required")
|
|
}
|
|
tlsConfig := cfg.TLSConfig
|
|
if tlsConfig == nil {
|
|
return nil, fmt.Errorf("quic fabric tls config is required")
|
|
}
|
|
tlsConfig = tlsConfig.Clone()
|
|
if len(tlsConfig.NextProtos) == 0 {
|
|
tlsConfig.NextProtos = []string{fabricQUICNextProto}
|
|
}
|
|
listener, err := quic.ListenAddr(cfg.ListenAddr, tlsConfig, cfg.QUICConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
server := &QUICFabricServer{
|
|
listener: listener,
|
|
logger: cfg.Logger,
|
|
done: make(chan struct{}),
|
|
}
|
|
go server.acceptLoop(ctx)
|
|
return server, nil
|
|
}
|
|
|
|
func (s *QUICFabricServer) Addr() net.Addr {
|
|
if s == nil || s.listener == nil {
|
|
return nil
|
|
}
|
|
return s.listener.Addr()
|
|
}
|
|
|
|
func (s *QUICFabricServer) Close() error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
var err error
|
|
s.closeOnce.Do(func() {
|
|
close(s.done)
|
|
if s.listener != nil {
|
|
err = s.listener.Close()
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (s *QUICFabricServer) acceptLoop(ctx context.Context) {
|
|
defer s.Close()
|
|
for {
|
|
conn, err := s.listener.Accept(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
go s.handleConn(ctx, conn)
|
|
}
|
|
}
|
|
|
|
func (s *QUICFabricServer) handleConn(ctx context.Context, conn *quic.Conn) {
|
|
for {
|
|
stream, err := conn.AcceptStream(ctx)
|
|
if err != nil {
|
|
_ = conn.CloseWithError(0, "accept stream stopped")
|
|
return
|
|
}
|
|
go s.handleStream(ctx, conn, stream)
|
|
}
|
|
}
|
|
|
|
func (s *QUICFabricServer) handleStream(ctx context.Context, conn *quic.Conn, stream *quic.Stream) {
|
|
session := fabricproto.NewSession(fabricproto.SessionConfig{})
|
|
s.logFabricSession(FabricSessionEventLogEntry{
|
|
Event: "fabric_session_quic_stream_opened",
|
|
AcceptedBy: "quic",
|
|
RemoteAddr: conn.RemoteAddr().String(),
|
|
})
|
|
defer s.logFabricSession(FabricSessionEventLogEntry{
|
|
Event: "fabric_session_quic_stream_closed",
|
|
AcceptedBy: "quic",
|
|
RemoteAddr: conn.RemoteAddr().String(),
|
|
})
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
_ = stream.Close()
|
|
return
|
|
default:
|
|
}
|
|
frame, err := fabricproto.ReadFrame(stream, fabricproto.DefaultMaxPayload)
|
|
if err != nil {
|
|
return
|
|
}
|
|
event, responses, err := session.HandleFrame(frame)
|
|
if err != nil {
|
|
_ = conn.CloseWithError(2, err.Error())
|
|
return
|
|
}
|
|
if event.Type != fabricproto.SessionEventNone {
|
|
s.logFabricSession(FabricSessionEventLogEntry{
|
|
Event: "fabric_session_event",
|
|
SessionEvent: event.Type,
|
|
StreamID: event.StreamID,
|
|
Sequence: event.Sequence,
|
|
TrafficClass: event.TrafficClass,
|
|
AcceptedBy: "quic",
|
|
RemoteAddr: conn.RemoteAddr().String(),
|
|
})
|
|
}
|
|
for _, response := range responses {
|
|
if err := fabricproto.WriteFrame(stream, response); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *QUICFabricServer) logFabricSession(entry FabricSessionEventLogEntry) {
|
|
if s != nil && s.logger != nil {
|
|
s.logger(entry)
|
|
}
|
|
}
|