Files
rdp-proxy/backend/internal/modules/cluster/postgres_store.go
T
2026-04-28 22:29:50 +03:00

2995 lines
101 KiB
Go

package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/example/remote-access-platform/backend/internal/platform/authority"
"github.com/example/remote-access-platform/backend/internal/platform/clusterauth"
"github.com/example/remote-access-platform/backend/internal/platform/secrets"
)
// PostgresStore is the PostgreSQL-backed store for the cluster module. Every
// query and transaction in this file is issued on the wrapped pgx pool.
type PostgresStore struct {
// db is the connection pool used for all queries and transactions.
db *pgxpool.Pool
// authority is the optional verifier passed to authority.EffectivePlatformRole;
// it stays nil when NewPostgresStore is called without a verifier.
authority *authority.Verifier
// clusterKeyEncryptor, when non-nil, is used to encrypt cluster authority
// private keys before storage and to decrypt them when they are read back.
clusterKeyEncryptor *secrets.Encryptor
}
// encryptedClusterAuthorityKeyPrefix marks a stored cluster authority private
// key as an encrypted JSON payload; stored values without this prefix are
// treated as plaintext by decodeClusterAuthorityPrivateKey.
const encryptedClusterAuthorityKeyPrefix = "enc:v1:"
// NewPostgresStore builds a store on top of db. If one or more verifiers are
// supplied, only the first is kept as the store's authority verifier; any
// others are ignored. No cluster key encryptor is configured by this
// constructor.
func NewPostgresStore(db *pgxpool.Pool, verifiers ...*authority.Verifier) *PostgresStore {
	store := &PostgresStore{db: db}
	if len(verifiers) > 0 {
		store.authority = verifiers[0]
	}
	return store
}
// WithClusterKeyEncryptor sets the encryptor used for cluster authority
// private keys and returns the receiver so calls can be chained. It is safe to
// call on a nil store, in which case nil is returned unchanged.
func (s *PostgresStore) WithClusterKeyEncryptor(encryptor *secrets.Encryptor) *PostgresStore {
	if s == nil {
		return s
	}
	s.clusterKeyEncryptor = encryptor
	return s
}
// encodeClusterAuthorityPrivateKey returns the representation of privateKey
// that should be written to the database for clusterID.
//
// Without a configured encryptor (or on a nil store) the key is returned
// unchanged. Otherwise the key is encrypted with additional authenticated data
// derived from clusterID, the encrypted payload is JSON-encoded, and the result
// is prefixed with encryptedClusterAuthorityKeyPrefix.
func (s *PostgresStore) encodeClusterAuthorityPrivateKey(clusterID, privateKey string) (string, error) {
	if s == nil || s.clusterKeyEncryptor == nil {
		return privateKey, nil
	}

	aad := clusterAuthorityPrivateKeyAAD(clusterID)
	sealed, err := s.clusterKeyEncryptor.Encrypt([]byte(privateKey), aad)
	if err != nil {
		return "", err
	}

	encoded, err := json.Marshal(sealed)
	if err != nil {
		return "", err
	}
	return encryptedClusterAuthorityKeyPrefix + string(encoded), nil
}
// decodeClusterAuthorityPrivateKey turns a stored private key value back into
// the plaintext key for clusterID.
//
// Behaviour by case:
//   - stored has no encryption prefix and no encryptor is configured: stored is
//     returned as-is;
//   - stored has no encryption prefix but an encryptor is configured: an error
//     is returned, so plaintext keys are refused once encryption is enabled;
//   - stored has the prefix but no encryptor is configured:
//     secrets.ErrSecretEncryptionKeyMissing is returned;
//   - otherwise the JSON payload after the prefix is decrypted using additional
//     authenticated data derived from clusterID.
func (s *PostgresStore) decodeClusterAuthorityPrivateKey(clusterID, stored string) (string, error) {
	hasEncryptor := s != nil && s.clusterKeyEncryptor != nil

	if !strings.HasPrefix(stored, encryptedClusterAuthorityKeyPrefix) {
		if hasEncryptor {
			return "", fmt.Errorf("cluster authority private key is not encrypted")
		}
		return stored, nil
	}
	if !hasEncryptor {
		return "", secrets.ErrSecretEncryptionKeyMissing
	}

	rawPayload := strings.TrimPrefix(stored, encryptedClusterAuthorityKeyPrefix)
	var payload secrets.EncryptedPayload
	if err := json.Unmarshal([]byte(rawPayload), &payload); err != nil {
		return "", fmt.Errorf("decode encrypted cluster authority private key: %w", err)
	}

	decrypted, err := s.clusterKeyEncryptor.Decrypt(payload, clusterAuthorityPrivateKeyAAD(clusterID))
	if err != nil {
		return "", err
	}
	return string(decrypted), nil
}
// clusterAuthorityPrivateKeyAAD returns the additional authenticated data that
// binds an encrypted cluster authority private key to its cluster.
//
// The cluster ID is trimmed and lower-cased before use. Keys are decrypted with
// the ID as read back from the database (cluster_id::text), which PostgreSQL
// always renders in lower case, while keys may be encrypted with a
// caller-supplied ID. Without case normalisation, a caller passing an
// upper-case UUID would produce a key that can never be decrypted. IDs that
// were already lower case yield exactly the same bytes as before.
func clusterAuthorityPrivateKeyAAD(clusterID string) []byte {
	normalized := strings.ToLower(strings.TrimSpace(clusterID))
	return []byte("rap-cluster-authority-v1|" + normalized)
}
// GetPlatformRole returns the platform role for userID as computed by
// authority.EffectivePlatformRole, using the store's pool and its (possibly
// nil) authority verifier.
func (s *PostgresStore) GetPlatformRole(ctx context.Context, userID string) (string, error) {
	role, err := authority.EffectivePlatformRole(ctx, s.db, s.authority, userID)
	return role, err
}
// ListClusters returns every cluster, newest first (ordered by created_at
// descending). The returned slice is nil when there are no clusters.
func (s *PostgresStore) ListClusters(ctx context.Context) ([]Cluster, error) {
	const query = `
SELECT id::text, slug, name, status, region, metadata, created_at, updated_at
FROM clusters
ORDER BY created_at DESC
`
	rows, err := s.db.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var clusters []Cluster
	for rows.Next() {
		cluster, scanErr := scanCluster(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		clusters = append(clusters, cluster)
	}
	// rows.Err reports any error that ended the iteration early.
	return clusters, rows.Err()
}
// GetCluster loads a single cluster by ID. The result and error are whatever
// scanCluster produces for the queried row.
func (s *PostgresStore) GetCluster(ctx context.Context, clusterID string) (Cluster, error) {
	const query = `
SELECT id::text, slug, name, status, region, metadata, created_at, updated_at
FROM clusters
WHERE id = $1::uuid
`
	return scanCluster(s.db.QueryRow(ctx, query, clusterID))
}
// CreateCluster inserts a new cluster together with its initial authority
// state and a freshly generated authority key pair, all in one transaction.
//
// The cluster ID is generated here, the cluster row is created with status
// 'active', and the private key is passed through
// encodeClusterAuthorityPrivateKey before it is stored. It returns the cluster
// as scanned from the INSERT ... RETURNING clause, or the first error
// encountered; the transaction is only committed after all three inserts
// succeed.
func (s *PostgresStore) CreateCluster(ctx context.Context, input CreateClusterInput) (Cluster, error) {
id := uuid.NewString()
// One timestamp is shared by every row written below.
now := time.Now().UTC()
keys, err := clusterauth.GenerateKeyPair()
if err != nil {
return Cluster{}, err
}
// The key is encoded against the new cluster ID before the transaction starts.
storedPrivateKey, err := s.encodeClusterAuthorityPrivateKey(id, keys.PrivateKeyB64)
if err != nil {
return Cluster{}, err
}
tx, err := s.db.Begin(ctx)
if err != nil {
return Cluster{}, err
}
// Roll back on any early return; the rollback error is deliberately ignored.
defer func() { _ = tx.Rollback(ctx) }()
// Step 1: the cluster row itself.
row := tx.QueryRow(ctx, `
INSERT INTO clusters (id, slug, name, status, region, metadata, created_at, updated_at)
VALUES ($1::uuid, $2, $3, 'active', $4, $5::jsonb, $6, $6)
RETURNING id::text, slug, name, status, region, metadata, created_at, updated_at
`, id, input.Slug, input.Name, input.Region, []byte(input.Metadata), now)
item, err := scanCluster(row)
if err != nil {
return Cluster{}, err
}
// Step 2: initial authority state (authoritative / normal / term 1); an
// existing row for this cluster is left untouched.
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_authority_states (cluster_id, authority_state, mutation_mode, term, notes, updated_by_user_id, updated_at)
VALUES ($1::uuid, 'authoritative', 'normal', 1, 'cluster created with authority key', $2::uuid, $3)
ON CONFLICT (cluster_id) DO NOTHING
`, id, input.ActorUserID, now); err != nil {
return Cluster{}, err
}
// Step 3: the ed25519 authority key pair, with the encoded private key.
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_authorities (
cluster_id, authority_state, key_algorithm, public_key, public_key_fingerprint,
private_key, created_by_user_id, created_at, updated_at, metadata
) VALUES ($1::uuid, 'active', 'ed25519', $2, $3, $4, $5::uuid, $6, $6, $7::jsonb)
`, id, keys.PublicKeyB64, keys.Fingerprint, storedPrivateKey, input.ActorUserID, now, []byte(`{"storage":"database_signer","production_target":"external_cluster_signer_or_hsm"}`)); err != nil {
return Cluster{}, err
}
if err := tx.Commit(ctx); err != nil {
return Cluster{}, err
}
return item, nil
}
// UpdateCluster overwrites the name, status, region and metadata of the
// cluster identified by input.ClusterID and stamps updated_at with the current
// UTC time. It returns the updated row as scanned by scanCluster; the slug is
// not modified.
func (s *PostgresStore) UpdateCluster(ctx context.Context, input UpdateClusterInput) (Cluster, error) {
	const query = `
UPDATE clusters
SET name = $2,
status = $3,
region = $4,
metadata = $5::jsonb,
updated_at = $6
WHERE id = $1::uuid
RETURNING id::text, slug, name, status, region, metadata, created_at, updated_at
`
	updatedAt := time.Now().UTC()
	metadata := []byte(input.Metadata)
	return scanCluster(s.db.QueryRow(ctx, query,
		input.ClusterID, input.Name, input.Status, input.Region, metadata, updatedAt))
}
// GetClusterAuthority loads the authority key record for clusterID and
// replaces its stored private key with the decoded (decrypted, when
// applicable) value. Decoding uses the cluster ID read back from the row, not
// the clusterID argument.
func (s *PostgresStore) GetClusterAuthority(ctx context.Context, clusterID string) (ClusterAuthorityKey, error) {
	const query = `
SELECT cluster_id::text, authority_state, key_algorithm, public_key,
public_key_fingerprint, private_key, created_at, updated_at
FROM cluster_authorities
WHERE cluster_id = $1::uuid
`
	authorityKey, err := scanClusterAuthority(s.db.QueryRow(ctx, query, clusterID))
	if err != nil {
		return ClusterAuthorityKey{}, err
	}

	decoded, err := s.decodeClusterAuthorityPrivateKey(authorityKey.ClusterID, authorityKey.PrivateKey)
	if err != nil {
		return ClusterAuthorityKey{}, err
	}
	authorityKey.PrivateKey = decoded
	return authorityKey, nil
}
// EnsureClusterAuthority makes sure clusterID has an authority key and returns
// the key that is stored afterwards.
//
// A new key pair is generated on every call; it is only persisted when the
// cluster exists and has no cluster_authorities row yet (ON CONFLICT DO
// NOTHING keeps an existing key). The two inserts are separate statements, not
// a transaction. The returned value always comes from GetClusterAuthority, so
// it reflects what is actually stored rather than the key generated here.
func (s *PostgresStore) EnsureClusterAuthority(ctx context.Context, clusterID string, actorUserID *string) (ClusterAuthorityKey, error) {
keys, err := clusterauth.GenerateKeyPair()
if err != nil {
return ClusterAuthorityKey{}, err
}
// The private key is encoded against the caller-supplied clusterID.
storedPrivateKey, err := s.encodeClusterAuthorityPrivateKey(clusterID, keys.PrivateKeyB64)
if err != nil {
return ClusterAuthorityKey{}, err
}
now := time.Now().UTC()
// INSERT ... SELECT FROM clusters: inserts nothing when the cluster does not
// exist, and leaves an existing authority row untouched.
_, err = s.db.Exec(ctx, `
INSERT INTO cluster_authorities (
cluster_id, authority_state, key_algorithm, public_key, public_key_fingerprint,
private_key, created_by_user_id, created_at, updated_at, metadata
)
SELECT c.id, 'active', 'ed25519', $2, $3, $4, $5::uuid, $6, $6,
'{"storage":"database_signer","created_by":"ensure_cluster_authority"}'::jsonb
FROM clusters c
WHERE c.id = $1::uuid
ON CONFLICT (cluster_id) DO NOTHING
`, clusterID, keys.PublicKeyB64, keys.Fingerprint, storedPrivateKey, actorUserID, now)
if err != nil {
return ClusterAuthorityKey{}, err
}
// Best-effort: the result and error of seeding the authority state are
// explicitly discarded, so a failure here does not fail the call.
_, _ = s.db.Exec(ctx, `
INSERT INTO cluster_authority_states (cluster_id, authority_state, mutation_mode, term, notes, updated_by_user_id, updated_at)
VALUES ($1::uuid, 'authoritative', 'normal', 1, 'authority key ensured', $2::uuid, $3)
ON CONFLICT (cluster_id) DO NOTHING
`, clusterID, actorUserID, now)
return s.GetClusterAuthority(ctx, clusterID)
}
// ListClusterNodes returns the nodes that have a membership in clusterID,
// newest node first (ordered by the node's created_at descending). Each row
// also carries the membership status/metadata and, via LEFT JOINs, the node
// group the node is assigned to in this cluster, if any. The returned slice is
// nil when the cluster has no members.
func (s *PostgresStore) ListClusterNodes(ctx context.Context, clusterID string) ([]ClusterNode, error) {
	const query = `
SELECT n.id::text, n.owner_organization_id::text, n.node_key, n.name, n.ownership_type,
n.registration_status, n.health_status, n.version_state, n.partition_state,
n.reported_version, n.last_seen_at, cm.membership_status, cm.metadata,
ng.id::text, ng.name,
n.created_at, n.updated_at
FROM cluster_memberships cm
JOIN nodes n ON n.id = cm.node_id
LEFT JOIN cluster_node_group_memberships ngm ON ngm.cluster_id = cm.cluster_id AND ngm.node_id = cm.node_id
LEFT JOIN cluster_node_groups ng ON ng.cluster_id = ngm.cluster_id AND ng.id = ngm.group_id
WHERE cm.cluster_id = $1::uuid
ORDER BY n.created_at DESC
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var nodes []ClusterNode
	for rows.Next() {
		node, scanErr := scanClusterNode(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		nodes = append(nodes, node)
	}
	return nodes, rows.Err()
}
// ListNodeGroups returns the node groups of clusterID ordered by sort_order and
// then name. Unlike the other list methods in this file, a successful call
// with no matching rows yields an empty, non-nil slice.
func (s *PostgresStore) ListNodeGroups(ctx context.Context, clusterID string) ([]ClusterNodeGroup, error) {
	const query = `
SELECT id::text, cluster_id::text, parent_group_id::text, name, description,
sort_order, metadata, created_by_user_id::text, created_at, updated_at
FROM cluster_node_groups
WHERE cluster_id = $1::uuid
ORDER BY sort_order, name
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Start from an empty slice so the no-rows result is never nil.
	groups := []ClusterNodeGroup{}
	for rows.Next() {
		group, scanErr := scanNodeGroup(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		groups = append(groups, group)
	}
	return groups, rows.Err()
}
// CreateNodeGroup inserts a node group into input.ClusterID and returns the
// created row.
//
// The parent_ok CTE yields a row only when no parent is given or the given
// parent group exists in the same cluster; with an invalid parent nothing is
// inserted and the error from scanning an empty result is returned. After a
// successful insert an audit event is recorded on a best-effort basis: its
// error is ignored and does not affect the result.
func (s *PostgresStore) CreateNodeGroup(ctx context.Context, input CreateNodeGroupInput) (ClusterNodeGroup, error) {
	const query = `
WITH parent_ok AS (
SELECT $3::uuid AS parent_group_id
WHERE $3::uuid IS NULL
OR EXISTS (
SELECT 1
FROM cluster_node_groups parent
WHERE parent.cluster_id = $2::uuid
AND parent.id = $3::uuid
)
)
INSERT INTO cluster_node_groups (
id, cluster_id, parent_group_id, name, description, sort_order, metadata, created_by_user_id, created_at, updated_at
)
SELECT $1::uuid, $2::uuid, parent_group_id, $4, $5, $6, $7::jsonb, $8::uuid, NOW(), NOW()
FROM parent_ok
RETURNING id::text, cluster_id::text, parent_group_id::text, name, description,
sort_order, metadata, created_by_user_id::text, created_at, updated_at
`
	groupID := uuid.NewString()
	group, err := scanNodeGroup(s.db.QueryRow(ctx, query,
		groupID, input.ClusterID, input.ParentGroupID, input.Name, input.Description,
		input.SortOrder, []byte(input.Metadata), input.ActorUserID))
	if err != nil {
		return ClusterNodeGroup{}, err
	}

	event := ClusterAuditEvent{
		ClusterID:   &input.ClusterID,
		ActorUserID: &input.ActorUserID,
		EventType:   "cluster_node_group.created",
		TargetType:  "cluster_node_group",
		TargetID:    &group.ID,
		Payload:     json.RawMessage(`{"hierarchical":true}`),
		CreatedAt:   time.Now().UTC(),
	}
	// Audit failures are intentionally ignored; the group has been created.
	_ = s.RecordAudit(ctx, event)
	return group, nil
}
// CreateJoinToken stores a new join token for input.ClusterID with status
// 'active' and a used_count of zero. Only tokenHash is persisted for the token
// value. The returned record is scanned from the INSERT ... RETURNING clause.
func (s *PostgresStore) CreateJoinToken(ctx context.Context, input CreateJoinTokenInput, tokenHash string) (NodeJoinToken, error) {
	const query = `
INSERT INTO node_join_tokens (
id, cluster_id, token_hash, scope, expires_at, max_uses, used_count, status, created_by_user_id, created_at
) VALUES ($1::uuid, $2::uuid, $3, $4::jsonb, $5, $6, 0, 'active', $7::uuid, NOW())
RETURNING id::text, cluster_id::text, scope, expires_at, max_uses, used_count, status,
created_by_user_id::text, created_at, revoked_at, authority_payload, authority_signature
`
	tokenID := uuid.NewString()
	scope := []byte(input.Scope)
	return scanJoinToken(s.db.QueryRow(ctx, query,
		tokenID, input.ClusterID, tokenHash, scope, input.ExpiresAt, input.MaxUses, input.ActorUserID))
}
// SetJoinTokenAuthority attaches an authority payload and its JSON-encoded
// signature to the join token tokenID within clusterID, and returns the
// updated token. The token's status is not checked by this update.
func (s *PostgresStore) SetJoinTokenAuthority(ctx context.Context, clusterID, tokenID string, payload json.RawMessage, signature ClusterSignature) (NodeJoinToken, error) {
	const query = `
UPDATE node_join_tokens
SET authority_payload = $3::jsonb,
authority_signature = $4::jsonb
WHERE cluster_id = $1::uuid
AND id = $2::uuid
RETURNING id::text, cluster_id::text, scope, expires_at, max_uses, used_count, status,
created_by_user_id::text, created_at, revoked_at, authority_payload, authority_signature
`
	encodedSignature, err := json.Marshal(signature)
	if err != nil {
		return NodeJoinToken{}, err
	}
	return scanJoinToken(s.db.QueryRow(ctx, query, clusterID, tokenID, []byte(payload), encodedSignature))
}
// GetValidJoinTokenByHash looks up a join token of clusterID by its hash and
// returns it only while it is usable: status 'active', not yet expired, and
// with used_count below max_uses. Otherwise the error from scanning an empty
// result is returned.
func (s *PostgresStore) GetValidJoinTokenByHash(ctx context.Context, clusterID, tokenHash string) (NodeJoinToken, error) {
	const query = `
SELECT id::text, cluster_id::text, scope, expires_at, max_uses, used_count, status,
created_by_user_id::text, created_at, revoked_at, authority_payload, authority_signature
FROM node_join_tokens
WHERE cluster_id = $1::uuid
AND token_hash = $2
AND status = 'active'
AND expires_at > NOW()
AND used_count < max_uses
`
	return scanJoinToken(s.db.QueryRow(ctx, query, clusterID, tokenHash))
}
// RevokeJoinToken marks an active join token as 'revoked' and sets revoked_at
// to the database's current time. Tokens that are not active (or do not belong
// to input.ClusterID) are not matched, in which case the error from scanning an
// empty result is returned.
func (s *PostgresStore) RevokeJoinToken(ctx context.Context, input RevokeJoinTokenInput) (NodeJoinToken, error) {
	const query = `
UPDATE node_join_tokens
SET status = 'revoked',
revoked_at = NOW()
WHERE id = $1::uuid
AND cluster_id = $2::uuid
AND status = 'active'
RETURNING id::text, cluster_id::text, scope, expires_at, max_uses, used_count, status,
created_by_user_id::text, created_at, revoked_at, authority_payload, authority_signature
`
	return scanJoinToken(s.db.QueryRow(ctx, query, input.TokenID, input.ClusterID))
}
// ExpireJoinTokens flips every active join token of clusterID whose expires_at
// has passed to status 'expired'. The number of affected tokens is not
// reported.
func (s *PostgresStore) ExpireJoinTokens(ctx context.Context, clusterID string) error {
	const query = `
UPDATE node_join_tokens
SET status = 'expired'
WHERE cluster_id = $1::uuid
AND status = 'active'
AND expires_at <= NOW()
`
	if _, err := s.db.Exec(ctx, query, clusterID); err != nil {
		return err
	}
	return nil
}
// CreateJoinRequest consumes one use of the join token joinTokenID and records
// a pending join request for input.ClusterID, atomically.
//
// The token's used_count is incremented only while the token is active,
// unexpired and below max_uses; if that update does not touch exactly one row,
// pgx.ErrNoRows is returned and nothing is written. The returned request is
// scanned from the INSERT ... RETURNING clause.
func (s *PostgresStore) CreateJoinRequest(ctx context.Context, input CreateJoinRequestInput, joinTokenID string) (NodeJoinRequest, error) {
tx, err := s.db.Begin(ctx)
if err != nil {
return NodeJoinRequest{}, err
}
// Undo the token increment on any early return.
defer tx.Rollback(ctx)
// The validity conditions live in the UPDATE itself, so checking and
// consuming the token happen in a single statement.
tag, err := tx.Exec(ctx, `
UPDATE node_join_tokens
SET used_count = used_count + 1
WHERE id = $1::uuid
AND cluster_id = $2::uuid
AND status = 'active'
AND expires_at > NOW()
AND used_count < max_uses
`, joinTokenID, input.ClusterID)
if err != nil {
return NodeJoinRequest{}, err
}
// No row updated means the token is missing or no longer usable.
if tag.RowsAffected() != 1 {
return NodeJoinRequest{}, pgx.ErrNoRows
}
id := uuid.NewString()
row := tx.QueryRow(ctx, `
INSERT INTO node_join_requests (
id, cluster_id, join_token_id, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, created_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, 'pending', NOW(), NOW())
RETURNING id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
`, id, input.ClusterID, joinTokenID, input.NodeName, input.NodeFingerprint, input.PublicKey, []byte(input.ReportedCapabilities), []byte(input.ReportedFacts), []byte(input.RequestedRoles))
item, err := scanJoinRequest(row)
if err != nil {
return NodeJoinRequest{}, err
}
if err := tx.Commit(ctx); err != nil {
return NodeJoinRequest{}, err
}
return item, nil
}
// GetJoinRequestForBootstrap fetches a join request by ID within
// input.ClusterID, but only if the supplied node fingerprint and public key
// both match the stored values. The request's status is not filtered.
func (s *PostgresStore) GetJoinRequestForBootstrap(ctx context.Context, input GetJoinRequestBootstrapInput) (NodeJoinRequest, error) {
	const query = `
SELECT id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
FROM node_join_requests
WHERE cluster_id = $1::uuid
AND id = $2::uuid
AND node_fingerprint = $3
AND public_key = $4
`
	return scanJoinRequest(s.db.QueryRow(ctx, query,
		input.ClusterID, input.JoinRequestID, input.NodeFingerprint, input.PublicKey))
}
// ListJoinRequests returns all join requests of clusterID regardless of
// status, newest first (created_at descending). The returned slice is nil when
// there are none.
func (s *PostgresStore) ListJoinRequests(ctx context.Context, clusterID string) ([]NodeJoinRequest, error) {
	const query = `
SELECT id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
FROM node_join_requests
WHERE cluster_id = $1::uuid
ORDER BY created_at DESC
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var requests []NodeJoinRequest
	for rows.Next() {
		request, scanErr := scanJoinRequest(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		requests = append(requests, request)
	}
	return requests, rows.Err()
}
// ApproveJoinRequest approves a pending join request and provisions the node
// it describes, all within one transaction.
//
// On success it has created a node, an active cluster membership, an active
// node identity carrying the request's public key, marked the request
// 'approved', and written an audit event. It returns the updated request plus
// the bootstrap information for the new node. A request that is not pending
// yields the error "join request is not pending"; any failure leaves the
// transaction uncommitted.
func (s *PostgresStore) ApproveJoinRequest(ctx context.Context, input ApproveJoinRequestInput) (ApprovedJoinRequest, error) {
tx, err := s.db.Begin(ctx)
if err != nil {
return ApprovedJoinRequest{}, err
}
// Undo all writes on any early return.
defer tx.Rollback(ctx)
// NOTE(review): getJoinRequestForUpdate is defined elsewhere; presumably it
// locks the request row for the duration of the transaction — confirm.
req, err := getJoinRequestForUpdate(ctx, tx, input.ClusterID, input.JoinRequestID)
if err != nil {
return ApprovedJoinRequest{}, err
}
if req.Status != JoinRequestStatusPending {
return ApprovedJoinRequest{}, errors.New("join request is not pending")
}
now := time.Now().UTC()
nodeID := uuid.NewString()
// Default the node key to the fingerprint reported in the join request.
nodeKey := input.NodeKey
if nodeKey == "" {
nodeKey = req.NodeFingerprint
}
// Default ownership when the caller does not specify one.
ownershipType := input.OwnershipType
if ownershipType == "" {
ownershipType = "platform_managed"
}
// Step 1: the node record, named after the join request's node name.
if _, err := tx.Exec(ctx, `
INSERT INTO nodes (
id, owner_organization_id, node_key, name, ownership_type, registration_status, health_status,
version_state, partition_state, metadata, created_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3, $4, $5, 'active', 'unknown', 'unknown', 'healthy', $6::jsonb, $7, $7)
`, nodeID, input.OwnerOrganizationID, nodeKey, req.NodeName, ownershipType, []byte(`{"created_from_join_request":true}`), now); err != nil {
return ApprovedJoinRequest{}, err
}
// Step 2: active membership of the new node in the cluster.
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_memberships (cluster_id, node_id, membership_status, joined_at, metadata)
VALUES ($1::uuid, $2::uuid, 'active', $3, $4::jsonb)
`, input.ClusterID, nodeID, now, []byte(`{"created_from_join_request":true}`)); err != nil {
return ApprovedJoinRequest{}, err
}
// Step 3: the node identity, using the public key from the join request.
if _, err := tx.Exec(ctx, `
INSERT INTO node_identities (node_id, public_key, identity_status, metadata, created_at, updated_at)
VALUES ($1::uuid, $2, 'active', $3::jsonb, $4, $4)
`, nodeID, req.PublicKey, []byte(`{"source":"join_request"}`), now); err != nil {
return ApprovedJoinRequest{}, err
}
// Step 4: mark the request approved and link it to the new node.
row := tx.QueryRow(ctx, `
UPDATE node_join_requests
SET status = 'approved',
reviewed_by_user_id = $3::uuid,
reviewed_at = $4,
approved_node_id = $5::uuid,
updated_at = $4
WHERE cluster_id = $1::uuid
AND id = $2::uuid
RETURNING id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
`, input.ClusterID, input.JoinRequestID, input.ActorUserID, now, nodeID)
updated, err := scanJoinRequest(row)
if err != nil {
return ApprovedJoinRequest{}, err
}
// Step 5: audit event inside the same transaction; nodeID is a generated
// UUID, so quoting it with %q yields a plain JSON string.
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_audit_events (cluster_id, actor_user_id, event_type, target_type, target_id, payload, created_at)
VALUES ($1::uuid, $2::uuid, 'node_join_request.approved', 'node_join_request', $3, $4::jsonb, $5)
`, input.ClusterID, input.ActorUserID, input.JoinRequestID, []byte(fmt.Sprintf(`{"node_id":%q}`, nodeID)), now); err != nil {
return ApprovedJoinRequest{}, err
}
if err := tx.Commit(ctx); err != nil {
return ApprovedJoinRequest{}, err
}
// The certificate entry is a fixed placeholder status; no certificate is
// issued here.
return ApprovedJoinRequest{
JoinRequest: updated,
Bootstrap: NodeBootstrap{
NodeID: nodeID,
ClusterID: input.ClusterID,
IdentityStatus: "active",
Certificate: map[string]any{
"status": "pending_issuer_integration",
},
HeartbeatEndpoint: fmt.Sprintf("/api/v1/clusters/%s/nodes/%s/heartbeats", input.ClusterID, nodeID),
},
}, nil
}
// SetJoinRequestApprovalAuthority stores an approval payload and its
// JSON-encoded signature on the join request joinRequestID within clusterID,
// bumps updated_at to the database's current time, and returns the updated
// request. The request's status is not checked by this update.
func (s *PostgresStore) SetJoinRequestApprovalAuthority(ctx context.Context, clusterID, joinRequestID string, payload json.RawMessage, signature ClusterSignature) (NodeJoinRequest, error) {
	const query = `
UPDATE node_join_requests
SET approval_payload = $3::jsonb,
approval_signature = $4::jsonb,
updated_at = NOW()
WHERE cluster_id = $1::uuid
AND id = $2::uuid
RETURNING id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
`
	encodedSignature, err := json.Marshal(signature)
	if err != nil {
		return NodeJoinRequest{}, err
	}
	return scanJoinRequest(s.db.QueryRow(ctx, query, clusterID, joinRequestID, []byte(payload), encodedSignature))
}
// RejectJoinRequest marks a pending join request as 'rejected', recording the
// reviewer, the review time (current UTC time) and the rejection reason. Only
// requests still in status 'pending' are matched; otherwise the error from
// scanning an empty result is returned.
func (s *PostgresStore) RejectJoinRequest(ctx context.Context, input RejectJoinRequestInput) (NodeJoinRequest, error) {
	const query = `
UPDATE node_join_requests
SET status = 'rejected',
reviewed_by_user_id = $3::uuid,
reviewed_at = $4,
rejection_reason = $5,
updated_at = $4
WHERE cluster_id = $1::uuid
AND id = $2::uuid
AND status = 'pending'
RETURNING id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
`
	reviewedAt := time.Now().UTC()
	return scanJoinRequest(s.db.QueryRow(ctx, query,
		input.ClusterID, input.JoinRequestID, input.ActorUserID, reviewedAt, input.Reason))
}
// AssignNodeRole creates, updates or deactivates a role assignment for a node
// within a cluster, optionally scoped to an organization.
//
// input.Status defaults to "active" when empty.
//
//   - For "active", a new assignment is inserted. If an active assignment for
//     the same (cluster, node, role, organization) already exists, its policy
//     and assigned_by_user_id are overwritten instead.
//   - For any other status, the currently active assignment matching
//     (cluster, node, role, organization) is updated to that status, with
//     revoked_at set when the status is 'revoked'. If no active assignment
//     matches, the error from scanning an empty result is returned.
//
// A NULL organization is matched via a zero-UUID sentinel in both paths.
func (s *PostgresStore) AssignNodeRole(ctx context.Context, input AssignNodeRoleInput) (NodeRoleAssignment, error) {
	status := input.Status
	if status == "" {
		status = "active"
	}

	if status != "active" {
		// Placeholders are numbered densely from $1. The statement previously
		// skipped $1 (an unused generated ID was still passed for it), which
		// PostgreSQL rejects when preparing the statement because the type of
		// an unreferenced parameter cannot be inferred.
		row := s.db.QueryRow(ctx, `
UPDATE node_role_assignments
SET status = $5,
revoked_at = CASE WHEN $5 = 'revoked' THEN NOW() ELSE revoked_at END,
policy = $6::jsonb,
assigned_by_user_id = $7::uuid
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
AND role = $4
AND COALESCE(organization_id, '00000000-0000-0000-0000-000000000000'::uuid) =
COALESCE($3::uuid, '00000000-0000-0000-0000-000000000000'::uuid)
AND status = 'active'
RETURNING id::text, cluster_id::text, node_id::text, organization_id::text, role, status,
policy, assigned_by_user_id::text, assigned_at, revoked_at
`, input.ClusterID, input.NodeID, input.OrganizationID, input.Role, status, []byte(input.Policy), input.ActorUserID)
		return scanRoleAssignment(row)
	}

	// Only the insert path needs a fresh assignment ID.
	id := uuid.NewString()
	row := s.db.QueryRow(ctx, `
INSERT INTO node_role_assignments (
id, cluster_id, node_id, organization_id, role, status, policy, assigned_by_user_id, assigned_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, $6, $7::jsonb, $8::uuid, NOW())
ON CONFLICT (cluster_id, node_id, role, COALESCE(organization_id, '00000000-0000-0000-0000-000000000000'::uuid))
WHERE status = 'active'
DO UPDATE SET
policy = EXCLUDED.policy,
assigned_by_user_id = EXCLUDED.assigned_by_user_id
RETURNING id::text, cluster_id::text, node_id::text, organization_id::text, role, status,
policy, assigned_by_user_id::text, assigned_at, revoked_at
`, id, input.ClusterID, input.NodeID, input.OrganizationID, input.Role, status, []byte(input.Policy), input.ActorUserID)
	return scanRoleAssignment(row)
}
// ListNodeRoleAssignments returns every role assignment of nodeID within
// clusterID, in any status, most recently assigned first. The returned slice
// is nil when there are none.
func (s *PostgresStore) ListNodeRoleAssignments(ctx context.Context, clusterID, nodeID string) ([]NodeRoleAssignment, error) {
	const query = `
SELECT id::text, cluster_id::text, node_id::text, organization_id::text, role, status,
policy, assigned_by_user_id::text, assigned_at, revoked_at
FROM node_role_assignments
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
ORDER BY assigned_at DESC
`
	rows, err := s.db.Query(ctx, query, clusterID, nodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var assignments []NodeRoleAssignment
	for rows.Next() {
		assignment, scanErr := scanRoleAssignment(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		assignments = append(assignments, assignment)
	}
	return assignments, rows.Err()
}
// AttachExistingNodeToCluster adds an already registered node to a cluster,
// assigns the requested roles, writes an audit event and returns the resulting
// cluster node view, all in one transaction.
//
// pgx.ErrNoRows is returned when the membership upsert does not affect exactly
// one row, which per the SQL happens when the node does not exist, its
// registration_status is not 'active', or an existing membership is 'revoked'.
func (s *PostgresStore) AttachExistingNodeToCluster(ctx context.Context, input AttachExistingNodeInput) (ClusterNode, error) {
tx, err := s.db.Begin(ctx)
if err != nil {
return ClusterNode{}, err
}
// Undo all writes on any early return.
defer tx.Rollback(ctx)
now := time.Now().UTC()
membershipMetadata, err := json.Marshal(map[string]any{
"attached_from_existing_node": true,
"attached_at": now.Format(time.RFC3339Nano),
})
if err != nil {
return ClusterNode{}, err
}
// Upsert the membership. The CTE restricts the insert to active nodes; on
// conflict the membership is re-activated and the new metadata is merged
// into the existing metadata, unless the existing membership is revoked.
tag, err := tx.Exec(ctx, `
WITH eligible_node AS (
SELECT id
FROM nodes
WHERE id = $2::uuid
AND registration_status = 'active'
)
INSERT INTO cluster_memberships (cluster_id, node_id, membership_status, joined_at, metadata)
SELECT $1::uuid, id, 'active', $3, $4::jsonb
FROM eligible_node
ON CONFLICT (cluster_id, node_id) DO UPDATE SET
membership_status = 'active',
metadata = cluster_memberships.metadata || EXCLUDED.metadata
WHERE cluster_memberships.membership_status <> 'revoked'
`, input.ClusterID, input.NodeID, now, membershipMetadata)
if err != nil {
return ClusterNode{}, err
}
if tag.RowsAffected() != 1 {
return ClusterNode{}, pgx.ErrNoRows
}
// Each role is inserted as an active assignment with an empty policy and no
// organization; assignments that conflict with an existing row are skipped.
for _, role := range input.Roles {
_, err := tx.Exec(ctx, `
INSERT INTO node_role_assignments (
id, cluster_id, node_id, role, status, policy, assigned_by_user_id, assigned_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, 'active', '{}'::jsonb, $5::uuid, $6)
ON CONFLICT DO NOTHING
`, uuid.NewString(), input.ClusterID, input.NodeID, role, input.ActorUserID, now)
if err != nil {
return ClusterNode{}, err
}
}
auditPayload, err := json.Marshal(map[string]any{
"attached_from_existing_node": true,
"roles": input.Roles,
})
if err != nil {
return ClusterNode{}, err
}
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_audit_events (cluster_id, actor_user_id, event_type, target_type, target_id, payload, created_at)
VALUES ($1::uuid, $2::uuid, 'cluster_membership.attached_existing_node', 'node', $3, $4::jsonb, $5)
`, input.ClusterID, input.ActorUserID, input.NodeID, auditPayload, now); err != nil {
return ClusterNode{}, err
}
// Read the node back inside the transaction so the result reflects the
// writes above.
row := tx.QueryRow(ctx, `
SELECT n.id::text, n.owner_organization_id::text, n.node_key, n.name, n.ownership_type,
n.registration_status, n.health_status, n.version_state, n.partition_state,
n.reported_version, n.last_seen_at, cm.membership_status, cm.metadata,
ng.id::text, ng.name,
n.created_at, n.updated_at
FROM cluster_memberships cm
JOIN nodes n ON n.id = cm.node_id
LEFT JOIN cluster_node_group_memberships ngm ON ngm.cluster_id = cm.cluster_id AND ngm.node_id = cm.node_id
LEFT JOIN cluster_node_groups ng ON ng.cluster_id = ngm.cluster_id AND ng.id = ngm.group_id
WHERE cm.cluster_id = $1::uuid
AND cm.node_id = $2::uuid
`, input.ClusterID, input.NodeID)
item, err := scanClusterNode(row)
if err != nil {
return ClusterNode{}, err
}
if err := tx.Commit(ctx); err != nil {
return ClusterNode{}, err
}
return item, nil
}
// AssignNodeToGroup moves a cluster node into a node group, or removes it from
// its group when input.GroupID is nil, then writes an audit event and returns
// the refreshed cluster node view. Everything runs in one transaction.
//
// pgx.ErrNoRows is returned when the node has no non-revoked membership in the
// cluster, or (when assigning) when the target group does not exist in the
// cluster.
func (s *PostgresStore) AssignNodeToGroup(ctx context.Context, input AssignNodeGroupInput) (ClusterNode, error) {
tx, err := s.db.Begin(ctx)
if err != nil {
return ClusterNode{}, err
}
// Undo all writes on any early return.
defer tx.Rollback(ctx)
now := time.Now().UTC()
if input.GroupID == nil {
// Unassign: delete the group membership, but only for a node that still
// has a non-revoked cluster membership.
tag, err := tx.Exec(ctx, `
DELETE FROM cluster_node_group_memberships
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
AND EXISTS (
SELECT 1
FROM cluster_memberships cm
WHERE cm.cluster_id = $1::uuid
AND cm.node_id = $2::uuid
AND cm.membership_status <> 'revoked'
)
`, input.ClusterID, input.NodeID)
if err != nil {
return ClusterNode{}, err
}
// Zero deleted rows is ambiguous: the node may simply have had no group
// (fine), or it may not be a valid member at all (error). Check which.
if tag.RowsAffected() == 0 {
var exists bool
if err := tx.QueryRow(ctx, `
SELECT EXISTS (
SELECT 1
FROM cluster_memberships
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
AND membership_status <> 'revoked'
)
`, input.ClusterID, input.NodeID).Scan(&exists); err != nil {
return ClusterNode{}, err
}
if !exists {
return ClusterNode{}, pgx.ErrNoRows
}
}
} else {
// Assign: upsert the group membership, guarded by two EXISTS checks — the
// node must be a non-revoked member and the group must belong to the same
// cluster. An existing membership for the node is repointed to the new group.
tag, err := tx.Exec(ctx, `
INSERT INTO cluster_node_group_memberships (
cluster_id, node_id, group_id, assigned_by_user_id, assigned_at, metadata
)
SELECT $1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, '{}'::jsonb
WHERE EXISTS (
SELECT 1
FROM cluster_memberships cm
WHERE cm.cluster_id = $1::uuid
AND cm.node_id = $2::uuid
AND cm.membership_status <> 'revoked'
)
AND EXISTS (
SELECT 1
FROM cluster_node_groups ng
WHERE ng.cluster_id = $1::uuid
AND ng.id = $3::uuid
)
ON CONFLICT (cluster_id, node_id) DO UPDATE SET
group_id = EXCLUDED.group_id,
assigned_by_user_id = EXCLUDED.assigned_by_user_id,
assigned_at = EXCLUDED.assigned_at
`, input.ClusterID, input.NodeID, input.GroupID, input.ActorUserID, now)
if err != nil {
return ClusterNode{}, err
}
// Either guard failing produces no row.
if tag.RowsAffected() != 1 {
return ClusterNode{}, pgx.ErrNoRows
}
}
// The audit payload records the new group, or null when unassigned. At this
// point a non-nil GroupID has already been accepted by the ::uuid cast above.
auditPayload := json.RawMessage(`{"group_id":null}`)
if input.GroupID != nil {
auditPayload = json.RawMessage(fmt.Sprintf(`{"group_id":%q}`, *input.GroupID))
}
if _, err := tx.Exec(ctx, `
INSERT INTO cluster_audit_events (cluster_id, actor_user_id, event_type, target_type, target_id, payload, created_at)
VALUES ($1::uuid, $2::uuid, 'cluster_node_group.assigned', 'node', $3, $4::jsonb, $5)
`, input.ClusterID, input.ActorUserID, input.NodeID, auditPayload, now); err != nil {
return ClusterNode{}, err
}
// Read the node back inside the transaction so the result reflects the
// group change.
row := tx.QueryRow(ctx, `
SELECT n.id::text, n.owner_organization_id::text, n.node_key, n.name, n.ownership_type,
n.registration_status, n.health_status, n.version_state, n.partition_state,
n.reported_version, n.last_seen_at, cm.membership_status, cm.metadata,
ng.id::text, ng.name,
n.created_at, n.updated_at
FROM cluster_memberships cm
JOIN nodes n ON n.id = cm.node_id
LEFT JOIN cluster_node_group_memberships ngm ON ngm.cluster_id = cm.cluster_id AND ngm.node_id = cm.node_id
LEFT JOIN cluster_node_groups ng ON ng.cluster_id = ngm.cluster_id AND ng.id = ngm.group_id
WHERE cm.cluster_id = $1::uuid
AND cm.node_id = $2::uuid
`, input.ClusterID, input.NodeID)
item, err := scanClusterNode(row)
if err != nil {
return ClusterNode{}, err
}
if err := tx.Commit(ctx); err != nil {
return ClusterNode{}, err
}
return item, nil
}
// RecordHeartbeat stores a node heartbeat and propagates it to the derived
// tables in a single transaction:
//
//  1. appends a row to node_heartbeats,
//  2. upserts the per-(cluster, node) row in node_latest_heartbeats,
//  3. updates the node's health status, reported version and last-seen time,
//  4. updates last_seen_at on the cluster membership.
//
// The observation time is the current UTC time taken here, not a value
// supplied by the caller. The returned heartbeat is the row scanned from step 1.
func (s *PostgresStore) RecordHeartbeat(ctx context.Context, input RecordHeartbeatInput) (NodeHeartbeat, error) {
tx, err := s.db.Begin(ctx)
if err != nil {
return NodeHeartbeat{}, err
}
// Undo all writes on any early return.
defer tx.Rollback(ctx)
id := uuid.NewString()
now := time.Now().UTC()
// Step 1: append to the heartbeat history.
row := tx.QueryRow(ctx, `
INSERT INTO node_heartbeats (
id, cluster_id, node_id, health_status, reported_version, capabilities, service_states, metadata, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9)
RETURNING id::text, cluster_id::text, node_id::text, health_status, reported_version,
capabilities, service_states, metadata, observed_at
`, id, input.ClusterID, input.NodeID, input.HealthStatus, input.ReportedVersion, []byte(input.Capabilities), []byte(input.ServiceStates), []byte(input.Metadata), now)
heartbeat, err := scanHeartbeat(row)
if err != nil {
return NodeHeartbeat{}, err
}
// Step 2: keep the latest-heartbeat row in sync, using the values read back
// from the inserted row.
if _, err := tx.Exec(ctx, `
INSERT INTO node_latest_heartbeats (
cluster_id, node_id, heartbeat_id, health_status, reported_version, capabilities, service_states, metadata, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9)
ON CONFLICT (cluster_id, node_id) DO UPDATE SET
heartbeat_id = EXCLUDED.heartbeat_id,
health_status = EXCLUDED.health_status,
reported_version = EXCLUDED.reported_version,
capabilities = EXCLUDED.capabilities,
service_states = EXCLUDED.service_states,
metadata = EXCLUDED.metadata,
observed_at = EXCLUDED.observed_at
`, input.ClusterID, input.NodeID, heartbeat.ID, heartbeat.HealthStatus, heartbeat.ReportedVersion, []byte(heartbeat.Capabilities), []byte(heartbeat.ServiceStates), []byte(heartbeat.Metadata), heartbeat.ObservedAt); err != nil {
return NodeHeartbeat{}, err
}
// Step 3: update the node itself. COALESCE keeps the previously reported
// version when this heartbeat's version parameter is NULL.
if _, err := tx.Exec(ctx, `
UPDATE nodes
SET health_status = $2,
reported_version = COALESCE($3, reported_version),
last_seen_at = $4,
updated_at = $4
WHERE id = $1::uuid
`, input.NodeID, input.HealthStatus, input.ReportedVersion, heartbeat.ObservedAt); err != nil {
return NodeHeartbeat{}, err
}
// Step 4: update the membership's last-seen time. The affected-row count is
// not checked here.
if _, err := tx.Exec(ctx, `
UPDATE cluster_memberships
SET last_seen_at = $3
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
`, input.ClusterID, input.NodeID, heartbeat.ObservedAt); err != nil {
return NodeHeartbeat{}, err
}
if err := tx.Commit(ctx); err != nil {
return NodeHeartbeat{}, err
}
return heartbeat, nil
}
// ListNodeHeartbeats returns rows from node_heartbeats for the given cluster
// and node, newest observed_at first.
//
// A limit that is non-positive or larger than 500 is replaced with 100 (it is
// not clamped to 500). The returned slice is nil when no rows match.
func (s *PostgresStore) ListNodeHeartbeats(ctx context.Context, clusterID, nodeID string, limit int) ([]NodeHeartbeat, error) {
	const (
		fallbackLimit = 100
		largestLimit  = 500
	)
	const query = `
SELECT id::text, cluster_id::text, node_id::text, health_status, reported_version,
capabilities, service_states, metadata, observed_at
FROM node_heartbeats
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
ORDER BY observed_at DESC
LIMIT $3
`
	if limit < 1 || limit > largestLimit {
		limit = fallbackLimit
	}

	rows, err := s.db.Query(ctx, query, clusterID, nodeID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var heartbeats []NodeHeartbeat
	for rows.Next() {
		heartbeat, scanErr := scanHeartbeat(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		heartbeats = append(heartbeats, heartbeat)
	}
	return heartbeats, rows.Err()
}
// RevokeNodeIdentity revokes a node's identity within a single transaction.
//
// It performs three writes and commits only if all succeed:
//  1. node_identities rows for input.NodeID are set to 'revoked' (only when the
//     node has a cluster_memberships row for input.ClusterID) and the
//     revocation reason is merged into their metadata;
//  2. the nodes row gets registration_status = 'revoked';
//  3. a 'node_identity.revoked' row is inserted into cluster_audit_events.
//
// It returns pgx.ErrNoRows when the first update does not affect exactly one
// row; any other error is returned as-is from the driver.
func (s *PostgresStore) RevokeNodeIdentity(ctx context.Context, input RevokeNodeIdentityInput) error {
	// Encode the JSON fragments with encoding/json. fmt's %q produces Go string
	// syntax, whose escapes (\a, \v, \x1f, \xff, ...) are not valid JSON, so a
	// reason containing control characters or invalid UTF-8 yielded a payload
	// that the ::jsonb casts below reject.
	identityPatch, err := json.Marshal(map[string]any{"revocation_reason": input.Reason})
	if err != nil {
		return fmt.Errorf("encoding revocation metadata: %w", err)
	}
	auditPayload, err := json.Marshal(map[string]any{"reason": input.Reason})
	if err != nil {
		return fmt.Errorf("encoding revocation audit payload: %w", err)
	}

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a no-op for the caller; the result
	// is intentionally not inspected.
	defer tx.Rollback(ctx)

	now := time.Now().UTC()
	tag, err := tx.Exec(ctx, `
UPDATE node_identities
SET identity_status = 'revoked',
revoked_at = $3,
updated_at = $3,
metadata = metadata || $4::jsonb
WHERE node_id = $1::uuid
AND EXISTS (
SELECT 1 FROM cluster_memberships cm
WHERE cm.cluster_id = $2::uuid
AND cm.node_id = node_identities.node_id
)
`, input.NodeID, input.ClusterID, now, identityPatch)
	if err != nil {
		return err
	}
	// NOTE(review): anything other than exactly one updated row (including more
	// than one) is reported as "not found" and rolled back — confirm that
	// node_identities holds at most one row per node.
	if tag.RowsAffected() != 1 {
		return pgx.ErrNoRows
	}

	if _, err := tx.Exec(ctx, `
UPDATE nodes
SET registration_status = 'revoked',
updated_at = $2
WHERE id = $1::uuid
`, input.NodeID, now); err != nil {
		return err
	}

	if _, err := tx.Exec(ctx, `
INSERT INTO cluster_audit_events (cluster_id, actor_user_id, event_type, target_type, target_id, payload, created_at)
VALUES ($1::uuid, $2::uuid, 'node_identity.revoked', 'node', $3, $4::jsonb, $5)
`, input.ClusterID, input.ActorUserID, input.NodeID, auditPayload, now); err != nil {
		return err
	}

	return tx.Commit(ctx)
}
// DisableClusterMembership sets membership_status to 'disabled' for the given
// cluster/node pair and merges the reason and a timestamp into the membership
// metadata. Memberships whose status is 'revoked' are not touched.
//
// It returns pgx.ErrNoRows when the update does not affect exactly one row,
// and otherwise the result of recording a "cluster_membership.disabled" audit
// event through RecordAudit. The update and the audit write are issued
// separately; they are not wrapped in a transaction here.
func (s *PostgresStore) DisableClusterMembership(ctx context.Context, input DisableMembershipInput) error {
	now := time.Now().UTC()

	// Encode with encoding/json rather than fmt's %q: %q emits Go string
	// syntax, whose escapes (\a, \v, \x1f, \xff, ...) are not valid JSON, so a
	// reason containing control characters produced an unparsable payload.
	membershipPatch, err := json.Marshal(map[string]any{
		"disabled_reason": input.Reason,
		"disabled_at":     now.Format(time.RFC3339Nano),
	})
	if err != nil {
		return fmt.Errorf("encoding membership disable metadata: %w", err)
	}
	auditPayload, err := json.Marshal(map[string]any{"reason": input.Reason})
	if err != nil {
		return fmt.Errorf("encoding membership disable audit payload: %w", err)
	}

	// Every bound argument must be referenced by the statement. The previous
	// version bound the timestamp as $3 without using it; Postgres cannot infer
	// a type for an unreferenced parameter when the statement is prepared.
	tag, err := s.db.Exec(ctx, `
UPDATE cluster_memberships
SET membership_status = 'disabled',
metadata = metadata || $3::jsonb
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
AND membership_status <> 'revoked'
`, input.ClusterID, input.NodeID, membershipPatch)
	if err != nil {
		return err
	}
	if tag.RowsAffected() != 1 {
		return pgx.ErrNoRows
	}

	return s.RecordAudit(ctx, ClusterAuditEvent{
		ClusterID:   &input.ClusterID,
		ActorUserID: &input.ActorUserID,
		EventType:   "cluster_membership.disabled",
		TargetType:  "node",
		TargetID:    &input.NodeID,
		Payload:     json.RawMessage(auditPayload),
		CreatedAt:   now,
	})
}
// UpsertFabricTestingFlag updates the fabric_testing_flags row matching the
// input's scope_type, scope_id and cluster_id (NULLs compare equal via the
// all-zero UUID), or inserts a new row with a fresh id when the update matches
// nothing.
//
// Defaults applied before writing: a non-positive HistoryRetentionHours
// becomes 24 and empty Metadata becomes `{}`.
//
// NOTE(review): the UPDATE and the INSERT are two separate statements outside
// a transaction, so two concurrent callers could both reach the INSERT —
// confirm whether a unique constraint covers this.
func (s *PostgresStore) UpsertFabricTestingFlag(ctx context.Context, input UpsertFabricTestingFlagInput) (FabricTestingFlag, error) {
	const updateQuery = `
UPDATE fabric_testing_flags
SET enabled = $4,
telemetry_enabled = $5,
synthetic_links_enabled = $6,
history_retention_hours = $7,
metadata = $8::jsonb,
updated_by_user_id = $9::uuid,
updated_at = NOW()
WHERE scope_type = $1
AND COALESCE(scope_id, '00000000-0000-0000-0000-000000000000'::uuid) = COALESCE($2::uuid, '00000000-0000-0000-0000-000000000000'::uuid)
AND COALESCE(cluster_id, '00000000-0000-0000-0000-000000000000'::uuid) = COALESCE($3::uuid, '00000000-0000-0000-0000-000000000000'::uuid)
RETURNING id::text, scope_type, scope_id::text, cluster_id::text, enabled, telemetry_enabled,
synthetic_links_enabled, history_retention_hours, metadata, updated_by_user_id::text, updated_at
`
	const insertQuery = `
INSERT INTO fabric_testing_flags (
id, scope_type, scope_id, cluster_id, enabled, telemetry_enabled, synthetic_links_enabled,
history_retention_hours, metadata, updated_by_user_id, updated_at
) VALUES ($1::uuid, $2, $3::uuid, $4::uuid, $5, $6, $7, $8, $9::jsonb, $10::uuid, NOW())
RETURNING id::text, scope_type, scope_id::text, cluster_id::text, enabled, telemetry_enabled,
synthetic_links_enabled, history_retention_hours, metadata, updated_by_user_id::text, updated_at
`
	if input.HistoryRetentionHours <= 0 {
		input.HistoryRetentionHours = 24
	}
	if len(input.Metadata) == 0 {
		input.Metadata = json.RawMessage(`{}`)
	}

	// First attempt: update an existing flag for this scope.
	updated, err := scanFabricTestingFlag(s.db.QueryRow(ctx, updateQuery,
		input.ScopeType,
		input.ScopeID,
		input.ClusterID,
		input.Enabled,
		input.TelemetryEnabled,
		input.SyntheticLinksEnabled,
		input.HistoryRetentionHours,
		[]byte(input.Metadata),
		input.ActorUserID,
	))
	switch {
	case err == nil:
		return updated, nil
	case !errors.Is(err, pgx.ErrNoRows):
		return FabricTestingFlag{}, err
	}

	// Nothing matched: create the flag.
	return scanFabricTestingFlag(s.db.QueryRow(ctx, insertQuery,
		uuid.NewString(),
		input.ScopeType,
		input.ScopeID,
		input.ClusterID,
		input.Enabled,
		input.TelemetryEnabled,
		input.SyntheticLinksEnabled,
		input.HistoryRetentionHours,
		[]byte(input.Metadata),
		input.ActorUserID,
	))
}
// ListFabricTestingFlags returns every row of fabric_testing_flags ordered by
// scope_type and then updated_at descending. The slice is nil when the query
// yields no rows.
func (s *PostgresStore) ListFabricTestingFlags(ctx context.Context) ([]FabricTestingFlag, error) {
	const query = `
SELECT id::text, scope_type, scope_id::text, cluster_id::text, enabled, telemetry_enabled,
synthetic_links_enabled, history_retention_hours, metadata, updated_by_user_id::text, updated_at
FROM fabric_testing_flags
ORDER BY scope_type, updated_at DESC
`
	rows, err := s.db.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var flags []FabricTestingFlag
	for rows.Next() {
		flag, scanErr := scanFabricTestingFlag(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		flags = append(flags, flag)
	}
	return flags, rows.Err()
}
// GetEffectiveNodeTestingFlags merges the fabric testing flags that apply to a
// node into a single effective value.
//
// Candidate rows are 'platform' flags, 'organization' flags whose scope_id is
// the node's owner_organization_id (resolved through its membership in the
// given cluster), and 'node' flags whose scope_id is the node itself; in every
// case the flag's cluster_id must be NULL or equal to clusterID. Rows are
// processed platform first, then organization, then node.
//
// Merge rules, starting from HistoryRetentionHours = 24 and Metadata = `{}`:
//   - Enabled, TelemetryEnabled and SyntheticLinksEnabled become true if any
//     row sets them;
//   - HistoryRetentionHours takes the value of the last row with a positive
//     retention;
//   - Metadata takes the value of the last row whose metadata is non-empty and
//     not exactly "{}";
//   - AppliedScopes lists the scope_type of every row in processing order.
func (s *PostgresStore) GetEffectiveNodeTestingFlags(ctx context.Context, clusterID, nodeID string) (EffectiveNodeTestingFlags, error) {
	const query = `
WITH node_scope AS (
SELECT n.owner_organization_id
FROM nodes n
JOIN cluster_memberships cm ON cm.node_id = n.id AND cm.cluster_id = $1::uuid
WHERE n.id = $2::uuid
)
SELECT f.scope_type, f.enabled, f.telemetry_enabled, f.synthetic_links_enabled,
f.history_retention_hours, f.metadata
FROM fabric_testing_flags f
LEFT JOIN node_scope ns ON TRUE
WHERE (
f.scope_type = 'platform'
OR (f.scope_type = 'organization' AND f.scope_id = ns.owner_organization_id)
OR (f.scope_type = 'node' AND f.scope_id = $2::uuid)
)
AND (f.cluster_id IS NULL OR f.cluster_id = $1::uuid)
ORDER BY CASE f.scope_type
WHEN 'platform' THEN 1
WHEN 'organization' THEN 2
WHEN 'node' THEN 3
ELSE 4
END
`
	rows, err := s.db.Query(ctx, query, clusterID, nodeID)
	if err != nil {
		return EffectiveNodeTestingFlags{}, err
	}
	defer rows.Close()

	effective := EffectiveNodeTestingFlags{
		HistoryRetentionHours: 24,
		Metadata:              json.RawMessage(`{}`),
	}
	for rows.Next() {
		var (
			scopeType        string
			flagEnabled      bool
			telemetryEnabled bool
			linksEnabled     bool
			retentionHours   int
			rowMetadata      json.RawMessage
		)
		scanErr := rows.Scan(&scopeType, &flagEnabled, &telemetryEnabled, &linksEnabled, &retentionHours, &rowMetadata)
		if scanErr != nil {
			return EffectiveNodeTestingFlags{}, scanErr
		}

		// Boolean switches are sticky: once any scope turns one on it stays on.
		if flagEnabled {
			effective.Enabled = true
		}
		if telemetryEnabled {
			effective.TelemetryEnabled = true
		}
		if linksEnabled {
			effective.SyntheticLinksEnabled = true
		}
		// Later (more specific) scopes override retention and metadata.
		if retentionHours > 0 {
			effective.HistoryRetentionHours = retentionHours
		}
		effective.AppliedScopes = append(effective.AppliedScopes, scopeType)
		if len(rowMetadata) != 0 && string(rowMetadata) != "{}" {
			effective.Metadata = rowMetadata
		}
	}
	return effective, rows.Err()
}
// RecordNodeTelemetry inserts one row into node_telemetry_observations with a
// freshly generated id and returns the stored row.
//
// A zero ObservedAt is replaced with the current UTC time and an empty Payload
// with `{}` before the insert.
func (s *PostgresStore) RecordNodeTelemetry(ctx context.Context, input RecordNodeTelemetryInput) (NodeTelemetryObservation, error) {
	const query = `
INSERT INTO node_telemetry_observations (
id, cluster_id, node_id, cpu_percent, memory_used_bytes, memory_total_bytes,
disk_used_bytes, disk_total_bytes, network_rx_bytes, network_tx_bytes,
process_count, payload, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13)
RETURNING id::text, cluster_id::text, node_id::text, cpu_percent, memory_used_bytes,
memory_total_bytes, disk_used_bytes, disk_total_bytes, network_rx_bytes,
network_tx_bytes, process_count, payload, observed_at
`
	if input.ObservedAt.IsZero() {
		input.ObservedAt = time.Now().UTC()
	}
	if len(input.Payload) == 0 {
		input.Payload = json.RawMessage(`{}`)
	}

	inserted := s.db.QueryRow(ctx, query,
		uuid.NewString(),
		input.ClusterID,
		input.NodeID,
		input.CPUPercent,
		input.MemoryUsedBytes,
		input.MemoryTotalBytes,
		input.DiskUsedBytes,
		input.DiskTotalBytes,
		input.NetworkRxBytes,
		input.NetworkTxBytes,
		input.ProcessCount,
		[]byte(input.Payload),
		input.ObservedAt,
	)
	return scanNodeTelemetry(inserted)
}
// ListNodeTelemetry returns rows from node_telemetry_observations for the
// given cluster and node, newest observed_at first.
//
// A limit that is non-positive or larger than 1000 is replaced with 240 (it is
// not clamped to 1000). The returned slice is nil when no rows match.
func (s *PostgresStore) ListNodeTelemetry(ctx context.Context, clusterID, nodeID string, limit int) ([]NodeTelemetryObservation, error) {
	const (
		fallbackLimit = 240
		largestLimit  = 1000
	)
	const query = `
SELECT id::text, cluster_id::text, node_id::text, cpu_percent, memory_used_bytes,
memory_total_bytes, disk_used_bytes, disk_total_bytes, network_rx_bytes,
network_tx_bytes, process_count, payload, observed_at
FROM node_telemetry_observations
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
ORDER BY observed_at DESC
LIMIT $3
`
	if limit < 1 || limit > largestLimit {
		limit = fallbackLimit
	}

	rows, err := s.db.Query(ctx, query, clusterID, nodeID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var observations []NodeTelemetryObservation
	for rows.Next() {
		observation, scanErr := scanNodeTelemetry(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		observations = append(observations, observation)
	}
	return observations, rows.Err()
}
// SetDesiredWorkload inserts the desired state for a (cluster, node,
// service_type) triple into node_workload_desired_states, or overwrites the
// existing row for that triple, and returns the stored row. updated_at is set
// to NOW() by the statement.
func (s *PostgresStore) SetDesiredWorkload(ctx context.Context, input SetDesiredWorkloadInput) (NodeWorkloadDesiredState, error) {
	const query = `
INSERT INTO node_workload_desired_states (
cluster_id, node_id, service_type, desired_state, version, runtime_mode,
artifact_ref, config, environment, updated_by_user_id, updated_at
) VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb, $10::uuid, NOW())
ON CONFLICT (cluster_id, node_id, service_type) DO UPDATE SET
desired_state = EXCLUDED.desired_state,
version = EXCLUDED.version,
runtime_mode = EXCLUDED.runtime_mode,
artifact_ref = EXCLUDED.artifact_ref,
config = EXCLUDED.config,
environment = EXCLUDED.environment,
updated_by_user_id = EXCLUDED.updated_by_user_id,
updated_at = EXCLUDED.updated_at
RETURNING cluster_id::text, node_id::text, service_type, desired_state, version, runtime_mode,
artifact_ref, config, environment, updated_by_user_id::text, updated_at
`
	stored := s.db.QueryRow(ctx, query,
		input.ClusterID,
		input.NodeID,
		input.ServiceType,
		input.DesiredState,
		input.Version,
		input.RuntimeMode,
		input.ArtifactRef,
		[]byte(input.Config),
		[]byte(input.Environment),
		input.ActorUserID,
	)
	return scanDesiredWorkload(stored)
}
// ListDesiredWorkloads returns the node_workload_desired_states rows for the
// given cluster and node, ordered by service_type. The slice is nil when no
// rows match.
func (s *PostgresStore) ListDesiredWorkloads(ctx context.Context, clusterID, nodeID string) ([]NodeWorkloadDesiredState, error) {
	const query = `
SELECT cluster_id::text, node_id::text, service_type, desired_state, version, runtime_mode,
artifact_ref, config, environment, updated_by_user_id::text, updated_at
FROM node_workload_desired_states
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
ORDER BY service_type
`
	rows, err := s.db.Query(ctx, query, clusterID, nodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var workloads []NodeWorkloadDesiredState
	for rows.Next() {
		workload, scanErr := scanDesiredWorkload(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		workloads = append(workloads, workload)
	}
	return workloads, rows.Err()
}
// ReportWorkloadStatus records a workload status report from a node and
// returns the stored report. Three writes run in one transaction:
//
//  1. a new row in node_workload_status_reports (fresh id, observed_at = NOW());
//  2. an upsert of node_workload_latest_statuses keyed by
//     (cluster_id, node_id, service_type) pointing at the new report;
//  3. an upsert of node_services keyed by (node_id, service_type). A missing
//     row is created with enabled = FALSE and desired_state = 'disabled'; an
//     existing row only has reported_state, last_reported_at, metadata and
//     updated_at overwritten.
//
// The values written in steps 2 and 3 are the ones returned by step 1.
func (s *PostgresStore) ReportWorkloadStatus(ctx context.Context, input ReportWorkloadStatusInput) (NodeWorkloadStatus, error) {
	const insertReport = `
INSERT INTO node_workload_status_reports (
id, cluster_id, node_id, service_type, reported_state, runtime_mode, version, status_payload, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7, $8::jsonb, NOW())
RETURNING id::text, cluster_id::text, node_id::text, service_type, reported_state,
runtime_mode, version, status_payload, observed_at
`
	const upsertLatest = `
INSERT INTO node_workload_latest_statuses (
cluster_id, node_id, service_type, status_report_id, reported_state, runtime_mode, version, status_payload, observed_at
) VALUES ($1::uuid, $2::uuid, $3, $4::uuid, $5, $6, $7, $8::jsonb, $9)
ON CONFLICT (cluster_id, node_id, service_type) DO UPDATE SET
status_report_id = EXCLUDED.status_report_id,
reported_state = EXCLUDED.reported_state,
runtime_mode = EXCLUDED.runtime_mode,
version = EXCLUDED.version,
status_payload = EXCLUDED.status_payload,
observed_at = EXCLUDED.observed_at
`
	const upsertService = `
INSERT INTO node_services (node_id, service_type, enabled, desired_state, reported_state, last_reported_at, metadata, updated_at)
VALUES ($1::uuid, $2, FALSE, 'disabled', $3, $4, $5::jsonb, $4)
ON CONFLICT (node_id, service_type) DO UPDATE SET
reported_state = EXCLUDED.reported_state,
last_reported_at = EXCLUDED.last_reported_at,
metadata = EXCLUDED.metadata,
updated_at = EXCLUDED.updated_at
`
	var none NodeWorkloadStatus

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	defer tx.Rollback(ctx)

	report, err := scanWorkloadStatus(tx.QueryRow(ctx, insertReport,
		uuid.NewString(),
		input.ClusterID,
		input.NodeID,
		input.ServiceType,
		input.ReportedState,
		input.RuntimeMode,
		input.Version,
		[]byte(input.StatusPayload),
	))
	if err != nil {
		return none, err
	}

	_, err = tx.Exec(ctx, upsertLatest,
		report.ClusterID,
		report.NodeID,
		report.ServiceType,
		report.ID,
		report.ReportedState,
		report.RuntimeMode,
		report.Version,
		[]byte(report.StatusPayload),
		report.ObservedAt,
	)
	if err != nil {
		return none, err
	}

	_, err = tx.Exec(ctx, upsertService,
		report.NodeID,
		report.ServiceType,
		report.ReportedState,
		report.ObservedAt,
		[]byte(report.StatusPayload),
	)
	if err != nil {
		return none, err
	}

	if err = tx.Commit(ctx); err != nil {
		return none, err
	}
	return report, nil
}
// ListLatestWorkloadStatuses returns the node_workload_latest_statuses rows
// for the given cluster and node, ordered by service_type. A NULL
// status_report_id is reported as the all-zero UUID string. The slice is nil
// when no rows match.
func (s *PostgresStore) ListLatestWorkloadStatuses(ctx context.Context, clusterID, nodeID string) ([]NodeWorkloadStatus, error) {
	const query = `
SELECT COALESCE(status_report_id::text, '00000000-0000-0000-0000-000000000000'), cluster_id::text, node_id::text,
service_type, reported_state, runtime_mode, version, status_payload, observed_at
FROM node_workload_latest_statuses
WHERE cluster_id = $1::uuid
AND node_id = $2::uuid
ORDER BY service_type
`
	rows, err := s.db.Query(ctx, query, clusterID, nodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var statuses []NodeWorkloadStatus
	for rows.Next() {
		status, scanErr := scanWorkloadStatus(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		statuses = append(statuses, status)
	}
	return statuses, rows.Err()
}
// ReportMeshLink stores a mesh link observation and refreshes the matching
// "latest" row, both inside one transaction, and returns the stored
// observation.
//
// The observation is inserted into mesh_link_observations with a fresh id and
// observed_at = NOW(). mesh_latest_links is then upserted on
// (cluster_id, source_node_id, target_node_id, observation_key), where the key
// is derived from the stored metadata by meshLatestObservationKey.
func (s *PostgresStore) ReportMeshLink(ctx context.Context, input ReportMeshLinkInput) (MeshLinkObservation, error) {
	const insertObservation = `
INSERT INTO mesh_link_observations (
id, cluster_id, source_node_id, target_node_id, link_status, latency_ms, quality_score, metadata, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, $6, $7, $8::jsonb, NOW())
RETURNING id::text, cluster_id::text, source_node_id::text, target_node_id::text, link_status,
latency_ms, quality_score, metadata, observed_at
`
	const upsertLatest = `
INSERT INTO mesh_latest_links (
cluster_id, source_node_id, target_node_id, observation_id, link_status, latency_ms, quality_score, metadata, observed_at, observation_key
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, $6, $7, $8::jsonb, $9, $10)
ON CONFLICT (cluster_id, source_node_id, target_node_id, observation_key) DO UPDATE SET
observation_id = EXCLUDED.observation_id,
link_status = EXCLUDED.link_status,
latency_ms = EXCLUDED.latency_ms,
quality_score = EXCLUDED.quality_score,
metadata = EXCLUDED.metadata,
observed_at = EXCLUDED.observed_at
`
	var none MeshLinkObservation

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	defer tx.Rollback(ctx)

	stored, err := scanMeshLink(tx.QueryRow(ctx, insertObservation,
		uuid.NewString(),
		input.ClusterID,
		input.SourceNodeID,
		input.TargetNodeID,
		input.LinkStatus,
		input.LatencyMs,
		input.QualityScore,
		[]byte(input.Metadata),
	))
	if err != nil {
		return none, err
	}

	_, err = tx.Exec(ctx, upsertLatest,
		stored.ClusterID,
		stored.SourceNodeID,
		stored.TargetNodeID,
		stored.ID,
		stored.LinkStatus,
		stored.LatencyMs,
		stored.QualityScore,
		[]byte(stored.Metadata),
		stored.ObservedAt,
		meshLatestObservationKey(stored.Metadata),
	)
	if err != nil {
		return none, err
	}

	if err = tx.Commit(ctx); err != nil {
		return none, err
	}
	return stored, nil
}
// meshLatestObservationKey derives the observation key for a mesh link from
// its metadata JSON.
//
// The base key is the string value of "observation_type", or "default" when
// that field is absent, empty, not a string, or the metadata does not decode
// into a JSON object. Two observation types get a more specific key:
//   - "synthetic_route_health" with a non-empty "route_id" yields
//     "synthetic_route_health:<route_id>";
//   - "peer_connection_manager" with a non-empty "transport_mode" or
//     "relay_node_id" yields
//     "peer_connection_manager:<transport_mode>:<relay_node_id>".
func meshLatestObservationKey(metadata json.RawMessage) string {
	const fallback = "default"

	var fields map[string]any
	if json.Unmarshal(metadata, &fields) != nil {
		return fallback
	}

	kind := meshMetadataString(fields, "observation_type")
	if kind == "" {
		kind = fallback
	}

	if kind == "synthetic_route_health" {
		routeID := meshMetadataString(fields, "route_id")
		if routeID == "" {
			return kind
		}
		return kind + ":" + routeID
	}

	if kind == "peer_connection_manager" {
		transport := meshMetadataString(fields, "transport_mode")
		relay := meshMetadataString(fields, "relay_node_id")
		if transport == "" && relay == "" {
			return kind
		}
		return kind + ":" + transport + ":" + relay
	}

	return kind
}
// meshMetadataString returns values[key] when it holds a string, and the empty
// string when the key is missing or holds any other type. A nil map is
// treated like an empty one.
func meshMetadataString(values map[string]any, key string) string {
	if text, isString := values[key].(string); isString {
		return text
	}
	return ""
}
// ListMeshLinks returns the mesh_latest_links rows of a cluster, newest
// observed_at first. A NULL observation_id is reported as the all-zero UUID
// string. The slice is nil when no rows match.
func (s *PostgresStore) ListMeshLinks(ctx context.Context, clusterID string) ([]MeshLinkObservation, error) {
	const query = `
SELECT COALESCE(observation_id::text, '00000000-0000-0000-0000-000000000000'), cluster_id::text,
source_node_id::text, target_node_id::text, link_status, latency_ms, quality_score, metadata, observed_at
FROM mesh_latest_links
WHERE cluster_id = $1::uuid
ORDER BY observed_at DESC
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var links []MeshLinkObservation
	for rows.Next() {
		link, scanErr := scanMeshLink(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		links = append(links, link)
	}
	return links, rows.Err()
}
// CreateRouteIntent inserts a new row into mesh_route_intents with a freshly
// generated id and status 'active', and returns the stored row. created_at
// and updated_at are both set to NOW() by the statement.
func (s *PostgresStore) CreateRouteIntent(ctx context.Context, input CreateRouteIntentInput) (MeshRouteIntent, error) {
	const query = `
INSERT INTO mesh_route_intents (
id, cluster_id, source_selector, destination_selector, service_class,
priority, status, policy, created_by_user_id, created_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3::jsonb, $4::jsonb, $5, $6, 'active', $7::jsonb, $8::uuid, NOW(), NOW())
RETURNING id::text, cluster_id::text, source_selector, destination_selector, service_class,
priority, status, policy, created_by_user_id::text, created_at, updated_at
`
	created := s.db.QueryRow(ctx, query,
		uuid.NewString(),
		input.ClusterID,
		[]byte(input.SourceSelector),
		[]byte(input.DestinationSelector),
		input.ServiceClass,
		input.Priority,
		[]byte(input.Policy),
		input.ActorUserID,
	)
	return scanRouteIntent(created)
}
// ListRouteIntents returns the mesh_route_intents rows of a cluster ordered by
// priority ascending and then created_at descending. The slice is nil when no
// rows match.
func (s *PostgresStore) ListRouteIntents(ctx context.Context, clusterID string) ([]MeshRouteIntent, error) {
	const query = `
SELECT id::text, cluster_id::text, source_selector, destination_selector, service_class,
priority, status, policy, created_by_user_id::text, created_at, updated_at
FROM mesh_route_intents
WHERE cluster_id = $1::uuid
ORDER BY priority ASC, created_at DESC
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var intents []MeshRouteIntent
	for rows.Next() {
		intent, scanErr := scanRouteIntent(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		intents = append(intents, intent)
	}
	return intents, rows.Err()
}
// ListQoSPolicies returns the mesh_qos_policies rows of a cluster ordered by
// priority ascending. The slice is nil when no rows match.
func (s *PostgresStore) ListQoSPolicies(ctx context.Context, clusterID string) ([]MeshQoSPolicy, error) {
	const query = `
SELECT id::text, cluster_id::text, service_class, priority, reliability_mode,
drop_policy, bandwidth_policy, metadata, created_at, updated_at
FROM mesh_qos_policies
WHERE cluster_id = $1::uuid
ORDER BY priority ASC
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var policies []MeshQoSPolicy
	for rows.Next() {
		policy, scanErr := scanQoSPolicy(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		policies = append(policies, policy)
	}
	return policies, rows.Err()
}
// ListFabricEntryPoints returns the fabric_entry_points rows of a cluster
// ordered by name. When the query succeeds the slice is never nil: an empty,
// non-nil slice is returned if no rows match.
func (s *PostgresStore) ListFabricEntryPoints(ctx context.Context, clusterID string) ([]FabricEntryPoint, error) {
	const query = `
SELECT id::text, cluster_id::text, name, status, endpoint_type, public_endpoint,
policy, metadata, created_by_user_id::text, created_at, updated_at
FROM fabric_entry_points
WHERE cluster_id = $1::uuid
ORDER BY name
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Start from an empty (non-nil) slice so "no rows" is distinguishable from
	// the nil returned on failure.
	entryPoints := []FabricEntryPoint{}
	for rows.Next() {
		entryPoint, scanErr := scanFabricEntryPoint(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		entryPoints = append(entryPoints, entryPoint)
	}
	return entryPoints, rows.Err()
}
// CreateFabricEntryPoint inserts a new row into fabric_entry_points with a
// freshly generated id and returns the stored row. created_at and updated_at
// are both set to NOW() by the statement.
func (s *PostgresStore) CreateFabricEntryPoint(ctx context.Context, input CreateFabricEntryPointInput) (FabricEntryPoint, error) {
	const query = `
INSERT INTO fabric_entry_points (
id, cluster_id, name, status, endpoint_type, public_endpoint,
policy, metadata, created_by_user_id, created_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::uuid, NOW(), NOW())
RETURNING id::text, cluster_id::text, name, status, endpoint_type, public_endpoint,
policy, metadata, created_by_user_id::text, created_at, updated_at
`
	created := s.db.QueryRow(ctx, query,
		uuid.NewString(),
		input.ClusterID,
		input.Name,
		input.Status,
		input.EndpointType,
		input.PublicEndpoint,
		[]byte(input.Policy),
		[]byte(input.Metadata),
		input.ActorUserID,
	)
	return scanFabricEntryPoint(created)
}
// SetFabricEntryPointNode attaches a node to a fabric entry point, or updates
// the existing attachment, and returns the stored row.
//
// The statement only writes when both preconditions hold: the entry point
// belongs to input.ClusterID, and the node has a cluster_memberships row in
// that cluster with membership_status = 'active'. Otherwise the INSERT selects
// nothing and RETURNING yields no row; the resulting error comes from
// scanFabricEntryPointNode (presumably pgx.ErrNoRows — confirm).
//
// On conflict over (entry_point_id, node_id) only status, priority and
// metadata are overwritten; added_by_user_id and added_at keep their original
// values.
func (s *PostgresStore) SetFabricEntryPointNode(ctx context.Context, input SetFabricEntryPointNodeInput) (FabricEntryPointNode, error) {
	const query = `
WITH endpoint_ok AS (
SELECT id
FROM fabric_entry_points
WHERE id = $2::uuid
AND cluster_id = $1::uuid
), membership_ok AS (
SELECT node_id
FROM cluster_memberships
WHERE cluster_id = $1::uuid
AND node_id = $3::uuid
AND membership_status = 'active'
)
INSERT INTO fabric_entry_point_nodes (
entry_point_id, cluster_id, node_id, status, priority, metadata, added_by_user_id, added_at
)
SELECT endpoint_ok.id, $1::uuid, membership_ok.node_id, $4, $5, $6::jsonb, $7::uuid, NOW()
FROM endpoint_ok
CROSS JOIN membership_ok
ON CONFLICT (entry_point_id, node_id) DO UPDATE SET
status = EXCLUDED.status,
priority = EXCLUDED.priority,
metadata = EXCLUDED.metadata
RETURNING entry_point_id::text, cluster_id::text, node_id::text, status, priority,
metadata, added_by_user_id::text, added_at
`
	stored := s.db.QueryRow(ctx, query,
		input.ClusterID,
		input.EntryPointID,
		input.NodeID,
		input.Status,
		input.Priority,
		[]byte(input.Metadata),
		input.ActorUserID,
	)
	return scanFabricEntryPointNode(stored)
}
// ListFabricEntryPointNodes returns the fabric_entry_point_nodes rows for one
// entry point within a cluster, ordered by priority and then added_at. When
// the query succeeds the slice is never nil: an empty, non-nil slice is
// returned if no rows match.
func (s *PostgresStore) ListFabricEntryPointNodes(ctx context.Context, clusterID, entryPointID string) ([]FabricEntryPointNode, error) {
	const query = `
SELECT entry_point_id::text, cluster_id::text, node_id::text, status, priority,
metadata, added_by_user_id::text, added_at
FROM fabric_entry_point_nodes
WHERE cluster_id = $1::uuid
AND entry_point_id = $2::uuid
ORDER BY priority, added_at
`
	rows, err := s.db.Query(ctx, query, clusterID, entryPointID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	members := []FabricEntryPointNode{}
	for rows.Next() {
		member, scanErr := scanFabricEntryPointNode(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		members = append(members, member)
	}
	return members, rows.Err()
}
// ListFabricEgressPools returns the fabric_egress_pools rows of a cluster
// ordered by name. When the query succeeds the slice is never nil: an empty,
// non-nil slice is returned if no rows match.
func (s *PostgresStore) ListFabricEgressPools(ctx context.Context, clusterID string) ([]FabricEgressPool, error) {
	const query = `
SELECT id::text, cluster_id::text, name, status, description, route_scope,
policy, metadata, created_by_user_id::text, created_at, updated_at
FROM fabric_egress_pools
WHERE cluster_id = $1::uuid
ORDER BY name
`
	rows, err := s.db.Query(ctx, query, clusterID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	pools := []FabricEgressPool{}
	for rows.Next() {
		pool, scanErr := scanFabricEgressPool(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		pools = append(pools, pool)
	}
	return pools, rows.Err()
}
// CreateFabricEgressPool inserts a new row into fabric_egress_pools with a
// freshly generated id and returns the stored row. created_at and updated_at
// are both set to NOW() by the statement.
func (s *PostgresStore) CreateFabricEgressPool(ctx context.Context, input CreateFabricEgressPoolInput) (FabricEgressPool, error) {
	const query = `
INSERT INTO fabric_egress_pools (
id, cluster_id, name, status, description, route_scope,
policy, metadata, created_by_user_id, created_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9::uuid, NOW(), NOW())
RETURNING id::text, cluster_id::text, name, status, description, route_scope,
policy, metadata, created_by_user_id::text, created_at, updated_at
`
	created := s.db.QueryRow(ctx, query,
		uuid.NewString(),
		input.ClusterID,
		input.Name,
		input.Status,
		input.Description,
		[]byte(input.RouteScope),
		[]byte(input.Policy),
		[]byte(input.Metadata),
		input.ActorUserID,
	)
	return scanFabricEgressPool(created)
}
// SetFabricEgressPoolNode attaches a node to a fabric egress pool, or updates
// the existing attachment, and returns the stored row.
//
// The statement only writes when both preconditions hold: the pool belongs to
// input.ClusterID, and the node has a cluster_memberships row in that cluster
// with membership_status = 'active'. Otherwise the INSERT selects nothing and
// RETURNING yields no row; the resulting error comes from
// scanFabricEgressPoolNode (presumably pgx.ErrNoRows — confirm).
//
// On conflict over (egress_pool_id, node_id) only status, priority and
// metadata are overwritten; added_by_user_id and added_at keep their original
// values.
func (s *PostgresStore) SetFabricEgressPoolNode(ctx context.Context, input SetFabricEgressPoolNodeInput) (FabricEgressPoolNode, error) {
	const query = `
WITH pool_ok AS (
SELECT id
FROM fabric_egress_pools
WHERE id = $2::uuid
AND cluster_id = $1::uuid
), membership_ok AS (
SELECT node_id
FROM cluster_memberships
WHERE cluster_id = $1::uuid
AND node_id = $3::uuid
AND membership_status = 'active'
)
INSERT INTO fabric_egress_pool_nodes (
egress_pool_id, cluster_id, node_id, status, priority, metadata, added_by_user_id, added_at
)
SELECT pool_ok.id, $1::uuid, membership_ok.node_id, $4, $5, $6::jsonb, $7::uuid, NOW()
FROM pool_ok
CROSS JOIN membership_ok
ON CONFLICT (egress_pool_id, node_id) DO UPDATE SET
status = EXCLUDED.status,
priority = EXCLUDED.priority,
metadata = EXCLUDED.metadata
RETURNING egress_pool_id::text, cluster_id::text, node_id::text, status, priority,
metadata, added_by_user_id::text, added_at
`
	stored := s.db.QueryRow(ctx, query,
		input.ClusterID,
		input.EgressPoolID,
		input.NodeID,
		input.Status,
		input.Priority,
		[]byte(input.Metadata),
		input.ActorUserID,
	)
	return scanFabricEgressPoolNode(stored)
}
// ListFabricEgressPoolNodes returns the fabric_egress_pool_nodes rows for one
// egress pool within a cluster, ordered by priority and then added_at. When
// the query succeeds the slice is never nil: an empty, non-nil slice is
// returned if no rows match.
func (s *PostgresStore) ListFabricEgressPoolNodes(ctx context.Context, clusterID, egressPoolID string) ([]FabricEgressPoolNode, error) {
	const query = `
SELECT egress_pool_id::text, cluster_id::text, node_id::text, status, priority,
metadata, added_by_user_id::text, added_at
FROM fabric_egress_pool_nodes
WHERE cluster_id = $1::uuid
AND egress_pool_id = $2::uuid
ORDER BY priority, added_at
`
	rows, err := s.db.Query(ctx, query, clusterID, egressPoolID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	members := []FabricEgressPoolNode{}
	for rows.Next() {
		member, scanErr := scanFabricEgressPoolNode(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		members = append(members, member)
	}
	return members, rows.Err()
}
// GetClusterAuthorityState loads the cluster_authority_states row for the
// given cluster. Row decoding, including the error for a missing row, is
// delegated to scanAuthorityState.
func (s *PostgresStore) GetClusterAuthorityState(ctx context.Context, clusterID string) (ClusterAuthorityState, error) {
	const query = `
SELECT cluster_id::text, authority_state, mutation_mode, term, notes, updated_by_user_id::text, updated_at
FROM cluster_authority_states
WHERE cluster_id = $1::uuid
`
	return scanAuthorityState(s.db.QueryRow(ctx, query, clusterID))
}
// UpdateClusterAuthorityState writes the authority state for a cluster and
// returns the stored row.
//
// The first write for a cluster inserts the row with term = 1. Every later
// write overwrites authority_state, mutation_mode, notes, updated_by_user_id
// and updated_at, and increments the stored term by one.
func (s *PostgresStore) UpdateClusterAuthorityState(ctx context.Context, input UpdateClusterAuthorityInput) (ClusterAuthorityState, error) {
	const query = `
INSERT INTO cluster_authority_states (
cluster_id, authority_state, mutation_mode, term, notes, updated_by_user_id, updated_at
) VALUES ($1::uuid, $2, $3, 1, $4, $5::uuid, NOW())
ON CONFLICT (cluster_id) DO UPDATE SET
authority_state = EXCLUDED.authority_state,
mutation_mode = EXCLUDED.mutation_mode,
term = cluster_authority_states.term + 1,
notes = EXCLUDED.notes,
updated_by_user_id = EXCLUDED.updated_by_user_id,
updated_at = EXCLUDED.updated_at
RETURNING cluster_id::text, authority_state, mutation_mode, term, notes, updated_by_user_id::text, updated_at
`
	stored := s.db.QueryRow(ctx, query,
		input.ClusterID,
		input.AuthorityState,
		input.MutationMode,
		input.Notes,
		input.ActorUserID,
	)
	return scanAuthorityState(stored)
}
// ListClusterAdminSummaries returns every row of cluster_admin_summaries
// ordered by name. The slice is nil when the query yields no rows.
func (s *PostgresStore) ListClusterAdminSummaries(ctx context.Context) ([]ClusterAdminSummary, error) {
	const query = `
SELECT cluster_id::text, slug, name, status, region, authority_state, mutation_mode,
cluster_key_algorithm, cluster_key_fingerprint,
node_count, healthy_node_count, pending_join_count, active_role_assignment_count, last_node_seen_at
FROM cluster_admin_summaries
ORDER BY name
`
	rows, err := s.db.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var summaries []ClusterAdminSummary
	for rows.Next() {
		var summary ClusterAdminSummary
		// Destinations are listed in the same order as the SELECT columns.
		destinations := []any{
			&summary.ClusterID,
			&summary.Slug,
			&summary.Name,
			&summary.Status,
			&summary.Region,
			&summary.AuthorityState,
			&summary.MutationMode,
			&summary.ClusterKeyAlgorithm,
			&summary.ClusterKeyFingerprint,
			&summary.NodeCount,
			&summary.HealthyNodeCount,
			&summary.PendingJoinCount,
			&summary.ActiveRoleAssignmentCount,
			&summary.LastNodeSeenAt,
		}
		if scanErr := rows.Scan(destinations...); scanErr != nil {
			return nil, scanErr
		}
		summaries = append(summaries, summary)
	}
	return summaries, rows.Err()
}
// CreateVPNConnection inserts a new row into vpn_connections with a freshly
// generated id and returns the stored row.
//
// The stored status is derived from the requested desired state:
// VPNConnectionStatusEnabled when DesiredState is VPNConnectionDesiredEnabled,
// VPNConnectionStatusDisabled otherwise. The acting user is written to both
// created_by_user_id and updated_by_user_id, and both timestamps are NOW().
func (s *PostgresStore) CreateVPNConnection(ctx context.Context, input CreateVPNConnectionInput) (VPNConnection, error) {
	const query = `
INSERT INTO vpn_connections (
id, cluster_id, organization_id, name, target_endpoint, protocol_family,
credential_ref, mode, desired_state, allowed_node_policy, routing_usage,
route_policy, qos_policy, placement_policy, status, metadata,
created_by_user_id, updated_by_user_id, created_at, updated_at
) VALUES (
$1::uuid, $2::uuid, $3::uuid, $4, $5::jsonb, $6,
$7, $8, $9, $10::jsonb, $11::jsonb,
$12::jsonb, $13::jsonb, $14::jsonb, $15, $16::jsonb,
$17::uuid, $17::uuid, NOW(), NOW()
)
RETURNING id::text, cluster_id::text, organization_id::text, name, target_endpoint,
protocol_family, credential_ref, mode, desired_state, allowed_node_policy,
routing_usage, route_policy, qos_policy, placement_policy, status, metadata,
created_by_user_id::text, updated_by_user_id::text, created_at, updated_at
`
	connectionID := uuid.NewString()

	initialStatus := VPNConnectionStatusDisabled
	if input.DesiredState == VPNConnectionDesiredEnabled {
		initialStatus = VPNConnectionStatusEnabled
	}

	created := s.db.QueryRow(ctx, query,
		connectionID,
		input.ClusterID,
		input.OrganizationID,
		input.Name,
		[]byte(input.TargetEndpoint),
		input.ProtocolFamily,
		input.CredentialRef,
		input.Mode,
		input.DesiredState,
		[]byte(input.AllowedNodePolicy),
		[]byte(input.RoutingUsage),
		[]byte(input.RoutePolicy),
		[]byte(input.QoSPolicy),
		[]byte(input.PlacementPolicy),
		initialStatus,
		[]byte(input.Metadata),
		input.ActorUserID,
	)
	return scanVPNConnection(created)
}
// ListVPNConnections returns the vpn_connections rows of a cluster, newest
// created_at first. Row decoding is delegated to scanVPNConnections.
func (s *PostgresStore) ListVPNConnections(ctx context.Context, clusterID string) ([]VPNConnection, error) {
	const query = `
SELECT id::text, cluster_id::text, organization_id::text, name, target_endpoint,
protocol_family, credential_ref, mode, desired_state, allowed_node_policy,
routing_usage, route_policy, qos_policy, placement_policy, status, metadata,
created_by_user_id::text, updated_by_user_id::text, created_at, updated_at
FROM vpn_connections
WHERE cluster_id = $1::uuid
ORDER BY created_at DESC
`
	result, queryErr := s.db.Query(ctx, query, clusterID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	return scanVPNConnections(result)
}
// GetVPNConnection loads a single vpn_connections row identified by its id and
// owning cluster. Row decoding, including the error for a missing row, is
// delegated to scanVPNConnection.
func (s *PostgresStore) GetVPNConnection(ctx context.Context, clusterID, vpnConnectionID string) (VPNConnection, error) {
	const query = `
SELECT id::text, cluster_id::text, organization_id::text, name, target_endpoint,
protocol_family, credential_ref, mode, desired_state, allowed_node_policy,
routing_usage, route_policy, qos_policy, placement_policy, status, metadata,
created_by_user_id::text, updated_by_user_id::text, created_at, updated_at
FROM vpn_connections
WHERE cluster_id = $1::uuid
AND id = $2::uuid
`
	return scanVPNConnection(s.db.QueryRow(ctx, query, clusterID, vpnConnectionID))
}
// UpdateVPNConnectionDesiredState changes the desired state of one VPN
// connection in a cluster and returns the updated row.
//
// status is written alongside desired_state: VPNConnectionStatusEnabled when
// DesiredState is VPNConnectionDesiredEnabled, VPNConnectionStatusDisabled
// otherwise. updated_by_user_id and updated_at are refreshed as well. When no
// row matches, the error comes from scanVPNConnection.
func (s *PostgresStore) UpdateVPNConnectionDesiredState(ctx context.Context, input UpdateVPNConnectionDesiredStateInput) (VPNConnection, error) {
	const query = `
UPDATE vpn_connections
SET desired_state = $3,
status = $4,
updated_by_user_id = $5::uuid,
updated_at = NOW()
WHERE cluster_id = $1::uuid
AND id = $2::uuid
RETURNING id::text, cluster_id::text, organization_id::text, name, target_endpoint,
protocol_family, credential_ref, mode, desired_state, allowed_node_policy,
routing_usage, route_policy, qos_policy, placement_policy, status, metadata,
created_by_user_id::text, updated_by_user_id::text, created_at, updated_at
`
	nextStatus := VPNConnectionStatusDisabled
	if input.DesiredState == VPNConnectionDesiredEnabled {
		nextStatus = VPNConnectionStatusEnabled
	}

	updated := s.db.QueryRow(ctx, query,
		input.ClusterID,
		input.VPNConnectionID,
		input.DesiredState,
		nextStatus,
		input.ActorUserID,
	)
	return scanVPNConnection(updated)
}
// UpsertVPNConnectionRoutePolicy inserts a route policy for a VPN connection
// under a freshly generated UUID and returns the stored row.
//
// The cluster and organization IDs are copied from the vpn_connections row
// matching input.ClusterID and input.VPNConnectionID; when no such row exists
// the INSERT ... SELECT produces nothing and the scan error is returned.
//
// NOTE(review): despite the name, the statement has no ON CONFLICT clause, so
// every call creates a new row — confirm this is the intended behavior.
func (s *PostgresStore) UpsertVPNConnectionRoutePolicy(ctx context.Context, input UpsertVPNConnectionRoutePolicyInput) (VPNConnectionRoutePolicy, error) {
	const statement = `
INSERT INTO vpn_connection_route_policies (
id, vpn_connection_id, cluster_id, organization_id, route_type, destination,
action, service_type, priority, policy, status, created_by_user_id, created_at, updated_at
)
SELECT $1::uuid, vc.id, vc.cluster_id, vc.organization_id, $4, $5,
$6, $7, $8, $9::jsonb, $10, $11::uuid, NOW(), NOW()
FROM vpn_connections vc
WHERE vc.cluster_id = $2::uuid
AND vc.id = $3::uuid
RETURNING id::text, vpn_connection_id::text, cluster_id::text, organization_id::text,
route_type, destination, action, service_type, priority, policy, status,
created_by_user_id::text, created_at, updated_at
`
	policyID := uuid.NewString()
	policyJSON := []byte(input.Policy)
	inserted := s.db.QueryRow(
		ctx,
		statement,
		policyID,
		input.ClusterID,
		input.VPNConnectionID,
		input.RouteType,
		input.Destination,
		input.Action,
		input.ServiceType,
		input.Priority,
		policyJSON,
		input.Status,
		input.ActorUserID,
	)
	return scanVPNRoutePolicy(inserted)
}
// ListVPNConnectionRoutePolicies returns the route policies attached to one
// VPN connection in a cluster, ordered by ascending priority and, within the
// same priority, by created_at descending.
func (s *PostgresStore) ListVPNConnectionRoutePolicies(ctx context.Context, clusterID, vpnConnectionID string) ([]VPNConnectionRoutePolicy, error) {
	const query = `
SELECT id::text, vpn_connection_id::text, cluster_id::text, organization_id::text,
route_type, destination, action, service_type, priority, policy, status,
created_by_user_id::text, created_at, updated_at
FROM vpn_connection_route_policies
WHERE cluster_id = $1::uuid
AND vpn_connection_id = $2::uuid
ORDER BY priority ASC, created_at DESC
`
	result, queryErr := s.db.Query(ctx, query, clusterID, vpnConnectionID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()
	return scanVPNRoutePolicies(result)
}
// SetVPNConnectionAllowedNodes replaces the allowed-node set of a VPN
// connection with input.NodeIDs and returns the resulting rows.
//
// Everything runs in one transaction:
//  1. the vpn_connections row is selected FOR UPDATE (a missing row surfaces
//     as the Scan error),
//  2. all existing allowed-node rows for the connection are deleted,
//  3. one row per node ID is inserted with status 'active', sharing
//     input.RolePreference and input.Metadata,
//  4. the new set is read back through listVPNConnectionAllowedNodes.
//
// Any failure returns early and the deferred rollback discards the changes.
func (s *PostgresStore) SetVPNConnectionAllowedNodes(ctx context.Context, input SetVPNConnectionAllowedNodesInput) ([]VPNConnectionAllowedNode, error) {
	const (
		lockConnection = `
SELECT id::text
FROM vpn_connections
WHERE cluster_id = $1::uuid
AND id = $2::uuid
FOR UPDATE
`
		clearAllowed = `
DELETE FROM vpn_connection_allowed_nodes
WHERE cluster_id = $1::uuid
AND vpn_connection_id = $2::uuid
`
		insertAllowed = `
INSERT INTO vpn_connection_allowed_nodes (
vpn_connection_id, cluster_id, node_id, role_preference, status,
metadata, created_by_user_id, created_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4, 'active', $5::jsonb, $6::uuid, NOW())
`
	)
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return nil, err
	}
	// Rollback after a successful Commit has no effect on the committed data;
	// its error is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	var lockedID string
	err = tx.QueryRow(ctx, lockConnection, input.ClusterID, input.VPNConnectionID).Scan(&lockedID)
	if err != nil {
		return nil, err
	}

	_, err = tx.Exec(ctx, clearAllowed, input.ClusterID, input.VPNConnectionID)
	if err != nil {
		return nil, err
	}

	for _, allowedNodeID := range input.NodeIDs {
		_, err = tx.Exec(
			ctx,
			insertAllowed,
			input.VPNConnectionID,
			input.ClusterID,
			allowedNodeID,
			input.RolePreference,
			[]byte(input.Metadata),
			input.ActorUserID,
		)
		if err != nil {
			return nil, err
		}
	}

	allowed, err := listVPNConnectionAllowedNodes(ctx, tx, input.ClusterID, input.VPNConnectionID)
	if err != nil {
		return nil, err
	}
	if err = tx.Commit(ctx); err != nil {
		return nil, err
	}
	return allowed, nil
}
// ListVPNConnectionAllowedNodes returns the allowed-node rows of a VPN
// connection, reading through the store's connection pool rather than a
// transaction.
func (s *PostgresStore) ListVPNConnectionAllowedNodes(ctx context.Context, clusterID, vpnConnectionID string) ([]VPNConnectionAllowedNode, error) {
	allowed, err := listVPNConnectionAllowedNodes(ctx, s.db, clusterID, vpnConnectionID)
	return allowed, err
}
// AcquireVPNConnectionLease obtains the active lease on a VPN connection for
// input.OwnerNodeID.
//
// Inside a single transaction it:
//  1. marks active leases of the connection whose expires_at has passed as
//     'expired',
//  2. locks the remaining active, unexpired lease (if any) FOR UPDATE,
//  3. returns that lease unchanged when the requesting node already owns it,
//     or ErrVPNLeaseAlreadyActive when another node owns it,
//  4. otherwise inserts a new 'active' lease whose lease_generation is one more
//     than the highest generation recorded for the connection (starting at 1).
//
// A unique-constraint violation on the insert is also reported as
// ErrVPNLeaseAlreadyActive. If the connection does not exist in the cluster
// the insert yields no row and the scan error is returned.
func (s *PostgresStore) AcquireVPNConnectionLease(ctx context.Context, input AcquireVPNConnectionLeaseInput, expiresAt time.Time, fencingToken string) (VPNConnectionLease, error) {
	const (
		expireStale = `
UPDATE vpn_connection_leases
SET status = 'expired'
WHERE vpn_connection_id = $1::uuid
AND cluster_id = $2::uuid
AND status = 'active'
AND expires_at <= NOW()
`
		lockActive = `
SELECT id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
FROM vpn_connection_leases
WHERE vpn_connection_id = $1::uuid
AND cluster_id = $2::uuid
AND status = 'active'
AND expires_at > NOW()
FOR UPDATE
`
		insertLease = `
WITH next_generation AS (
SELECT COALESCE(MAX(lease_generation), 0) + 1 AS value
FROM vpn_connection_leases
WHERE vpn_connection_id = $2::uuid
)
INSERT INTO vpn_connection_leases (
id, vpn_connection_id, cluster_id, owner_node_id, lease_generation,
fencing_token, status, acquired_at, renewed_at, expires_at, metadata
)
SELECT $1::uuid, vc.id, vc.cluster_id, $4::uuid, next_generation.value,
$5, 'active', NOW(), NOW(), $6, $7::jsonb
FROM vpn_connections vc, next_generation
WHERE vc.cluster_id = $3::uuid
AND vc.id = $2::uuid
RETURNING id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
`
	)
	var none VPNConnectionLease

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err = tx.Exec(ctx, expireStale, input.VPNConnectionID, input.ClusterID); err != nil {
		return none, err
	}

	current, err := scanVPNLease(tx.QueryRow(ctx, lockActive, input.VPNConnectionID, input.ClusterID))
	switch {
	case err == nil:
		// A live lease exists: only its current owner may "re-acquire" it.
		if current.OwnerNodeID != input.OwnerNodeID {
			return none, ErrVPNLeaseAlreadyActive
		}
		if commitErr := tx.Commit(ctx); commitErr != nil {
			return none, commitErr
		}
		return current, nil
	case !errors.Is(err, pgx.ErrNoRows):
		return none, err
	}

	leaseID := uuid.NewString()
	created, err := scanVPNLease(tx.QueryRow(
		ctx,
		insertLease,
		leaseID,
		input.VPNConnectionID,
		input.ClusterID,
		input.OwnerNodeID,
		fencingToken,
		expiresAt,
		[]byte(input.Metadata),
	))
	if err != nil {
		if isUniqueViolation(err) {
			return none, ErrVPNLeaseAlreadyActive
		}
		return none, err
	}
	if err = tx.Commit(ctx); err != nil {
		return none, err
	}
	return created, nil
}
// RenewVPNConnectionLease extends an active lease to expiresAt and stamps
// renewed_at with the database's NOW().
//
// The update only matches when the lease ID, connection, cluster, owner node
// and fencing token all agree with input, the lease is still 'active', and it
// has not yet expired. When nothing matches, the scan error from scanVPNLease
// is returned.
func (s *PostgresStore) RenewVPNConnectionLease(ctx context.Context, input RenewVPNConnectionLeaseInput, expiresAt time.Time) (VPNConnectionLease, error) {
	const statement = `
UPDATE vpn_connection_leases
SET renewed_at = NOW(),
expires_at = $6
WHERE id = $1::uuid
AND vpn_connection_id = $2::uuid
AND cluster_id = $3::uuid
AND owner_node_id = $4::uuid
AND fencing_token = $5
AND status = 'active'
AND expires_at > NOW()
RETURNING id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
`
	renewed := s.db.QueryRow(
		ctx,
		statement,
		input.LeaseID,
		input.VPNConnectionID,
		input.ClusterID,
		input.OwnerNodeID,
		input.FencingToken,
		expiresAt,
	)
	return scanVPNLease(renewed)
}
// ReleaseVPNConnectionLease marks an active lease as 'released' on behalf of
// its owner and returns the resulting row.
//
// The update requires the lease ID, connection, cluster, owner node and
// fencing token to match input. When no active lease matches, the same
// identifiers are looked up with status 'released' so that a repeated release
// returns the already-released row; if that lookup also finds nothing, its
// scan error is returned.
func (s *PostgresStore) ReleaseVPNConnectionLease(ctx context.Context, input ReleaseVPNConnectionLeaseInput) (VPNConnectionLease, error) {
	const (
		releaseActive = `
UPDATE vpn_connection_leases
SET status = 'released',
released_at = NOW()
WHERE id = $1::uuid
AND vpn_connection_id = $2::uuid
AND cluster_id = $3::uuid
AND owner_node_id = $4::uuid
AND fencing_token = $5
AND status = 'active'
RETURNING id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
`
		findReleased = `
SELECT id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
FROM vpn_connection_leases
WHERE id = $1::uuid
AND vpn_connection_id = $2::uuid
AND cluster_id = $3::uuid
AND owner_node_id = $4::uuid
AND fencing_token = $5
AND status = 'released'
`
	)
	var none VPNConnectionLease

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	defer func() { _ = tx.Rollback(ctx) }()

	lease, err := scanVPNLease(tx.QueryRow(ctx, releaseActive,
		input.LeaseID, input.VPNConnectionID, input.ClusterID, input.OwnerNodeID, input.FencingToken))
	switch {
	case err == nil:
		if commitErr := tx.Commit(ctx); commitErr != nil {
			return none, commitErr
		}
		return lease, nil
	case !errors.Is(err, pgx.ErrNoRows):
		return none, err
	}

	// Nothing was active: fall back to a lease that was released earlier.
	lease, err = scanVPNLease(tx.QueryRow(ctx, findReleased,
		input.LeaseID, input.VPNConnectionID, input.ClusterID, input.OwnerNodeID, input.FencingToken))
	if err != nil {
		return none, err
	}
	if err = tx.Commit(ctx); err != nil {
		return none, err
	}
	return lease, nil
}
// FenceVPNConnectionLease marks an active lease as 'fenced', stamps fenced_at
// with the database's NOW(), and merges {"fence_reason": input.Reason} into the
// lease metadata.
//
// Unlike release, fencing does not require the owner node or fencing token:
// the lease is matched by lease ID, connection and cluster only. When no
// active lease matches, the same identifiers are looked up with status
// 'fenced' so that a repeated fence returns the already-fenced row; if that
// lookup also finds nothing, its scan error is returned.
func (s *PostgresStore) FenceVPNConnectionLease(ctx context.Context, input FenceVPNConnectionLeaseInput) (VPNConnectionLease, error) {
	// Encode the metadata patch with encoding/json. Formatting the reason with
	// fmt's %q verb produces Go string-literal escapes (\a, \v, \x7f,
	// \U0001f600, ...) that are not valid JSON, which makes the ::jsonb cast
	// fail for reasons containing such characters.
	fencePatch, err := json.Marshal(map[string]any{"fence_reason": input.Reason})
	if err != nil {
		return VPNConnectionLease{}, err
	}
	tx, err := s.db.Begin(ctx)
	if err != nil {
		return VPNConnectionLease{}, err
	}
	defer func() { _ = tx.Rollback(ctx) }()
	row := tx.QueryRow(ctx, `
UPDATE vpn_connection_leases
SET status = 'fenced',
fenced_at = NOW(),
metadata = metadata || $4::jsonb
WHERE id = $1::uuid
AND vpn_connection_id = $2::uuid
AND cluster_id = $3::uuid
AND status = 'active'
RETURNING id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
`, input.LeaseID, input.VPNConnectionID, input.ClusterID, fencePatch)
	item, err := scanVPNLease(row)
	if err == nil {
		if err := tx.Commit(ctx); err != nil {
			return VPNConnectionLease{}, err
		}
		return item, nil
	}
	if !errors.Is(err, pgx.ErrNoRows) {
		return VPNConnectionLease{}, err
	}
	// Nothing was active: fall back to a lease that was fenced earlier.
	row = tx.QueryRow(ctx, `
SELECT id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
FROM vpn_connection_leases
WHERE id = $1::uuid
AND vpn_connection_id = $2::uuid
AND cluster_id = $3::uuid
AND status = 'fenced'
`, input.LeaseID, input.VPNConnectionID, input.ClusterID)
	item, err = scanVPNLease(row)
	if err != nil {
		return VPNConnectionLease{}, err
	}
	if err := tx.Commit(ctx); err != nil {
		return VPNConnectionLease{}, err
	}
	return item, nil
}
// GetActiveVPNConnectionLease returns the lease of a VPN connection that has
// status 'active' and an expires_at later than the database's NOW().
//
// The scan error from scanVPNLease is returned when no such lease exists.
func (s *PostgresStore) GetActiveVPNConnectionLease(ctx context.Context, clusterID, vpnConnectionID string) (VPNConnectionLease, error) {
	const query = `
SELECT id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
FROM vpn_connection_leases
WHERE cluster_id = $1::uuid
AND vpn_connection_id = $2::uuid
AND status = 'active'
AND expires_at > NOW()
`
	lease, err := scanVPNLease(s.db.QueryRow(ctx, query, clusterID, vpnConnectionID))
	return lease, err
}
// CheckVPNLeaseOwnerEligibility gathers the facts needed to decide whether
// ownerNodeID may hold the lease of a VPN connection. It only reports them;
// the decision itself is left to the caller.
//
// The result contains:
//   - the node's cluster membership status and registration status (empty
//     strings when the membership or node row is missing),
//   - AllowedByPolicy: true when the connection's allowed_node_policy mode is
//     'any_capable', when an 'active' vpn_connection_allowed_nodes row exists
//     for the node, or when the mode is 'explicit' (the default for a missing
//     mode) and the policy's node_ids array contains the node ID,
//   - HasAuthorizedRole: true when the node has an 'active' role assignment of
//     'vpn-exit' or 'vpn-connector' in the cluster that is either not bound to
//     an organization or bound to the connection's organization.
//
// The Scan error is returned when the connection is not found in the cluster.
func (s *PostgresStore) CheckVPNLeaseOwnerEligibility(ctx context.Context, clusterID, vpnConnectionID, ownerNodeID string) (VPNLeaseOwnerEligibility, error) {
	const query = `
SELECT vc.id::text,
vc.cluster_id::text,
vc.organization_id::text,
$3::text AS owner_node_id,
COALESCE(cm.membership_status, '') AS membership_status,
COALESCE(n.registration_status, '') AS node_registration_status,
(
COALESCE(vc.allowed_node_policy->>'mode', 'explicit') = 'any_capable'
OR EXISTS (
SELECT 1
FROM vpn_connection_allowed_nodes van
WHERE van.vpn_connection_id = vc.id
AND van.cluster_id = vc.cluster_id
AND van.node_id = $3::uuid
AND van.status = 'active'
)
OR (
COALESCE(vc.allowed_node_policy->>'mode', 'explicit') = 'explicit'
AND COALESCE(vc.allowed_node_policy->'node_ids', '[]'::jsonb) ? $3::text
)
) AS allowed_by_policy,
EXISTS (
SELECT 1
FROM node_role_assignments nra
WHERE nra.cluster_id = vc.cluster_id
AND nra.node_id = $3::uuid
AND nra.status = 'active'
AND nra.role IN ('vpn-exit', 'vpn-connector')
AND (nra.organization_id IS NULL OR nra.organization_id = vc.organization_id)
) AS has_authorized_role
FROM vpn_connections vc
LEFT JOIN cluster_memberships cm ON cm.cluster_id = vc.cluster_id AND cm.node_id = $3::uuid
LEFT JOIN nodes n ON n.id = $3::uuid
WHERE vc.cluster_id = $1::uuid
AND vc.id = $2::uuid
`
	var verdict VPNLeaseOwnerEligibility
	columns := []any{
		&verdict.VPNConnectionID,
		&verdict.ClusterID,
		&verdict.OrganizationID,
		&verdict.OwnerNodeID,
		&verdict.MembershipStatus,
		&verdict.NodeRegistrationStatus,
		&verdict.AllowedByPolicy,
		&verdict.HasAuthorizedRole,
	}
	scanErr := s.db.QueryRow(ctx, query, clusterID, vpnConnectionID, ownerNodeID).Scan(columns...)
	if scanErr != nil {
		return VPNLeaseOwnerEligibility{}, scanErr
	}
	return verdict, nil
}
// ExpireStaleVPNConnectionLeases flips every 'active' lease in the cluster
// whose expires_at is at or before now to 'expired' and returns the affected
// leases.
//
// Unlike the other lease operations, the cutoff is the caller-supplied now
// rather than the database clock. The returned slice is nil when nothing
// expired.
func (s *PostgresStore) ExpireStaleVPNConnectionLeases(ctx context.Context, clusterID string, now time.Time) ([]VPNConnectionLease, error) {
	const statement = `
UPDATE vpn_connection_leases
SET status = 'expired'
WHERE cluster_id = $1::uuid
AND status = 'active'
AND expires_at <= $2
RETURNING id::text, vpn_connection_id::text, cluster_id::text, owner_node_id::text,
lease_generation, fencing_token, status, acquired_at, renewed_at, expires_at,
released_at, fenced_at, metadata
`
	result, queryErr := s.db.Query(ctx, statement, clusterID, now)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var expired []VPNConnectionLease
	for result.Next() {
		lease, scanErr := scanVPNLease(result)
		if scanErr != nil {
			return nil, scanErr
		}
		expired = append(expired, lease)
	}
	return expired, result.Err()
}
// ListNodeVPNAssignments returns the enabled VPN connections of a cluster that
// nodeID should know about, ordered by name and then ID.
//
// Nothing is returned unless the node has an 'active' cluster membership and
// an 'active' registration (the active_node CTE is joined with ON TRUE, so an
// empty CTE removes every row). A connection is included when either
//   - the node is allowed by the connection's node policy AND holds an active
//     'vpn-exit' or 'vpn-connector' role assignment, or
//   - the node currently owns the connection's active, unexpired lease.
//
// assignment_reason is 'active_owner' in the latter case and
// 'eligible_candidate' otherwise. The currently active lease, if any, is
// returned as a JSON object; the credential reference itself is not selected,
// only whether one is set.
func (s *PostgresStore) ListNodeVPNAssignments(ctx context.Context, clusterID, nodeID string) ([]NodeVPNAssignment, error) {
	const query = `
WITH active_node AS (
SELECT 1
FROM cluster_memberships cm
JOIN nodes n ON n.id = cm.node_id
WHERE cm.cluster_id = $1::uuid
AND cm.node_id = $2::uuid
AND cm.membership_status = 'active'
AND n.registration_status = 'active'
),
visible AS (
SELECT vc.*,
(
COALESCE(vc.allowed_node_policy->>'mode', 'explicit') = 'any_capable'
OR EXISTS (
SELECT 1
FROM vpn_connection_allowed_nodes van
WHERE van.vpn_connection_id = vc.id
AND van.cluster_id = vc.cluster_id
AND van.node_id = $2::uuid
AND van.status = 'active'
)
OR (
COALESCE(vc.allowed_node_policy->>'mode', 'explicit') = 'explicit'
AND COALESCE(vc.allowed_node_policy->'node_ids', '[]'::jsonb) ? $2::text
)
) AS allowed_by_policy,
EXISTS (
SELECT 1
FROM node_role_assignments nra
WHERE nra.cluster_id = vc.cluster_id
AND nra.node_id = $2::uuid
AND nra.status = 'active'
AND nra.role IN ('vpn-exit', 'vpn-connector')
AND (nra.organization_id IS NULL OR nra.organization_id = vc.organization_id)
) AS has_authorized_role,
EXISTS (
SELECT 1
FROM vpn_connection_leases active_owner
WHERE active_owner.cluster_id = vc.cluster_id
AND active_owner.vpn_connection_id = vc.id
AND active_owner.owner_node_id = $2::uuid
AND active_owner.status = 'active'
AND active_owner.expires_at > NOW()
) AS is_active_owner
FROM vpn_connections vc
WHERE vc.cluster_id = $1::uuid
AND vc.desired_state = 'enabled'
)
SELECT v.id::text,
v.cluster_id::text,
v.organization_id::text,
v.name,
v.target_endpoint,
v.protocol_family,
v.mode,
v.desired_state,
v.routing_usage,
v.route_policy,
v.qos_policy,
v.placement_policy,
v.status,
(v.credential_ref IS NOT NULL) AS has_credential_ref,
CASE WHEN v.is_active_owner THEN 'active_owner' ELSE 'eligible_candidate' END AS assignment_reason,
CASE WHEN l.id IS NULL THEN NULL ELSE jsonb_build_object(
'lease_id', l.id::text,
'owner_node_id', l.owner_node_id::text,
'lease_generation', l.lease_generation,
'status', l.status,
'renewed_at', l.renewed_at,
'expires_at', l.expires_at
) END AS active_lease,
v.updated_at
FROM visible v
JOIN active_node an ON TRUE
LEFT JOIN vpn_connection_leases l
ON l.cluster_id = v.cluster_id
AND l.vpn_connection_id = v.id
AND l.status = 'active'
AND l.expires_at > NOW()
WHERE (v.allowed_by_policy AND v.has_authorized_role)
OR v.is_active_owner
ORDER BY v.name ASC, v.id ASC
`
	result, queryErr := s.db.Query(ctx, query, clusterID, nodeID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var assignments []NodeVPNAssignment
	for result.Next() {
		assignment, scanErr := scanNodeVPNAssignment(result)
		if scanErr != nil {
			return nil, scanErr
		}
		assignments = append(assignments, assignment)
	}
	return assignments, result.Err()
}
// ReportNodeVPNAssignmentStatus records a node's status report for a VPN
// connection and returns the stored report.
//
// Two writes happen in one transaction: the report is appended to
// vpn_connection_assignment_status_reports under a new UUID, and the
// per-(connection, node) row in vpn_connection_assignment_latest_statuses is
// inserted or overwritten to point at that report.
func (s *PostgresStore) ReportNodeVPNAssignmentStatus(ctx context.Context, input ReportNodeVPNAssignmentStatusInput) (NodeVPNAssignmentStatus, error) {
	const (
		appendReport = `
INSERT INTO vpn_connection_assignment_status_reports (
id, vpn_connection_id, cluster_id, node_id, observed_status, status_payload, observed_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, $6::jsonb, $7)
RETURNING id::text, vpn_connection_id::text, cluster_id::text, node_id::text,
observed_status, status_payload, observed_at
`
		upsertLatest = `
INSERT INTO vpn_connection_assignment_latest_statuses (
vpn_connection_id, cluster_id, node_id, report_id, observed_status, status_payload, observed_at, updated_at
) VALUES ($1::uuid, $2::uuid, $3::uuid, $4::uuid, $5, $6::jsonb, $7, NOW())
ON CONFLICT (vpn_connection_id, node_id) DO UPDATE SET
report_id = EXCLUDED.report_id,
observed_status = EXCLUDED.observed_status,
status_payload = EXCLUDED.status_payload,
observed_at = EXCLUDED.observed_at,
updated_at = NOW()
`
	)
	var none NodeVPNAssignmentStatus

	tx, err := s.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	defer func() { _ = tx.Rollback(ctx) }()

	reportID := uuid.NewString()
	report, err := scanNodeVPNAssignmentStatus(tx.QueryRow(
		ctx,
		appendReport,
		reportID,
		input.VPNConnectionID,
		input.ClusterID,
		input.NodeID,
		input.ObservedStatus,
		[]byte(input.StatusPayload),
		input.ObservedAt,
	))
	if err != nil {
		return none, err
	}

	_, err = tx.Exec(
		ctx,
		upsertLatest,
		input.VPNConnectionID,
		input.ClusterID,
		input.NodeID,
		report.ID,
		input.ObservedStatus,
		[]byte(input.StatusPayload),
		input.ObservedAt,
	)
	if err != nil {
		return none, err
	}
	if err = tx.Commit(ctx); err != nil {
		return none, err
	}
	return report, nil
}
// RecordAudit appends one event to cluster_audit_events under a new UUID.
//
// An empty payload is stored as the JSON object {}. created_at is taken from
// event.CreatedAt, falling back to the database's NOW() when that parameter is
// sent as NULL.
//
// NOTE(review): the NOW() fallback only applies if event.CreatedAt can encode
// as SQL NULL (e.g. a nil pointer) — confirm against the field's type.
func (s *PostgresStore) RecordAudit(ctx context.Context, event ClusterAuditEvent) error {
	const statement = `
INSERT INTO cluster_audit_events (id, cluster_id, actor_user_id, event_type, target_type, target_id, payload, created_at)
VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7::jsonb, COALESCE($8, NOW()))
`
	payload := event.Payload
	if len(payload) == 0 {
		payload = json.RawMessage(`{}`)
	}
	_, execErr := s.db.Exec(
		ctx,
		statement,
		uuid.NewString(),
		event.ClusterID,
		event.ActorUserID,
		event.EventType,
		event.TargetType,
		event.TargetID,
		[]byte(payload),
		event.CreatedAt,
	)
	return execErr
}
// ListAuditEvents returns up to limit audit events of a cluster, newest first.
//
// limit must lie in 1..200; any value outside that range (including values
// above 200) is replaced by the default of 100. The returned slice is nil when
// the cluster has no events.
func (s *PostgresStore) ListAuditEvents(ctx context.Context, clusterID string, limit int) ([]ClusterAuditEvent, error) {
	const (
		maxLimit     = 200
		defaultLimit = 100
		query        = `
SELECT id::text, cluster_id::text, actor_user_id::text, event_type, target_type, target_id, payload, created_at
FROM cluster_audit_events
WHERE cluster_id = $1::uuid
ORDER BY created_at DESC
LIMIT $2
`
	)
	if withinRange := limit > 0 && limit <= maxLimit; !withinRange {
		limit = defaultLimit
	}
	result, queryErr := s.db.Query(ctx, query, clusterID, limit)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var events []ClusterAuditEvent
	for result.Next() {
		event, scanErr := scanAuditEvent(result)
		if scanErr != nil {
			return nil, scanErr
		}
		events = append(events, event)
	}
	return events, result.Err()
}
// scanner is the minimal row abstraction the scan* helpers in this file need:
// anything exposing Scan. The helpers are called with both the result of
// QueryRow and a pgx.Rows positioned on a row, so one decoding function serves
// single-row and multi-row queries.
type scanner interface {
// Scan copies the current row's columns into dest, in order.
Scan(dest ...any) error
}
// rowQuerier abstracts over anything that can run a multi-row query. In this
// file it lets listVPNConnectionAllowedNodes run against either the store's
// pool (s.db) or an open transaction.
type rowQuerier interface {
// Query executes sql with args and returns the resulting rows.
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}
// scanCluster decodes one row into a Cluster. The row's columns must match the
// order of the destinations below. An empty Metadata is normalized to {}.
func scanCluster(row scanner) (Cluster, error) {
	var cluster Cluster
	columns := []any{
		&cluster.ID,
		&cluster.Slug,
		&cluster.Name,
		&cluster.Status,
		&cluster.Region,
		&cluster.Metadata,
		&cluster.CreatedAt,
		&cluster.UpdatedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return Cluster{}, scanErr
	}
	ensureRaw(&cluster.Metadata, `{}`)
	return cluster, nil
}
// scanClusterAuthority decodes one row into a ClusterAuthorityKey. The private
// key is stored on the result exactly as scanned. SchemaVersion is not read
// from the row; it is always set to clusterauth.AuthoritySchemaVersion.
func scanClusterAuthority(row scanner) (ClusterAuthorityKey, error) {
	var key ClusterAuthorityKey
	columns := []any{
		&key.ClusterID,
		&key.AuthorityState,
		&key.KeyAlgorithm,
		&key.PublicKey,
		&key.PublicKeyFingerprint,
		&key.PrivateKey,
		&key.CreatedAt,
		&key.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return ClusterAuthorityKey{}, scanErr
	}
	key.SchemaVersion = clusterauth.AuthoritySchemaVersion
	return key, nil
}
// scanNodeGroup decodes one row into a ClusterNodeGroup. An empty Metadata is
// normalized to {}.
func scanNodeGroup(row scanner) (ClusterNodeGroup, error) {
	var group ClusterNodeGroup
	columns := []any{
		&group.ID,
		&group.ClusterID,
		&group.ParentGroupID,
		&group.Name,
		&group.Description,
		&group.SortOrder,
		&group.Metadata,
		&group.CreatedByUserID,
		&group.CreatedAt,
		&group.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return ClusterNodeGroup{}, scanErr
	}
	ensureRaw(&group.Metadata, `{}`)
	return group, nil
}
// scanClusterNode decodes one row into a ClusterNode, which combines node
// attributes with its membership and node-group columns. An empty
// MembershipMetadata is normalized to {}.
func scanClusterNode(row scanner) (ClusterNode, error) {
	var node ClusterNode
	columns := []any{
		&node.ID,
		&node.OwnerOrganizationID,
		&node.NodeKey,
		&node.Name,
		&node.OwnershipType,
		&node.RegistrationStatus,
		&node.HealthStatus,
		&node.VersionState,
		&node.PartitionState,
		&node.ReportedVersion,
		&node.LastSeenAt,
		&node.MembershipStatus,
		&node.MembershipMetadata,
		&node.NodeGroupID,
		&node.NodeGroupName,
		&node.CreatedAt,
		&node.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return ClusterNode{}, scanErr
	}
	ensureRaw(&node.MembershipMetadata, `{}`)
	return node, nil
}
// scanJoinToken decodes one row into a NodeJoinToken.
//
// Empty Scope and AuthorityPayload values are normalized to {}. The last
// column holds the authority signature as raw JSON: when it is empty or the
// literal "{}" the token's AuthoritySignature stays nil, otherwise it is
// unmarshalled into a ClusterSignature (an unmarshal failure is returned).
func scanJoinToken(row scanner) (NodeJoinToken, error) {
	var (
		token        NodeJoinToken
		rawSignature json.RawMessage
	)
	columns := []any{
		&token.ID,
		&token.ClusterID,
		&token.Scope,
		&token.ExpiresAt,
		&token.MaxUses,
		&token.UsedCount,
		&token.Status,
		&token.CreatedByUserID,
		&token.CreatedAt,
		&token.RevokedAt,
		&token.AuthorityPayload,
		&rawSignature,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return NodeJoinToken{}, scanErr
	}
	ensureRaw(&token.Scope, `{}`)
	ensureRaw(&token.AuthorityPayload, `{}`)

	unsigned := len(rawSignature) == 0 || string(rawSignature) == "{}"
	if unsigned {
		return token, nil
	}
	var parsed ClusterSignature
	if decodeErr := json.Unmarshal(rawSignature, &parsed); decodeErr != nil {
		return NodeJoinToken{}, decodeErr
	}
	token.AuthoritySignature = &parsed
	return token, nil
}
// scanJoinRequest decodes one row into a NodeJoinRequest.
//
// Empty JSON fields are normalized: RequestedRoles to [] and
// ReportedCapabilities, ReportedFacts, ApprovalPayload and ApprovalSignature
// to {}.
func scanJoinRequest(row scanner) (NodeJoinRequest, error) {
	var request NodeJoinRequest
	columns := []any{
		&request.ID,
		&request.ClusterID,
		&request.JoinTokenID,
		&request.NodeName,
		&request.NodeFingerprint,
		&request.PublicKey,
		&request.ReportedCapabilities,
		&request.ReportedFacts,
		&request.RequestedRoles,
		&request.Status,
		&request.ReviewedByUserID,
		&request.ReviewedAt,
		&request.ApprovedNodeID,
		&request.RejectionReason,
		&request.CreatedAt,
		&request.UpdatedAt,
		&request.ApprovalPayload,
		&request.ApprovalSignature,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return NodeJoinRequest{}, scanErr
	}
	ensureRaw(&request.RequestedRoles, `[]`)
	objectFields := []*json.RawMessage{
		&request.ReportedCapabilities,
		&request.ReportedFacts,
		&request.ApprovalPayload,
		&request.ApprovalSignature,
	}
	for _, field := range objectFields {
		ensureRaw(field, `{}`)
	}
	return request, nil
}
// scanRoleAssignment decodes one row into a NodeRoleAssignment. An empty
// Policy is normalized to {}.
func scanRoleAssignment(row scanner) (NodeRoleAssignment, error) {
	var assignment NodeRoleAssignment
	columns := []any{
		&assignment.ID,
		&assignment.ClusterID,
		&assignment.NodeID,
		&assignment.OrganizationID,
		&assignment.Role,
		&assignment.Status,
		&assignment.Policy,
		&assignment.AssignedByUserID,
		&assignment.AssignedAt,
		&assignment.RevokedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return NodeRoleAssignment{}, scanErr
	}
	ensureRaw(&assignment.Policy, `{}`)
	return assignment, nil
}
// scanHeartbeat decodes one row into a NodeHeartbeat. Empty Capabilities,
// ServiceStates and Metadata values are normalized to {}.
func scanHeartbeat(row scanner) (NodeHeartbeat, error) {
	var beat NodeHeartbeat
	columns := []any{
		&beat.ID,
		&beat.ClusterID,
		&beat.NodeID,
		&beat.HealthStatus,
		&beat.ReportedVersion,
		&beat.Capabilities,
		&beat.ServiceStates,
		&beat.Metadata,
		&beat.ObservedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return NodeHeartbeat{}, scanErr
	}
	for _, field := range []*json.RawMessage{&beat.Capabilities, &beat.ServiceStates, &beat.Metadata} {
		ensureRaw(field, `{}`)
	}
	return beat, nil
}
// scanAuditEvent decodes one row into a ClusterAuditEvent. An empty Payload is
// normalized to {}.
func scanAuditEvent(row scanner) (ClusterAuditEvent, error) {
	var event ClusterAuditEvent
	columns := []any{
		&event.ID,
		&event.ClusterID,
		&event.ActorUserID,
		&event.EventType,
		&event.TargetType,
		&event.TargetID,
		&event.Payload,
		&event.CreatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return ClusterAuditEvent{}, scanErr
	}
	ensureRaw(&event.Payload, `{}`)
	return event, nil
}
// scanFabricTestingFlag decodes one row into a FabricTestingFlag. An empty
// Metadata is normalized to {}.
func scanFabricTestingFlag(row scanner) (FabricTestingFlag, error) {
	var flag FabricTestingFlag
	columns := []any{
		&flag.ID,
		&flag.ScopeType,
		&flag.ScopeID,
		&flag.ClusterID,
		&flag.Enabled,
		&flag.TelemetryEnabled,
		&flag.SyntheticLinksEnabled,
		&flag.HistoryRetentionHours,
		&flag.Metadata,
		&flag.UpdatedByUserID,
		&flag.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return FabricTestingFlag{}, scanErr
	}
	ensureRaw(&flag.Metadata, `{}`)
	return flag, nil
}
// scanNodeTelemetry decodes one row into a NodeTelemetryObservation (CPU,
// memory, disk, network and process counters plus a JSON payload). An empty
// Payload is normalized to {}.
func scanNodeTelemetry(row scanner) (NodeTelemetryObservation, error) {
	var sample NodeTelemetryObservation
	columns := []any{
		&sample.ID,
		&sample.ClusterID,
		&sample.NodeID,
		&sample.CPUPercent,
		&sample.MemoryUsedBytes,
		&sample.MemoryTotalBytes,
		&sample.DiskUsedBytes,
		&sample.DiskTotalBytes,
		&sample.NetworkRxBytes,
		&sample.NetworkTxBytes,
		&sample.ProcessCount,
		&sample.Payload,
		&sample.ObservedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return NodeTelemetryObservation{}, scanErr
	}
	ensureRaw(&sample.Payload, `{}`)
	return sample, nil
}
// scanDesiredWorkload decodes one row into a NodeWorkloadDesiredState. Empty
// Config and Environment values are normalized to {}.
func scanDesiredWorkload(row scanner) (NodeWorkloadDesiredState, error) {
	var desired NodeWorkloadDesiredState
	columns := []any{
		&desired.ClusterID,
		&desired.NodeID,
		&desired.ServiceType,
		&desired.DesiredState,
		&desired.Version,
		&desired.RuntimeMode,
		&desired.ArtifactRef,
		&desired.Config,
		&desired.Environment,
		&desired.UpdatedByUserID,
		&desired.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return NodeWorkloadDesiredState{}, scanErr
	}
	ensureRaw(&desired.Config, `{}`)
	ensureRaw(&desired.Environment, `{}`)
	return desired, nil
}
// scanWorkloadStatus decodes one row into a NodeWorkloadStatus. An empty
// StatusPayload is normalized to {}.
func scanWorkloadStatus(row scanner) (NodeWorkloadStatus, error) {
	var status NodeWorkloadStatus
	columns := []any{
		&status.ID,
		&status.ClusterID,
		&status.NodeID,
		&status.ServiceType,
		&status.ReportedState,
		&status.RuntimeMode,
		&status.Version,
		&status.StatusPayload,
		&status.ObservedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return NodeWorkloadStatus{}, scanErr
	}
	ensureRaw(&status.StatusPayload, `{}`)
	return status, nil
}
// scanMeshLink decodes one row into a MeshLinkObservation between a source and
// a target node. An empty Metadata is normalized to {}.
func scanMeshLink(row scanner) (MeshLinkObservation, error) {
	var link MeshLinkObservation
	columns := []any{
		&link.ID,
		&link.ClusterID,
		&link.SourceNodeID,
		&link.TargetNodeID,
		&link.LinkStatus,
		&link.LatencyMs,
		&link.QualityScore,
		&link.Metadata,
		&link.ObservedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return MeshLinkObservation{}, scanErr
	}
	ensureRaw(&link.Metadata, `{}`)
	return link, nil
}
// scanRouteIntent decodes one row into a MeshRouteIntent. Empty
// SourceSelector, DestinationSelector and Policy values are normalized to {}.
func scanRouteIntent(row scanner) (MeshRouteIntent, error) {
	var intent MeshRouteIntent
	columns := []any{
		&intent.ID,
		&intent.ClusterID,
		&intent.SourceSelector,
		&intent.DestinationSelector,
		&intent.ServiceClass,
		&intent.Priority,
		&intent.Status,
		&intent.Policy,
		&intent.CreatedByUserID,
		&intent.CreatedAt,
		&intent.UpdatedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return MeshRouteIntent{}, scanErr
	}
	for _, field := range []*json.RawMessage{&intent.SourceSelector, &intent.DestinationSelector, &intent.Policy} {
		ensureRaw(field, `{}`)
	}
	return intent, nil
}
// scanQoSPolicy decodes one row into a MeshQoSPolicy. Empty BandwidthPolicy
// and Metadata values are normalized to {}.
func scanQoSPolicy(row scanner) (MeshQoSPolicy, error) {
	var policy MeshQoSPolicy
	columns := []any{
		&policy.ID,
		&policy.ClusterID,
		&policy.ServiceClass,
		&policy.Priority,
		&policy.ReliabilityMode,
		&policy.DropPolicy,
		&policy.BandwidthPolicy,
		&policy.Metadata,
		&policy.CreatedAt,
		&policy.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return MeshQoSPolicy{}, scanErr
	}
	ensureRaw(&policy.BandwidthPolicy, `{}`)
	ensureRaw(&policy.Metadata, `{}`)
	return policy, nil
}
// scanFabricEntryPoint decodes one row into a FabricEntryPoint. Empty Policy
// and Metadata values are normalized to {}.
func scanFabricEntryPoint(row scanner) (FabricEntryPoint, error) {
	var entry FabricEntryPoint
	columns := []any{
		&entry.ID,
		&entry.ClusterID,
		&entry.Name,
		&entry.Status,
		&entry.EndpointType,
		&entry.PublicEndpoint,
		&entry.Policy,
		&entry.Metadata,
		&entry.CreatedByUserID,
		&entry.CreatedAt,
		&entry.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return FabricEntryPoint{}, scanErr
	}
	ensureRaw(&entry.Policy, `{}`)
	ensureRaw(&entry.Metadata, `{}`)
	return entry, nil
}
// scanFabricEntryPointNode decodes one row into a FabricEntryPointNode, the
// link between an entry point and a node. An empty Metadata is normalized
// to {}.
func scanFabricEntryPointNode(row scanner) (FabricEntryPointNode, error) {
	var member FabricEntryPointNode
	columns := []any{
		&member.EntryPointID,
		&member.ClusterID,
		&member.NodeID,
		&member.Status,
		&member.Priority,
		&member.Metadata,
		&member.AddedByUserID,
		&member.AddedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return FabricEntryPointNode{}, scanErr
	}
	ensureRaw(&member.Metadata, `{}`)
	return member, nil
}
// scanFabricEgressPool decodes one row into a FabricEgressPool. Empty
// RouteScope, Policy and Metadata values are normalized to {}.
func scanFabricEgressPool(row scanner) (FabricEgressPool, error) {
	var pool FabricEgressPool
	columns := []any{
		&pool.ID,
		&pool.ClusterID,
		&pool.Name,
		&pool.Status,
		&pool.Description,
		&pool.RouteScope,
		&pool.Policy,
		&pool.Metadata,
		&pool.CreatedByUserID,
		&pool.CreatedAt,
		&pool.UpdatedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return FabricEgressPool{}, scanErr
	}
	for _, field := range []*json.RawMessage{&pool.RouteScope, &pool.Policy, &pool.Metadata} {
		ensureRaw(field, `{}`)
	}
	return pool, nil
}
// scanFabricEgressPoolNode decodes one row into a FabricEgressPoolNode, the
// link between an egress pool and a node. An empty Metadata is normalized
// to {}.
func scanFabricEgressPoolNode(row scanner) (FabricEgressPoolNode, error) {
	var member FabricEgressPoolNode
	columns := []any{
		&member.EgressPoolID,
		&member.ClusterID,
		&member.NodeID,
		&member.Status,
		&member.Priority,
		&member.Metadata,
		&member.AddedByUserID,
		&member.AddedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return FabricEgressPoolNode{}, scanErr
	}
	ensureRaw(&member.Metadata, `{}`)
	return member, nil
}
// scanAuthorityState decodes one row into a ClusterAuthorityState. No fields
// are normalized after scanning.
func scanAuthorityState(row scanner) (ClusterAuthorityState, error) {
	var state ClusterAuthorityState
	columns := []any{
		&state.ClusterID,
		&state.AuthorityState,
		&state.MutationMode,
		&state.Term,
		&state.Notes,
		&state.UpdatedByUserID,
		&state.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return ClusterAuthorityState{}, scanErr
	}
	return state, nil
}
// scanVPNConnection decodes one row into a VPNConnection.
//
// Empty JSON fields receive defaults: RoutingUsage becomes [],
// AllowedNodePolicy becomes an explicit policy with no node IDs, and
// TargetEndpoint, RoutePolicy, QoSPolicy, PlacementPolicy and Metadata
// become {}.
func scanVPNConnection(row scanner) (VPNConnection, error) {
	var connection VPNConnection
	columns := []any{
		&connection.ID,
		&connection.ClusterID,
		&connection.OrganizationID,
		&connection.Name,
		&connection.TargetEndpoint,
		&connection.ProtocolFamily,
		&connection.CredentialRef,
		&connection.Mode,
		&connection.DesiredState,
		&connection.AllowedNodePolicy,
		&connection.RoutingUsage,
		&connection.RoutePolicy,
		&connection.QoSPolicy,
		&connection.PlacementPolicy,
		&connection.Status,
		&connection.Metadata,
		&connection.CreatedByUserID,
		&connection.UpdatedByUserID,
		&connection.CreatedAt,
		&connection.UpdatedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return VPNConnection{}, scanErr
	}
	ensureRaw(&connection.AllowedNodePolicy, `{"mode":"explicit","node_ids":[]}`)
	ensureRaw(&connection.RoutingUsage, `[]`)
	objectFields := []*json.RawMessage{
		&connection.TargetEndpoint,
		&connection.RoutePolicy,
		&connection.QoSPolicy,
		&connection.PlacementPolicy,
		&connection.Metadata,
	}
	for _, field := range objectFields {
		ensureRaw(field, `{}`)
	}
	return connection, nil
}
// scanVPNConnections drains rows into a slice of VPNConnection, stopping at
// the first row that fails to decode. The slice is nil when rows is empty; any
// iteration error reported by rows.Err is returned alongside the rows read so
// far. The caller remains responsible for closing rows.
func scanVPNConnections(rows pgx.Rows) ([]VPNConnection, error) {
	var connections []VPNConnection
	for rows.Next() {
		connection, scanErr := scanVPNConnection(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		connections = append(connections, connection)
	}
	iterErr := rows.Err()
	return connections, iterErr
}
// scanVPNAllowedNode decodes one row into a VPNConnectionAllowedNode. An empty
// Metadata is normalized to {}.
func scanVPNAllowedNode(row scanner) (VPNConnectionAllowedNode, error) {
	var allowed VPNConnectionAllowedNode
	columns := []any{
		&allowed.VPNConnectionID,
		&allowed.ClusterID,
		&allowed.NodeID,
		&allowed.RolePreference,
		&allowed.Status,
		&allowed.Metadata,
		&allowed.CreatedByUserID,
		&allowed.CreatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return VPNConnectionAllowedNode{}, scanErr
	}
	ensureRaw(&allowed.Metadata, `{}`)
	return allowed, nil
}
// listVPNConnectionAllowedNodes reads the allowed-node rows of a VPN
// connection through q, ordered by role_preference and then created_at
// descending. Taking a rowQuerier lets the same query run on the pool or
// inside a transaction. The slice is nil when no rows exist.
func listVPNConnectionAllowedNodes(ctx context.Context, q rowQuerier, clusterID, vpnConnectionID string) ([]VPNConnectionAllowedNode, error) {
	const query = `
SELECT vpn_connection_id::text, cluster_id::text, node_id::text, role_preference,
status, metadata, created_by_user_id::text, created_at
FROM vpn_connection_allowed_nodes
WHERE cluster_id = $1::uuid
AND vpn_connection_id = $2::uuid
ORDER BY role_preference, created_at DESC
`
	result, queryErr := q.Query(ctx, query, clusterID, vpnConnectionID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var allowed []VPNConnectionAllowedNode
	for result.Next() {
		node, scanErr := scanVPNAllowedNode(result)
		if scanErr != nil {
			return nil, scanErr
		}
		allowed = append(allowed, node)
	}
	return allowed, result.Err()
}
// scanVPNRoutePolicy decodes one row into a VPNConnectionRoutePolicy. An empty
// Policy is normalized to {}.
func scanVPNRoutePolicy(row scanner) (VPNConnectionRoutePolicy, error) {
	var policy VPNConnectionRoutePolicy
	columns := []any{
		&policy.ID,
		&policy.VPNConnectionID,
		&policy.ClusterID,
		&policy.OrganizationID,
		&policy.RouteType,
		&policy.Destination,
		&policy.Action,
		&policy.ServiceType,
		&policy.Priority,
		&policy.Policy,
		&policy.Status,
		&policy.CreatedByUserID,
		&policy.CreatedAt,
		&policy.UpdatedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return VPNConnectionRoutePolicy{}, scanErr
	}
	ensureRaw(&policy.Policy, `{}`)
	return policy, nil
}
// scanVPNRoutePolicies drains rows into a slice of VPNConnectionRoutePolicy,
// stopping at the first row that fails to decode. The slice is nil when rows
// is empty; any iteration error reported by rows.Err is returned alongside the
// rows read so far. The caller remains responsible for closing rows.
func scanVPNRoutePolicies(rows pgx.Rows) ([]VPNConnectionRoutePolicy, error) {
	var policies []VPNConnectionRoutePolicy
	for rows.Next() {
		policy, scanErr := scanVPNRoutePolicy(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		policies = append(policies, policy)
	}
	iterErr := rows.Err()
	return policies, iterErr
}
// scanVPNLease decodes one row into a VPNConnectionLease. An empty Metadata is
// normalized to {}. The Scan error is returned unchanged, which callers in
// this file rely on to detect pgx.ErrNoRows.
func scanVPNLease(row scanner) (VPNConnectionLease, error) {
	var lease VPNConnectionLease
	columns := []any{
		&lease.ID,
		&lease.VPNConnectionID,
		&lease.ClusterID,
		&lease.OwnerNodeID,
		&lease.LeaseGeneration,
		&lease.FencingToken,
		&lease.Status,
		&lease.AcquiredAt,
		&lease.RenewedAt,
		&lease.ExpiresAt,
		&lease.ReleasedAt,
		&lease.FencedAt,
		&lease.Metadata,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return VPNConnectionLease{}, scanErr
	}
	ensureRaw(&lease.Metadata, `{}`)
	return lease, nil
}
// scanNodeVPNAssignment decodes one row into a NodeVPNAssignment.
//
// Empty JSON fields are normalized: RoutingUsage to [] and TargetEndpoint,
// RoutePolicy, QoSPolicy and PlacementPolicy to {}. The active-lease column is
// read as raw JSON: when it is empty or the literal "null" ActiveLease stays
// nil, otherwise it is unmarshalled into a NodeVPNAssignmentLease (an
// unmarshal failure is returned).
func scanNodeVPNAssignment(row scanner) (NodeVPNAssignment, error) {
	var (
		assignment NodeVPNAssignment
		rawLease   json.RawMessage
	)
	columns := []any{
		&assignment.VPNConnectionID,
		&assignment.ClusterID,
		&assignment.OrganizationID,
		&assignment.Name,
		&assignment.TargetEndpoint,
		&assignment.ProtocolFamily,
		&assignment.Mode,
		&assignment.DesiredState,
		&assignment.RoutingUsage,
		&assignment.RoutePolicy,
		&assignment.QoSPolicy,
		&assignment.PlacementPolicy,
		&assignment.Status,
		&assignment.HasCredentialRef,
		&assignment.AssignmentReason,
		&rawLease,
		&assignment.UpdatedAt,
	}
	if scanErr := row.Scan(columns...); scanErr != nil {
		return NodeVPNAssignment{}, scanErr
	}
	ensureRaw(&assignment.RoutingUsage, `[]`)
	objectFields := []*json.RawMessage{
		&assignment.TargetEndpoint,
		&assignment.RoutePolicy,
		&assignment.QoSPolicy,
		&assignment.PlacementPolicy,
	}
	for _, field := range objectFields {
		ensureRaw(field, `{}`)
	}

	noLease := len(rawLease) == 0 || string(rawLease) == "null"
	if noLease {
		return assignment, nil
	}
	var active NodeVPNAssignmentLease
	if decodeErr := json.Unmarshal(rawLease, &active); decodeErr != nil {
		return NodeVPNAssignment{}, decodeErr
	}
	assignment.ActiveLease = &active
	return assignment, nil
}
// scanNodeVPNAssignmentStatus decodes one row into a NodeVPNAssignmentStatus.
// An empty StatusPayload is normalized to {}.
func scanNodeVPNAssignmentStatus(row scanner) (NodeVPNAssignmentStatus, error) {
	var report NodeVPNAssignmentStatus
	columns := []any{
		&report.ID,
		&report.VPNConnectionID,
		&report.ClusterID,
		&report.NodeID,
		&report.ObservedStatus,
		&report.StatusPayload,
		&report.ObservedAt,
	}
	scanErr := row.Scan(columns...)
	if scanErr != nil {
		return NodeVPNAssignmentStatus{}, scanErr
	}
	ensureRaw(&report.StatusPayload, `{}`)
	return report, nil
}
// isUniqueViolation reports whether err is, or wraps, a *pgconn.PgError whose
// SQLSTATE code is 23505 (PostgreSQL's unique_violation). It returns false for
// any other error, including nil.
func isUniqueViolation(err error) bool {
	const uniqueViolationCode = "23505"
	var pgErr *pgconn.PgError
	return errors.As(err, &pgErr) && pgErr.Code == uniqueViolationCode
}
// getJoinRequestForUpdate loads a join request of the given cluster inside tx
// using SELECT ... FOR UPDATE, so the row stays locked until the transaction
// ends. The scan error from scanJoinRequest is returned when no row matches.
func getJoinRequestForUpdate(ctx context.Context, tx pgx.Tx, clusterID, joinRequestID string) (NodeJoinRequest, error) {
	const query = `
SELECT id::text, cluster_id::text, join_token_id::text, node_name, node_fingerprint, public_key,
reported_capabilities, reported_facts, requested_roles, status, reviewed_by_user_id::text,
reviewed_at, approved_node_id::text, rejection_reason, created_at, updated_at, approval_payload, approval_signature
FROM node_join_requests
WHERE cluster_id = $1::uuid
AND id = $2::uuid
FOR UPDATE
`
	request, err := scanJoinRequest(tx.QueryRow(ctx, query, clusterID, joinRequestID))
	return request, err
}
// ensureRaw replaces *raw with fallback when *raw is empty (nil or zero
// length) and leaves any non-empty value untouched. raw must not be nil.
func ensureRaw(raw *json.RawMessage, fallback string) {
	if len(*raw) > 0 {
		return
	}
	*raw = json.RawMessage(fallback)
}