271 lines
8.3 KiB
Go
271 lines
8.3 KiB
Go
package runtime
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
chimiddleware "github.com/go-chi/chi/v5/middleware"
|
|
|
|
"github.com/example/remote-access-platform/backend/internal/modules/auth"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/cluster"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/identitysource"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/node"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/nodeagent"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/organization"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/resource"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/sessionbroker"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/sessiongateway"
|
|
"github.com/example/remote-access-platform/backend/internal/modules/worker"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/authority"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/config"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/fabriccontrol"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/httpserver"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/logging"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/module"
|
|
postgresplatform "github.com/example/remote-access-platform/backend/internal/platform/postgres"
|
|
redisplatform "github.com/example/remote-access-platform/backend/internal/platform/redis"
|
|
"github.com/example/remote-access-platform/backend/internal/platform/secrets"
|
|
)
|
|
|
|
type App struct {
|
|
cfg config.Config
|
|
logger *slog.Logger
|
|
httpServer *http.Server
|
|
fabricControl *fabriccontrol.Server
|
|
workers []backgroundRunner
|
|
db closeFunc
|
|
redis closeFunc
|
|
}
|
|
|
|
// closeFunc releases a resource held by App and reports any failure doing so.
// App stores one for the Postgres connection and one for the Redis connection.
type closeFunc func() error
|
|
// backgroundRunner is a task that App.Run starts in its own goroutine with the
// context it was given; a non-nil return value is reported as a failure.
type backgroundRunner func(context.Context) error
|
|
|
|
func NewApp(ctx context.Context) (*App, error) {
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("load config: %w", err)
|
|
}
|
|
|
|
logger := logging.New(cfg.App.Env)
|
|
|
|
db, err := postgresplatform.Open(ctx, cfg.Postgres)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
redisClient, err := redisplatform.Open(ctx, cfg.Redis)
|
|
if err != nil {
|
|
db.Close()
|
|
return nil, err
|
|
}
|
|
|
|
authorityVerifier, err := authority.NewVerifier(cfg.Installation)
|
|
if err != nil {
|
|
redisClient.Close()
|
|
db.Close()
|
|
return nil, fmt.Errorf("create installation authority verifier: %w", err)
|
|
}
|
|
|
|
deps := module.Dependencies{
|
|
Config: module.Config{
|
|
App: cfg.App,
|
|
Auth: cfg.Auth,
|
|
Installation: cfg.Installation,
|
|
DataPlane: cfg.DataPlane,
|
|
Secret: cfg.Secret,
|
|
Session: cfg.Session,
|
|
Worker: cfg.Worker,
|
|
WebSocket: cfg.WebSocket,
|
|
},
|
|
Infra: module.Infra{
|
|
Logger: logger,
|
|
DB: db,
|
|
Redis: redisClient,
|
|
},
|
|
}
|
|
|
|
workerStore := worker.NewRedisStore(redisClient)
|
|
workerService := worker.NewService(deps, workerStore)
|
|
authStore := auth.NewPostgresStore(db)
|
|
authTx := auth.NewPostgresTransactor(db)
|
|
authService := auth.NewService(deps, authStore, authTx, authorityVerifier)
|
|
var resourceSecretStore *secrets.ResourceSecretStore
|
|
if cfg.Secret.EncryptionKeyBase64 != "" {
|
|
secretEncryptor, err := secrets.NewEncryptor(cfg.Secret.EncryptionKeyBase64, cfg.Secret.EncryptionKeyID)
|
|
if err != nil {
|
|
redisClient.Close()
|
|
db.Close()
|
|
return nil, fmt.Errorf("create resource secret encryptor: %w", err)
|
|
}
|
|
resourceSecretStore = secrets.NewResourceSecretStore(db, secretEncryptor)
|
|
}
|
|
|
|
brokerStore := sessionbroker.NewPostgresStore(db, authorityVerifier)
|
|
brokerTx := sessionbroker.NewPostgresTransactor(db, authorityVerifier)
|
|
liveStateStore := sessionbroker.NewRedisLiveStateStore(redisClient)
|
|
brokerService := sessionbroker.NewService(deps, brokerStore, brokerTx, liveStateStore, workerService, resourceSecretStore)
|
|
workerEvents := worker.NewEventProcessor(redisClient, brokerService)
|
|
leaseMonitor := worker.NewLeaseMonitor(workerService, brokerService, cfg.Worker.StaleLeaseGracePeriod)
|
|
|
|
brokerModule := sessionbroker.NewModule(brokerService)
|
|
authModule := auth.NewModule(deps, authService)
|
|
clusterModule := cluster.NewModule(deps, authorityVerifier)
|
|
organizationModule := organization.NewModule(deps)
|
|
identitySourceModule := identitysource.NewModule(deps)
|
|
resourceModule := resource.NewModule(deps, resourceSecretStore)
|
|
nodeModule := node.NewModule(deps)
|
|
nodeAgentModule := nodeagent.NewModule(deps)
|
|
sessionGatewayModule := sessiongateway.NewModule(deps, brokerModule.Service(), workerService)
|
|
|
|
router := buildRouter(
|
|
logger,
|
|
authModule,
|
|
clusterModule,
|
|
organizationModule,
|
|
identitySourceModule,
|
|
resourceModule,
|
|
brokerModule,
|
|
nodeModule,
|
|
nodeAgentModule,
|
|
sessionGatewayModule,
|
|
)
|
|
|
|
return &App{
|
|
cfg: cfg,
|
|
logger: logger,
|
|
httpServer: httpserver.New(cfg.HTTP, router),
|
|
fabricControl: fabriccontrol.New(fabriccontrol.Config{
|
|
Enabled: cfg.FabricControl.Enabled,
|
|
ListenAddr: cfg.FabricControl.ListenAddr,
|
|
}, router),
|
|
workers: []backgroundRunner{workerEvents.Run, leaseMonitor.Run},
|
|
db: func() error {
|
|
db.Close()
|
|
return nil
|
|
},
|
|
redis: redisClient.Close,
|
|
}, nil
|
|
}
|
|
|
|
func (a *App) Run(ctx context.Context) error {
|
|
errCh := make(chan error, 1)
|
|
|
|
go func() {
|
|
a.logger.Info("http server starting", "addr", a.httpServer.Addr, "service", a.cfg.App.Name)
|
|
if err := a.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
errCh <- err
|
|
return
|
|
}
|
|
errCh <- nil
|
|
}()
|
|
|
|
for _, runner := range a.workers {
|
|
runner := runner
|
|
go func() {
|
|
if err := runner(ctx); err != nil {
|
|
errCh <- err
|
|
}
|
|
}()
|
|
}
|
|
if a.fabricControl != nil && a.cfg.FabricControl.Enabled {
|
|
go func() {
|
|
a.logger.Info("fabric control quic starting", "addr", a.cfg.FabricControl.ListenAddr, "service", a.cfg.App.Name)
|
|
if err := a.fabricControl.ListenAndServe(ctx); err != nil {
|
|
errCh <- err
|
|
}
|
|
}()
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
a.logger.Info("shutdown signal received")
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), a.cfg.HTTP.ShutdownTimeout)
|
|
defer cancel()
|
|
|
|
if err := a.httpServer.Shutdown(shutdownCtx); err != nil {
|
|
return fmt.Errorf("shutdown http server: %w", err)
|
|
}
|
|
if a.fabricControl != nil {
|
|
_ = a.fabricControl.Close()
|
|
}
|
|
|
|
if err := a.redis(); err != nil {
|
|
return fmt.Errorf("close redis: %w", err)
|
|
}
|
|
|
|
if err := a.db(); err != nil {
|
|
return fmt.Errorf("close postgres: %w", err)
|
|
}
|
|
|
|
a.logger.Info("app stopped", "at", time.Now().UTC())
|
|
return nil
|
|
}
|
|
|
|
func buildRouter(logger *slog.Logger, modules ...module.Module) http.Handler {
|
|
router := chi.NewRouter()
|
|
router.Use(chimiddleware.RequestID)
|
|
router.Use(chimiddleware.RealIP)
|
|
router.Use(chimiddleware.Recoverer)
|
|
router.Use(chimiddleware.Timeout(60 * time.Second))
|
|
router.Use(chimiddleware.Heartbeat("/healthz"))
|
|
|
|
router.Get("/readyz", func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ready"))
|
|
})
|
|
router.Post("/mesh/v1/health", controlPlaneMeshHealth)
|
|
|
|
router.Route("/api/v1", func(r chi.Router) {
|
|
for _, mod := range modules {
|
|
logger.Info("register module routes", "module", mod.Name())
|
|
mod.RegisterRoutes(r)
|
|
}
|
|
})
|
|
|
|
return router
|
|
}
|
|
|
|
// controlPlaneMeshHealth handles a mesh health message posted as JSON.
//
// The message must carry protocol_version "mesh-control-v1" and a non-empty
// from.cluster_id and from.node_id; the "to" object is decoded but not
// checked. A body that is not valid JSON, fails those checks, or is larger
// than 64 KiB is answered with 400 "invalid mesh health message".
//
// On success the response is a JSON object with "accepted": true that echoes
// the sender's cluster_id and names "control-plane-relay" as the node_id.
func controlPlaneMeshHealth(w http.ResponseWriter, r *http.Request) {
	// A health message is a handful of short identifiers. Capping the body
	// keeps a peer from making the decoder buffer an arbitrarily large value.
	const maxBodyBytes = 64 << 10
	r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)

	var message struct {
		ProtocolVersion string `json:"protocol_version"`
		From            struct {
			ClusterID string `json:"cluster_id"`
			NodeID    string `json:"node_id"`
		} `json:"from"`
		To struct {
			ClusterID string `json:"cluster_id"`
			NodeID    string `json:"node_id"`
		} `json:"to"`
	}
	if err := json.NewDecoder(r.Body).Decode(&message); err != nil {
		http.Error(w, "invalid mesh health message", http.StatusBadRequest)
		return
	}
	if message.ProtocolVersion != "mesh-control-v1" || message.From.ClusterID == "" || message.From.NodeID == "" {
		http.Error(w, "invalid mesh health message", http.StatusBadRequest)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Once the body is being written there is nothing useful left to do with
	// an encode or write error, so it is deliberately ignored.
	_ = json.NewEncoder(w).Encode(map[string]any{
		"protocol_version": "mesh-control-v1",
		"accepted":         true,
		"by": map[string]string{
			"cluster_id": message.From.ClusterID,
			"node_id":    "control-plane-relay",
		},
	})
}
|