Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

196 lines
4.6 KiB
Go

// Package fabriccontrol serves control requests that arrive as frames on QUIC
// streams by replaying them against an in-process http.Handler.
package fabriccontrol
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"strings"
"time"
"github.com/quic-go/quic-go"
)
// nextProto is the ALPN protocol identifier advertised in the listener's TLS configuration.
const nextProto = "rap-fabric-data-session-v1"
// Config holds the settings of the fabric control Server.
type Config struct {
// Enabled turns the server on; ListenAndServe returns nil immediately when it is false.
Enabled bool
// ListenAddr is the address to listen on; when blank, ListenAndServe uses ":19191".
ListenAddr string
}
// Server accepts QUIC connections and forwards the control requests received
// on their streams to an http.Handler.
type Server struct {
// cfg is the configuration passed to New.
cfg Config
// router receives every forwarded control request.
router http.Handler
// ln is the active listener; it is set by ListenAndServe and closed by Close.
ln *quic.Listener
}
// rawControlRequest is the JSON shape of a control request carried in a frame payload.
type rawControlRequest struct {
// Method is the HTTP method; handlePayload defaults it to GET when blank.
Method string `json:"method"`
// Path is the requested route; it must pass normalizeControlPath to be served.
Path string `json:"path"`
// Body is the request body, passed through to the handler unparsed.
Body json.RawMessage `json:"body,omitempty"`
}
// rawControlResponse is the JSON shape of the handler's reply to a control request.
type rawControlResponse struct {
// StatusCode is the HTTP status code recorded from the handler.
StatusCode int `json:"status_code"`
// Body is the recorded response body; it is omitted from the JSON when empty.
Body json.RawMessage `json:"body,omitempty"`
}
// controlEnvelope wraps what is written back on the stream for one request:
// handleStream fills either Payload or Error, never both.
type controlEnvelope struct {
// Payload is the encoded rawControlResponse on success.
Payload json.RawMessage `json:"payload,omitempty"`
// Error is the error text when the request could not be handled.
Error string `json:"error,omitempty"`
}
// New builds a Server that uses cfg for its listener settings and hands every
// forwarded control request to router. No listener is opened until
// ListenAndServe is called.
func New(cfg Config, router http.Handler) *Server {
	srv := new(Server)
	srv.cfg = cfg
	srv.router = router
	return srv
}
// ListenAndServe opens the QUIC listener and accepts connections until ctx is
// cancelled or accepting fails, serving each connection on its own goroutine.
//
// It returns nil immediately when the receiver is nil or the server is
// disabled. A blank ListenAddr falls back to ":19191". Cancelling ctx yields a
// nil error; any other accept error is returned unchanged. The listener is
// closed on every return path.
//
// NOTE(review): s.ln is written here without synchronization; confirm that
// Close is never called concurrently with the start of ListenAndServe.
func (s *Server) ListenAndServe(ctx context.Context) error {
	if s == nil || !s.cfg.Enabled {
		return nil
	}
	listenAddr := strings.TrimSpace(s.cfg.ListenAddr)
	if listenAddr == "" {
		listenAddr = ":19191"
	}
	ln, err := quic.ListenAddr(listenAddr, selfSignedTLSConfig(), nil)
	if err != nil {
		return fmt.Errorf("fabric control listen on %q: %w", listenAddr, err)
	}
	s.ln = ln
	// Accept already observes ctx, so a deferred close covers both shutdown
	// and accept failures without a watcher goroutine that could outlive this
	// call. The close error is ignored: there is nothing left to clean up.
	defer func() { _ = ln.Close() }()

	for {
		conn, err := ln.Accept(ctx)
		if err != nil {
			if ctx.Err() != nil {
				// Shutdown was requested; not an error for the caller.
				return nil
			}
			return err
		}
		go s.handleConn(ctx, conn)
	}
}
// Close shuts down the listener opened by ListenAndServe and returns the
// listener's close error. It is a no-op returning nil on a nil receiver or
// when no listener has been opened.
func (s *Server) Close() error {
	if s != nil && s.ln != nil {
		return s.ln.Close()
	}
	return nil
}
// handleConn accepts streams on conn until ctx is cancelled or the connection
// fails, serving every stream on its own goroutine. The connection is closed
// when the accept loop ends.
func (s *Server) handleConn(ctx context.Context, conn *quic.Conn) {
	// Without this the connection would stay open after ctx is cancelled, and
	// stream handlers blocked on a read would linger with it. The error is
	// ignored because the connection may already be closed by the peer.
	defer func() { _ = conn.CloseWithError(0, "") }()

	for {
		stream, err := conn.AcceptStream(ctx)
		if err != nil {
			return
		}
		go s.handleStream(ctx, stream)
	}
}
// handleStream serves control requests on a single stream until reading or
// writing a frame fails, then closes the stream.
//
// Only frames of type frameData addressed to controlForwardQUICStream are
// handled; every other frame is skipped. Each handled frame is answered with
// one reliable data frame that echoes the request's Sequence and carries a
// JSON controlEnvelope holding either the response payload or the error text.
func (s *Server) handleStream(ctx context.Context, stream *quic.Stream) {
	defer stream.Close()
	for {
		reqFrame, err := readFrame(stream)
		if err != nil {
			return
		}
		if reqFrame.Type != frameData || reqFrame.StreamID != controlForwardQUICStream {
			continue
		}
		payload, err := s.handlePayload(ctx, reqFrame.Payload)
		envelope := controlEnvelope{Payload: payload}
		if err != nil {
			envelope = controlEnvelope{Error: err.Error()}
		}
		raw, err := json.Marshal(envelope)
		if err != nil {
			// The payload was not valid JSON; tell the peer instead of
			// sending an empty frame it cannot decode.
			raw, err = json.Marshal(controlEnvelope{Error: err.Error()})
			if err != nil {
				return
			}
		}
		err = writeFrame(stream, frame{
			Type:         frameData,
			TrafficClass: trafficClassReliable,
			StreamID:     controlForwardQUICStream,
			Sequence:     reqFrame.Sequence,
			Payload:      raw,
		})
		if err != nil {
			// The peer can no longer receive replies on this stream, so
			// reading further requests from it is pointless.
			return
		}
	}
}
// handlePayload decodes one JSON control request, replays it against the
// router and returns the JSON-encoded rawControlResponse.
//
// A blank method defaults to GET. The path must be accepted by
// normalizeControlPath. The request carries the headers
// "Content-Type: application/json" and "X-RAP-Fabric-Control: quic".
//
// An error is returned when the payload is not valid JSON, when the path is
// not allowed, or when the method or path cannot form an HTTP request.
func (s *Server) handlePayload(ctx context.Context, payload []byte) (json.RawMessage, error) {
	var req rawControlRequest
	if err := json.Unmarshal(payload, &req); err != nil {
		return nil, fmt.Errorf("invalid fabric control request")
	}
	method := strings.ToUpper(strings.TrimSpace(req.Method))
	if method == "" {
		method = http.MethodGet
	}
	path := normalizeControlPath(req.Path)
	if path == "" {
		return nil, fmt.Errorf("fabric control path is not allowed")
	}

	// httptest.NewRequest panics on a malformed method or target, which a
	// remote peer fully controls. http.NewRequestWithContext reports the same
	// problems as an error instead of crashing the process.
	httpReq, err := http.NewRequestWithContext(ctx, method, path, bytes.NewReader(req.Body))
	if err != nil {
		return nil, fmt.Errorf("invalid fabric control request: %w", err)
	}
	// Restore the server-side fields httptest.NewRequest used to populate so
	// handlers and middleware observe the same request as before.
	httpReq.RequestURI = path
	httpReq.Host = "example.com"
	httpReq.RemoteAddr = "192.0.2.1:1234"
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("X-RAP-Fabric-Control", "quic")

	rec := httptest.NewRecorder()
	s.router.ServeHTTP(rec, httpReq)

	// Copy the bytes so the result does not alias the recorder's buffer.
	body := append(json.RawMessage(nil), rec.Body.Bytes()...)
	if len(body) > 0 && !json.Valid(body) {
		// A non-JSON body (for example a plain-text error page) would make
		// the marshal below fail and lose the status code, so carry it as a
		// JSON string instead.
		quoted, err := json.Marshal(string(body))
		if err != nil {
			return nil, err
		}
		body = quoted
	}
	raw, err := json.Marshal(rawControlResponse{StatusCode: rec.Code, Body: body})
	if err != nil {
		return nil, err
	}
	return raw, nil
}
// normalizeControlPath validates a requested path and returns the route to
// forward it to, or "" when the path is not allowed.
//
// Surrounding whitespace is trimmed and a leading "/" is added when missing.
// Paths already under "/api/v1/" are returned as they are; paths under
// "/clusters/", "/organizations/", "/node-agents/" or "/auth/" get the
// "/api/v1" prefix. Everything else is rejected, as is any path that:
//   - is empty, or contains "://" or "..";
//   - contains a percent-encoded dot ("%2e" in any case), which would decode
//     to a dot segment after the ".." check has already passed;
//   - contains whitespace or an ASCII control character, which cannot appear
//     in an HTTP request target.
func normalizeControlPath(path string) string {
	path = strings.TrimSpace(path)
	if path == "" || strings.Contains(path, "://") || strings.Contains(path, "..") {
		return ""
	}
	if strings.Contains(strings.ToLower(path), "%2e") {
		return ""
	}
	isForbidden := func(r rune) bool { return r <= ' ' || r == 0x7f }
	if strings.IndexFunc(path, isForbidden) >= 0 {
		return ""
	}
	if !strings.HasPrefix(path, "/") {
		path = "/" + path
	}
	if strings.HasPrefix(path, "/api/v1/") {
		return path
	}
	switch {
	case strings.HasPrefix(path, "/clusters/"),
		strings.HasPrefix(path, "/organizations/"),
		strings.HasPrefix(path, "/node-agents/"),
		strings.HasPrefix(path, "/auth/"):
		return "/api/v1" + path
	default:
		return ""
	}
}
// selfSignedTLSConfig returns a TLS configuration backed by a freshly
// generated, self-signed ECDSA P-256 certificate and advertising nextProto
// as its only ALPN protocol.
//
// The certificate is valid from one hour in the past until 365 days from now
// and is marked for server authentication. The function panics if key or
// certificate generation fails.
func selfSignedTLSConfig() *tls.Config {
	priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		panic(err)
	}

	template := &x509.Certificate{
		SerialNumber: big.NewInt(time.Now().UnixNano()),
		Subject:      pkix.Name{CommonName: "rap-fabric-control"},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(365 * 24 * time.Hour),
		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
	}

	// The template doubles as the parent, which makes the certificate self-signed.
	certDER, err := x509.CreateCertificate(rand.Reader, template, template, &priv.PublicKey, priv)
	if err != nil {
		panic(err)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{{
			Certificate: [][]byte{certDER},
			PrivateKey:  priv,
		}},
		NextProtos: []string{nextProto},
	}
}