Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скороть 10 МБит
2026-05-22 21:46:49 +03:00

271 lines
8.3 KiB
Go

package runtime
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/example/remote-access-platform/backend/internal/modules/auth"
"github.com/example/remote-access-platform/backend/internal/modules/cluster"
"github.com/example/remote-access-platform/backend/internal/modules/identitysource"
"github.com/example/remote-access-platform/backend/internal/modules/node"
"github.com/example/remote-access-platform/backend/internal/modules/nodeagent"
"github.com/example/remote-access-platform/backend/internal/modules/organization"
"github.com/example/remote-access-platform/backend/internal/modules/resource"
"github.com/example/remote-access-platform/backend/internal/modules/sessionbroker"
"github.com/example/remote-access-platform/backend/internal/modules/sessiongateway"
"github.com/example/remote-access-platform/backend/internal/modules/worker"
"github.com/example/remote-access-platform/backend/internal/platform/authority"
"github.com/example/remote-access-platform/backend/internal/platform/config"
"github.com/example/remote-access-platform/backend/internal/platform/fabriccontrol"
"github.com/example/remote-access-platform/backend/internal/platform/httpserver"
"github.com/example/remote-access-platform/backend/internal/platform/logging"
"github.com/example/remote-access-platform/backend/internal/platform/module"
postgresplatform "github.com/example/remote-access-platform/backend/internal/platform/postgres"
redisplatform "github.com/example/remote-access-platform/backend/internal/platform/redis"
"github.com/example/remote-access-platform/backend/internal/platform/secrets"
)
type App struct {
cfg config.Config
logger *slog.Logger
httpServer *http.Server
fabricControl *fabriccontrol.Server
workers []backgroundRunner
db closeFunc
redis closeFunc
}
type closeFunc func() error
type backgroundRunner func(context.Context) error
func NewApp(ctx context.Context) (*App, error) {
cfg, err := config.Load()
if err != nil {
return nil, fmt.Errorf("load config: %w", err)
}
logger := logging.New(cfg.App.Env)
db, err := postgresplatform.Open(ctx, cfg.Postgres)
if err != nil {
return nil, err
}
redisClient, err := redisplatform.Open(ctx, cfg.Redis)
if err != nil {
db.Close()
return nil, err
}
authorityVerifier, err := authority.NewVerifier(cfg.Installation)
if err != nil {
redisClient.Close()
db.Close()
return nil, fmt.Errorf("create installation authority verifier: %w", err)
}
deps := module.Dependencies{
Config: module.Config{
App: cfg.App,
Auth: cfg.Auth,
Installation: cfg.Installation,
DataPlane: cfg.DataPlane,
Secret: cfg.Secret,
Session: cfg.Session,
Worker: cfg.Worker,
WebSocket: cfg.WebSocket,
},
Infra: module.Infra{
Logger: logger,
DB: db,
Redis: redisClient,
},
}
workerStore := worker.NewRedisStore(redisClient)
workerService := worker.NewService(deps, workerStore)
authStore := auth.NewPostgresStore(db)
authTx := auth.NewPostgresTransactor(db)
authService := auth.NewService(deps, authStore, authTx, authorityVerifier)
var resourceSecretStore *secrets.ResourceSecretStore
if cfg.Secret.EncryptionKeyBase64 != "" {
secretEncryptor, err := secrets.NewEncryptor(cfg.Secret.EncryptionKeyBase64, cfg.Secret.EncryptionKeyID)
if err != nil {
redisClient.Close()
db.Close()
return nil, fmt.Errorf("create resource secret encryptor: %w", err)
}
resourceSecretStore = secrets.NewResourceSecretStore(db, secretEncryptor)
}
brokerStore := sessionbroker.NewPostgresStore(db, authorityVerifier)
brokerTx := sessionbroker.NewPostgresTransactor(db, authorityVerifier)
liveStateStore := sessionbroker.NewRedisLiveStateStore(redisClient)
brokerService := sessionbroker.NewService(deps, brokerStore, brokerTx, liveStateStore, workerService, resourceSecretStore)
workerEvents := worker.NewEventProcessor(redisClient, brokerService)
leaseMonitor := worker.NewLeaseMonitor(workerService, brokerService, cfg.Worker.StaleLeaseGracePeriod)
brokerModule := sessionbroker.NewModule(brokerService)
authModule := auth.NewModule(deps, authService)
clusterModule := cluster.NewModule(deps, authorityVerifier)
organizationModule := organization.NewModule(deps)
identitySourceModule := identitysource.NewModule(deps)
resourceModule := resource.NewModule(deps, resourceSecretStore)
nodeModule := node.NewModule(deps)
nodeAgentModule := nodeagent.NewModule(deps)
sessionGatewayModule := sessiongateway.NewModule(deps, brokerModule.Service(), workerService)
router := buildRouter(
logger,
authModule,
clusterModule,
organizationModule,
identitySourceModule,
resourceModule,
brokerModule,
nodeModule,
nodeAgentModule,
sessionGatewayModule,
)
return &App{
cfg: cfg,
logger: logger,
httpServer: httpserver.New(cfg.HTTP, router),
fabricControl: fabriccontrol.New(fabriccontrol.Config{
Enabled: cfg.FabricControl.Enabled,
ListenAddr: cfg.FabricControl.ListenAddr,
}, router),
workers: []backgroundRunner{workerEvents.Run, leaseMonitor.Run},
db: func() error {
db.Close()
return nil
},
redis: redisClient.Close,
}, nil
}
func (a *App) Run(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
a.logger.Info("http server starting", "addr", a.httpServer.Addr, "service", a.cfg.App.Name)
if err := a.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
return
}
errCh <- nil
}()
for _, runner := range a.workers {
runner := runner
go func() {
if err := runner(ctx); err != nil {
errCh <- err
}
}()
}
if a.fabricControl != nil && a.cfg.FabricControl.Enabled {
go func() {
a.logger.Info("fabric control quic starting", "addr", a.cfg.FabricControl.ListenAddr, "service", a.cfg.App.Name)
if err := a.fabricControl.ListenAndServe(ctx); err != nil {
errCh <- err
}
}()
}
select {
case <-ctx.Done():
a.logger.Info("shutdown signal received")
case err := <-errCh:
if err != nil {
return err
}
return nil
}
shutdownCtx, cancel := context.WithTimeout(context.Background(), a.cfg.HTTP.ShutdownTimeout)
defer cancel()
if err := a.httpServer.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("shutdown http server: %w", err)
}
if a.fabricControl != nil {
_ = a.fabricControl.Close()
}
if err := a.redis(); err != nil {
return fmt.Errorf("close redis: %w", err)
}
if err := a.db(); err != nil {
return fmt.Errorf("close postgres: %w", err)
}
a.logger.Info("app stopped", "at", time.Now().UTC())
return nil
}
func buildRouter(logger *slog.Logger, modules ...module.Module) http.Handler {
router := chi.NewRouter()
router.Use(chimiddleware.RequestID)
router.Use(chimiddleware.RealIP)
router.Use(chimiddleware.Recoverer)
router.Use(chimiddleware.Timeout(60 * time.Second))
router.Use(chimiddleware.Heartbeat("/healthz"))
router.Get("/readyz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ready"))
})
router.Post("/mesh/v1/health", controlPlaneMeshHealth)
router.Route("/api/v1", func(r chi.Router) {
for _, mod := range modules {
logger.Info("register module routes", "module", mod.Name())
mod.RegisterRoutes(r)
}
})
return router
}
func controlPlaneMeshHealth(w http.ResponseWriter, r *http.Request) {
var message struct {
ProtocolVersion string `json:"protocol_version"`
From struct {
ClusterID string `json:"cluster_id"`
NodeID string `json:"node_id"`
} `json:"from"`
To struct {
ClusterID string `json:"cluster_id"`
NodeID string `json:"node_id"`
} `json:"to"`
}
if err := json.NewDecoder(r.Body).Decode(&message); err != nil {
http.Error(w, "invalid mesh health message", http.StatusBadRequest)
return
}
if message.ProtocolVersion != "mesh-control-v1" || message.From.ClusterID == "" || message.From.NodeID == "" {
http.Error(w, "invalid mesh health message", http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"protocol_version": "mesh-control-v1",
"accepted": true,
"by": map[string]string{
"cluster_id": message.From.ClusterID,
"node_id": "control-plane-relay",
},
})
}