196 lines
4.6 KiB
Go
196 lines
4.6 KiB
Go
package fabriccontrol
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/elliptic"
|
|
"crypto/rand"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"crypto/x509/pkix"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/big"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/quic-go/quic-go"
|
|
)
|
|
|
|
const nextProto = "rap-fabric-data-session-v1"
|
|
|
|
type Config struct {
|
|
Enabled bool
|
|
ListenAddr string
|
|
}
|
|
|
|
type Server struct {
|
|
cfg Config
|
|
router http.Handler
|
|
ln *quic.Listener
|
|
}
|
|
|
|
type rawControlRequest struct {
|
|
Method string `json:"method"`
|
|
Path string `json:"path"`
|
|
Body json.RawMessage `json:"body,omitempty"`
|
|
}
|
|
|
|
type rawControlResponse struct {
|
|
StatusCode int `json:"status_code"`
|
|
Body json.RawMessage `json:"body,omitempty"`
|
|
}
|
|
|
|
type controlEnvelope struct {
|
|
Payload json.RawMessage `json:"payload,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
func New(cfg Config, router http.Handler) *Server {
|
|
return &Server{cfg: cfg, router: router}
|
|
}
|
|
|
|
func (s *Server) ListenAndServe(ctx context.Context) error {
|
|
if s == nil || !s.cfg.Enabled {
|
|
return nil
|
|
}
|
|
listenAddr := strings.TrimSpace(s.cfg.ListenAddr)
|
|
if listenAddr == "" {
|
|
listenAddr = ":19191"
|
|
}
|
|
ln, err := quic.ListenAddr(listenAddr, selfSignedTLSConfig(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.ln = ln
|
|
go func() {
|
|
<-ctx.Done()
|
|
_ = ln.Close()
|
|
}()
|
|
for {
|
|
conn, err := ln.Accept(ctx)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
go s.handleConn(ctx, conn)
|
|
}
|
|
}
|
|
|
|
func (s *Server) Close() error {
|
|
if s == nil || s.ln == nil {
|
|
return nil
|
|
}
|
|
return s.ln.Close()
|
|
}
|
|
|
|
func (s *Server) handleConn(ctx context.Context, conn *quic.Conn) {
|
|
for {
|
|
stream, err := conn.AcceptStream(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
go s.handleStream(ctx, stream)
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleStream(ctx context.Context, stream *quic.Stream) {
|
|
defer stream.Close()
|
|
for {
|
|
reqFrame, err := readFrame(stream)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if reqFrame.Type != frameData || reqFrame.StreamID != controlForwardQUICStream {
|
|
continue
|
|
}
|
|
payload, err := s.handlePayload(ctx, reqFrame.Payload)
|
|
envelope := controlEnvelope{Payload: payload}
|
|
if err != nil {
|
|
envelope = controlEnvelope{Error: err.Error()}
|
|
}
|
|
raw, _ := json.Marshal(envelope)
|
|
_ = writeFrame(stream, frame{
|
|
Type: frameData,
|
|
TrafficClass: trafficClassReliable,
|
|
StreamID: controlForwardQUICStream,
|
|
Sequence: reqFrame.Sequence,
|
|
Payload: raw,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *Server) handlePayload(ctx context.Context, payload []byte) (json.RawMessage, error) {
|
|
var req rawControlRequest
|
|
if err := json.Unmarshal(payload, &req); err != nil {
|
|
return nil, fmt.Errorf("invalid fabric control request")
|
|
}
|
|
method := strings.ToUpper(strings.TrimSpace(req.Method))
|
|
if method == "" {
|
|
method = http.MethodGet
|
|
}
|
|
path := normalizeControlPath(req.Path)
|
|
if path == "" {
|
|
return nil, fmt.Errorf("fabric control path is not allowed")
|
|
}
|
|
httpReq := httptest.NewRequest(method, path, bytes.NewReader(req.Body)).WithContext(ctx)
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
httpReq.Header.Set("X-RAP-Fabric-Control", "quic")
|
|
rec := httptest.NewRecorder()
|
|
s.router.ServeHTTP(rec, httpReq)
|
|
body := append(json.RawMessage(nil), rec.Body.Bytes()...)
|
|
raw, err := json.Marshal(rawControlResponse{StatusCode: rec.Code, Body: body})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return raw, nil
|
|
}
|
|
|
|
func normalizeControlPath(path string) string {
|
|
path = strings.TrimSpace(path)
|
|
if path == "" || strings.Contains(path, "://") || strings.Contains(path, "..") {
|
|
return ""
|
|
}
|
|
if !strings.HasPrefix(path, "/") {
|
|
path = "/" + path
|
|
}
|
|
if strings.HasPrefix(path, "/api/v1/") {
|
|
return path
|
|
}
|
|
switch {
|
|
case strings.HasPrefix(path, "/clusters/"),
|
|
strings.HasPrefix(path, "/organizations/"),
|
|
strings.HasPrefix(path, "/node-agents/"),
|
|
strings.HasPrefix(path, "/auth/"):
|
|
return "/api/v1" + path
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func selfSignedTLSConfig() *tls.Config {
|
|
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
tmpl := x509.Certificate{
|
|
SerialNumber: big.NewInt(time.Now().UnixNano()),
|
|
Subject: pkix.Name{CommonName: "rap-fabric-control"},
|
|
NotBefore: time.Now().Add(-time.Hour),
|
|
NotAfter: time.Now().Add(365 * 24 * time.Hour),
|
|
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
|
|
}
|
|
der, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &key.PublicKey, key)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
cert := tls.Certificate{Certificate: [][]byte{der}, PrivateKey: key}
|
|
return &tls.Config{Certificates: []tls.Certificate{cert}, NextProtos: []string{nextProto}}
|
|
}
|