743 lines
26 KiB
Go
743 lines
26 KiB
Go
package mesh
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ed25519"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
)
|
|
|
|
const (
|
|
FabricRegistryGossipRecordSchema = "rap.fabric.registry.gossip_record.v1"
|
|
|
|
FabricRegistryScopeFarm = "farm"
|
|
FabricRegistryScopeCluster = "cluster"
|
|
FabricRegistryScopeOrganization = "organization"
|
|
|
|
FabricRegistryServiceControlAPI = "control-api"
|
|
FabricRegistryServiceUpdateStore = "update-store"
|
|
FabricRegistryServiceUpdateCache = "update-cache"
|
|
FabricRegistryServiceWebAdmin = "web-admin"
|
|
FabricRegistryServiceVPNExitPool = "vpn-egress-pool"
|
|
|
|
FabricRegistryAuthorityControl = "control-authority"
|
|
FabricRegistryAuthorityUpdate = "update-authority"
|
|
FabricRegistryAuthorityStorage = "storage-authority"
|
|
FabricRegistryAuthorityRoute = "route-authority"
|
|
)
|
|
|
|
type FabricRegistryEndpoint struct {
|
|
EndpointID string `json:"endpoint_id"`
|
|
Address string `json:"address"`
|
|
Transport string `json:"transport"`
|
|
Reachability string `json:"reachability,omitempty"`
|
|
ConnectivityMode string `json:"connectivity_mode,omitempty"`
|
|
Region string `json:"region,omitempty"`
|
|
Priority int `json:"priority,omitempty"`
|
|
Weight int `json:"weight,omitempty"`
|
|
PeerCertSHA256 string `json:"peer_cert_sha256,omitempty"`
|
|
LastVerifiedAt *time.Time `json:"last_verified_at,omitempty"`
|
|
Metadata json.RawMessage `json:"metadata,omitempty"`
|
|
}
|
|
|
|
type FabricRegistrySignature struct {
|
|
KeyID string `json:"key_id"`
|
|
IssuerID string `json:"issuer_id"`
|
|
Role string `json:"role"`
|
|
Alg string `json:"alg"`
|
|
Value string `json:"value"`
|
|
}
|
|
|
|
type FabricRegistryGossipRecord struct {
|
|
SchemaVersion string `json:"schema_version"`
|
|
ClusterID string `json:"cluster_id"`
|
|
Service string `json:"service"`
|
|
Scope string `json:"scope"`
|
|
OrganizationID string `json:"organization_id,omitempty"`
|
|
Epoch int64 `json:"epoch"`
|
|
Generation string `json:"generation,omitempty"`
|
|
IssuedAt time.Time `json:"issued_at"`
|
|
ExpiresAt time.Time `json:"expires_at"`
|
|
IssuerNodeID string `json:"issuer_node_id"`
|
|
IssuerRole string `json:"issuer_role"`
|
|
Endpoints []FabricRegistryEndpoint `json:"endpoints"`
|
|
Metadata json.RawMessage `json:"metadata,omitempty"`
|
|
Signatures []FabricRegistrySignature `json:"signatures,omitempty"`
|
|
}
|
|
|
|
type FabricRegistryTrustedIssuer struct {
|
|
IssuerID string
|
|
Role string
|
|
PublicKey ed25519.PublicKey
|
|
Scopes []string
|
|
Services []string
|
|
}
|
|
|
|
type FabricRegistryVerificationPolicy struct {
|
|
LocalClusterID string
|
|
TrustedIssuers []FabricRegistryTrustedIssuer
|
|
RequiredSignatures int
|
|
MaxClockSkew time.Duration
|
|
Now time.Time
|
|
}
|
|
|
|
type FabricRegistryVerificationResult struct {
|
|
AcceptedSignatureCount int `json:"accepted_signature_count"`
|
|
AcceptedIssuers []string `json:"accepted_issuers,omitempty"`
|
|
RecordHash string `json:"record_hash"`
|
|
}
|
|
|
|
type FabricRegistryEntryState string
|
|
|
|
const (
|
|
FabricRegistryCandidate FabricRegistryEntryState = "candidate"
|
|
FabricRegistryActive FabricRegistryEntryState = "active"
|
|
FabricRegistryExpired FabricRegistryEntryState = "expired"
|
|
FabricRegistryRejected FabricRegistryEntryState = "rejected"
|
|
)
|
|
|
|
type FabricRegistryEntry struct {
|
|
Record FabricRegistryGossipRecord `json:"record"`
|
|
State FabricRegistryEntryState `json:"state"`
|
|
AcceptedAt time.Time `json:"accepted_at"`
|
|
PromotedAt *time.Time `json:"promoted_at,omitempty"`
|
|
VerifyResult FabricRegistryVerificationResult `json:"verify_result"`
|
|
}
|
|
|
|
type FabricRegistryBootstrapReport struct {
|
|
Total int `json:"total"`
|
|
Active int `json:"active"`
|
|
Candidate int `json:"candidate"`
|
|
Rejected int `json:"rejected"`
|
|
Rejects []string `json:"rejects,omitempty"`
|
|
RecordKeys []string `json:"record_keys,omitempty"`
|
|
}
|
|
|
|
type FabricRegistryResolveRequest struct {
|
|
ClusterID string
|
|
Service string
|
|
Scope string
|
|
OrganizationID string
|
|
PreferredRegion string
|
|
Now time.Time
|
|
}
|
|
|
|
type FabricRegistryResolvedService struct {
|
|
Found bool `json:"found"`
|
|
Service string `json:"service"`
|
|
Scope string `json:"scope,omitempty"`
|
|
OrganizationID string `json:"organization_id,omitempty"`
|
|
RecordEpoch int64 `json:"record_epoch,omitempty"`
|
|
RecordHash string `json:"record_hash,omitempty"`
|
|
Endpoints []FabricRegistryEndpoint `json:"endpoints,omitempty"`
|
|
Reason string `json:"reason,omitempty"`
|
|
}
|
|
|
|
type FabricRegistryLiveProbeRequest struct {
|
|
ClusterID string
|
|
PreferredRegion string
|
|
Timeout time.Duration
|
|
Now time.Time
|
|
MaxCandidates int
|
|
}
|
|
|
|
type FabricRegistryLiveProbeResult struct {
|
|
Service string `json:"service"`
|
|
Scope string `json:"scope"`
|
|
OrganizationID string `json:"organization_id,omitempty"`
|
|
EndpointID string `json:"endpoint_id,omitempty"`
|
|
Address string `json:"address,omitempty"`
|
|
Status string `json:"status"`
|
|
LatencyMs int64 `json:"latency_ms,omitempty"`
|
|
Promoted bool `json:"promoted"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
type FabricRegistrySnapshot struct {
|
|
Active int `json:"active"`
|
|
Candidate int `json:"candidate"`
|
|
ActiveKeys []string `json:"active_keys,omitempty"`
|
|
CandidateKeys []string `json:"candidate_keys,omitempty"`
|
|
}
|
|
|
|
type FabricRegistry struct {
|
|
entries map[string]FabricRegistryEntry
|
|
candidates map[string]FabricRegistryEntry
|
|
}
|
|
|
|
func NewFabricRegistry() *FabricRegistry {
|
|
return &FabricRegistry{entries: map[string]FabricRegistryEntry{}, candidates: map[string]FabricRegistryEntry{}}
|
|
}
|
|
|
|
func LoadFabricRegistryBootstrapRecords(recordsJSON string, policy FabricRegistryVerificationPolicy, liveVerified bool) (*FabricRegistry, FabricRegistryBootstrapReport, error) {
|
|
registry := NewFabricRegistry()
|
|
recordsJSON = strings.TrimSpace(recordsJSON)
|
|
if recordsJSON == "" {
|
|
return registry, FabricRegistryBootstrapReport{}, nil
|
|
}
|
|
var records []FabricRegistryGossipRecord
|
|
if err := json.Unmarshal([]byte(recordsJSON), &records); err != nil {
|
|
return nil, FabricRegistryBootstrapReport{}, fmt.Errorf("decode fabric registry bootstrap records: %w", err)
|
|
}
|
|
report := FabricRegistryBootstrapReport{Total: len(records)}
|
|
for _, record := range records {
|
|
entry, changed, err := registry.ApplyGossipRecord(record, policy, liveVerified)
|
|
if err != nil {
|
|
report.Rejected++
|
|
report.Rejects = append(report.Rejects, err.Error())
|
|
continue
|
|
}
|
|
if !changed {
|
|
continue
|
|
}
|
|
report.RecordKeys = append(report.RecordKeys, fabricRegistryRecordKey(record))
|
|
switch entry.State {
|
|
case FabricRegistryActive:
|
|
report.Active++
|
|
case FabricRegistryCandidate:
|
|
report.Candidate++
|
|
}
|
|
}
|
|
return registry, report, nil
|
|
}
|
|
|
|
func (r *FabricRegistry) ApplyGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy, liveVerified bool) (FabricRegistryEntry, bool, error) {
|
|
if r == nil {
|
|
return FabricRegistryEntry{}, false, fmt.Errorf("fabric registry is nil")
|
|
}
|
|
result, err := VerifyFabricRegistryGossipRecord(record, policy)
|
|
if err != nil {
|
|
return FabricRegistryEntry{}, false, err
|
|
}
|
|
now := registryNow(policy.Now)
|
|
key := fabricRegistryRecordKey(record)
|
|
current, exists := r.entries[key]
|
|
if exists && !fabricRegistryRecordNewer(record, current.Record, now) {
|
|
return current, false, nil
|
|
}
|
|
state := FabricRegistryCandidate
|
|
var promotedAt *time.Time
|
|
if liveVerified {
|
|
state = FabricRegistryActive
|
|
t := now
|
|
promotedAt = &t
|
|
}
|
|
entry := FabricRegistryEntry{
|
|
Record: normalizeFabricRegistryRecord(record),
|
|
State: state,
|
|
AcceptedAt: now,
|
|
PromotedAt: promotedAt,
|
|
VerifyResult: result,
|
|
}
|
|
if state == FabricRegistryActive {
|
|
r.entries[key] = entry
|
|
delete(r.candidates, key)
|
|
return entry, true, nil
|
|
}
|
|
if r.candidates == nil {
|
|
r.candidates = map[string]FabricRegistryEntry{}
|
|
}
|
|
r.candidates[key] = entry
|
|
return entry, true, nil
|
|
}
|
|
|
|
func (r *FabricRegistry) MarkLiveVerified(clusterID, service, scope, organizationID string, now time.Time) bool {
|
|
if r == nil {
|
|
return false
|
|
}
|
|
key := fabricRegistryKey(clusterID, service, scope, organizationID)
|
|
entry, ok := r.candidates[key]
|
|
if !ok || entry.State == FabricRegistryExpired || entry.State == FabricRegistryRejected {
|
|
return false
|
|
}
|
|
t := registryNow(now)
|
|
entry.State = FabricRegistryActive
|
|
entry.PromotedAt = &t
|
|
r.entries[key] = entry
|
|
delete(r.candidates, key)
|
|
return true
|
|
}
|
|
|
|
func (r *FabricRegistry) Active(clusterID, service, scope, organizationID string, now time.Time) (FabricRegistryGossipRecord, bool) {
|
|
if r == nil {
|
|
return FabricRegistryGossipRecord{}, false
|
|
}
|
|
entry, ok := r.entries[fabricRegistryKey(clusterID, service, scope, organizationID)]
|
|
if !ok || entry.State != FabricRegistryActive || !entry.Record.ExpiresAt.After(registryNow(now)) {
|
|
return FabricRegistryGossipRecord{}, false
|
|
}
|
|
return entry.Record, true
|
|
}
|
|
|
|
func (r *FabricRegistry) ResolveService(req FabricRegistryResolveRequest) FabricRegistryResolvedService {
|
|
service := strings.ToLower(strings.TrimSpace(req.Service))
|
|
if service == "" {
|
|
return FabricRegistryResolvedService{Found: false, Reason: "service_required"}
|
|
}
|
|
scopeOrder := fabricRegistryScopeResolutionOrder(req.Scope, req.OrganizationID)
|
|
if resolved := r.resolveServiceFromRecords(req, service, scopeOrder, false); resolved.Found || resolved.Reason == "no_usable_endpoints" {
|
|
return resolved
|
|
}
|
|
if resolved := r.resolveServiceFromRecords(req, service, scopeOrder, true); resolved.Found || resolved.Reason == "no_usable_endpoints" {
|
|
return resolved
|
|
}
|
|
return FabricRegistryResolvedService{Found: false, Service: service, Reason: "no_active_record"}
|
|
}
|
|
|
|
func (r *FabricRegistry) resolveServiceFromRecords(req FabricRegistryResolveRequest, service string, scopeOrder []string, candidateOnly bool) FabricRegistryResolvedService {
|
|
for _, scope := range scopeOrder {
|
|
organizationID := strings.TrimSpace(req.OrganizationID)
|
|
if scope != FabricRegistryScopeOrganization {
|
|
organizationID = ""
|
|
}
|
|
var record FabricRegistryGossipRecord
|
|
var ok bool
|
|
if candidateOnly {
|
|
record, ok = r.Candidate(req.ClusterID, service, scope, organizationID, req.Now)
|
|
} else {
|
|
record, ok = r.Active(req.ClusterID, service, scope, organizationID, req.Now)
|
|
}
|
|
if !ok {
|
|
continue
|
|
}
|
|
endpoints := selectFabricRegistryEndpoints(record.Endpoints, req.PreferredRegion)
|
|
if len(endpoints) == 0 {
|
|
return FabricRegistryResolvedService{Found: false, Service: service, Scope: scope, OrganizationID: organizationID, Reason: "no_usable_endpoints"}
|
|
}
|
|
result, _ := canonicalFabricRegistryPayload(record)
|
|
sum := sha256.Sum256(result)
|
|
return FabricRegistryResolvedService{
|
|
Found: true,
|
|
Service: service,
|
|
Scope: scope,
|
|
OrganizationID: organizationID,
|
|
RecordEpoch: record.Epoch,
|
|
RecordHash: hex.EncodeToString(sum[:]),
|
|
Endpoints: endpoints,
|
|
Reason: fabricRegistryResolveReason(candidateOnly),
|
|
}
|
|
}
|
|
return FabricRegistryResolvedService{Found: false, Service: service}
|
|
}
|
|
|
|
func (r *FabricRegistry) Candidate(clusterID, service, scope, organizationID string, now time.Time) (FabricRegistryGossipRecord, bool) {
|
|
if r == nil {
|
|
return FabricRegistryGossipRecord{}, false
|
|
}
|
|
entry, ok := r.candidates[fabricRegistryKey(clusterID, service, scope, organizationID)]
|
|
if !ok || entry.State != FabricRegistryCandidate || !entry.Record.ExpiresAt.After(registryNow(now)) {
|
|
return FabricRegistryGossipRecord{}, false
|
|
}
|
|
return entry.Record, true
|
|
}
|
|
|
|
func fabricRegistryResolveReason(candidateOnly bool) string {
|
|
if candidateOnly {
|
|
return "candidate_record_pending_live_verification"
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (r *FabricRegistry) Snapshot(now time.Time) FabricRegistrySnapshot {
|
|
if r == nil {
|
|
return FabricRegistrySnapshot{}
|
|
}
|
|
now = registryNow(now)
|
|
out := FabricRegistrySnapshot{}
|
|
for key, entry := range r.entries {
|
|
if entry.State == FabricRegistryActive && entry.Record.ExpiresAt.After(now) {
|
|
out.Active++
|
|
out.ActiveKeys = append(out.ActiveKeys, key)
|
|
}
|
|
}
|
|
for key, entry := range r.candidates {
|
|
if entry.State == FabricRegistryCandidate && entry.Record.ExpiresAt.After(now) {
|
|
out.Candidate++
|
|
out.CandidateKeys = append(out.CandidateKeys, key)
|
|
}
|
|
}
|
|
sort.Strings(out.ActiveKeys)
|
|
sort.Strings(out.CandidateKeys)
|
|
return out
|
|
}
|
|
|
|
func (r *FabricRegistry) VerifyCandidates(ctx context.Context, transport FabricTransport, req FabricRegistryLiveProbeRequest) []FabricRegistryLiveProbeResult {
|
|
if r == nil {
|
|
return nil
|
|
}
|
|
now := registryNow(req.Now)
|
|
timeout := req.Timeout
|
|
if timeout <= 0 {
|
|
timeout = 2 * time.Second
|
|
}
|
|
maxCandidates := req.MaxCandidates
|
|
if maxCandidates <= 0 {
|
|
maxCandidates = 16
|
|
}
|
|
candidates := make([]FabricRegistryEntry, 0, len(r.candidates))
|
|
for _, entry := range r.candidates {
|
|
if entry.State != FabricRegistryCandidate || !entry.Record.ExpiresAt.After(now) {
|
|
continue
|
|
}
|
|
if clusterID := strings.TrimSpace(req.ClusterID); clusterID != "" && entry.Record.ClusterID != clusterID {
|
|
continue
|
|
}
|
|
candidates = append(candidates, entry)
|
|
}
|
|
sort.SliceStable(candidates, func(i, j int) bool {
|
|
if candidates[i].Record.Service != candidates[j].Record.Service {
|
|
return candidates[i].Record.Service < candidates[j].Record.Service
|
|
}
|
|
if candidates[i].Record.Scope != candidates[j].Record.Scope {
|
|
return candidates[i].Record.Scope < candidates[j].Record.Scope
|
|
}
|
|
return candidates[i].Record.Epoch > candidates[j].Record.Epoch
|
|
})
|
|
if len(candidates) > maxCandidates {
|
|
candidates = candidates[:maxCandidates]
|
|
}
|
|
results := make([]FabricRegistryLiveProbeResult, 0, len(candidates))
|
|
for _, entry := range candidates {
|
|
record := entry.Record
|
|
result := FabricRegistryLiveProbeResult{
|
|
Service: record.Service,
|
|
Scope: record.Scope,
|
|
OrganizationID: record.OrganizationID,
|
|
Status: "unreachable",
|
|
}
|
|
endpoints := selectFabricRegistryEndpoints(record.Endpoints, req.PreferredRegion)
|
|
if len(endpoints) == 0 {
|
|
result.Error = "no_usable_endpoints"
|
|
results = append(results, result)
|
|
continue
|
|
}
|
|
for _, endpoint := range endpoints {
|
|
probeCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
latency, err := probeFabricRegistryEndpoint(probeCtx, transport, endpoint, timeout)
|
|
cancel()
|
|
result.EndpointID = endpoint.EndpointID
|
|
result.Address = endpoint.Address
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
continue
|
|
}
|
|
result.Status = "reachable"
|
|
result.LatencyMs = latency.Milliseconds()
|
|
result.Promoted = r.MarkLiveVerified(record.ClusterID, record.Service, record.Scope, record.OrganizationID, now)
|
|
result.Error = ""
|
|
break
|
|
}
|
|
results = append(results, result)
|
|
}
|
|
return results
|
|
}
|
|
|
|
func SignFabricRegistryGossipRecord(record FabricRegistryGossipRecord, issuer FabricRegistryTrustedIssuer, privateKey ed25519.PrivateKey) (FabricRegistryGossipRecord, error) {
|
|
payload, err := canonicalFabricRegistryPayload(record)
|
|
if err != nil {
|
|
return record, err
|
|
}
|
|
sig := ed25519.Sign(privateKey, payload)
|
|
record.Signatures = append(record.Signatures, FabricRegistrySignature{
|
|
KeyID: firstNonEmpty(issuer.IssuerID, record.IssuerNodeID),
|
|
IssuerID: firstNonEmpty(issuer.IssuerID, record.IssuerNodeID),
|
|
Role: firstNonEmpty(issuer.Role, record.IssuerRole),
|
|
Alg: "ed25519",
|
|
Value: hex.EncodeToString(sig),
|
|
})
|
|
return record, nil
|
|
}
|
|
|
|
func VerifyFabricRegistryGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy) (FabricRegistryVerificationResult, error) {
|
|
record = normalizeFabricRegistryRecord(record)
|
|
if err := validateFabricRegistryGossipRecord(record, policy); err != nil {
|
|
return FabricRegistryVerificationResult{}, err
|
|
}
|
|
payload, err := canonicalFabricRegistryPayload(record)
|
|
if err != nil {
|
|
return FabricRegistryVerificationResult{}, err
|
|
}
|
|
sum := sha256.Sum256(payload)
|
|
trusted := map[string]FabricRegistryTrustedIssuer{}
|
|
for _, issuer := range policy.TrustedIssuers {
|
|
if strings.TrimSpace(issuer.IssuerID) != "" {
|
|
trusted[issuer.IssuerID] = issuer
|
|
}
|
|
if strings.TrimSpace(issuer.IssuerID) != "" && strings.TrimSpace(issuer.Role) != "" {
|
|
trusted[issuer.IssuerID+"\x00"+issuer.Role] = issuer
|
|
}
|
|
}
|
|
accepted := map[string]struct{}{}
|
|
for _, signature := range record.Signatures {
|
|
if strings.ToLower(strings.TrimSpace(signature.Alg)) != "ed25519" {
|
|
continue
|
|
}
|
|
issuer, ok := trusted[strings.TrimSpace(signature.IssuerID)+"\x00"+strings.TrimSpace(signature.Role)]
|
|
if !ok {
|
|
issuer, ok = trusted[strings.TrimSpace(signature.IssuerID)]
|
|
}
|
|
if !ok || !fabricRegistryIssuerAllowed(issuer, record) {
|
|
continue
|
|
}
|
|
rawSig, err := hex.DecodeString(strings.TrimSpace(signature.Value))
|
|
if err != nil || len(rawSig) != ed25519.SignatureSize || len(issuer.PublicKey) != ed25519.PublicKeySize {
|
|
continue
|
|
}
|
|
if ed25519.Verify(issuer.PublicKey, payload, rawSig) {
|
|
accepted[signature.IssuerID] = struct{}{}
|
|
}
|
|
}
|
|
required := policy.RequiredSignatures
|
|
if required <= 0 {
|
|
required = 1
|
|
}
|
|
if len(accepted) < required {
|
|
return FabricRegistryVerificationResult{RecordHash: hex.EncodeToString(sum[:])}, fmt.Errorf("fabric registry gossip record lacks required trusted signatures")
|
|
}
|
|
issuers := make([]string, 0, len(accepted))
|
|
for issuer := range accepted {
|
|
issuers = append(issuers, issuer)
|
|
}
|
|
sort.Strings(issuers)
|
|
return FabricRegistryVerificationResult{
|
|
AcceptedSignatureCount: len(accepted),
|
|
AcceptedIssuers: issuers,
|
|
RecordHash: hex.EncodeToString(sum[:]),
|
|
}, nil
|
|
}
|
|
|
|
func validateFabricRegistryGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy) error {
|
|
if record.SchemaVersion != FabricRegistryGossipRecordSchema {
|
|
return fmt.Errorf("fabric registry gossip record schema_version is invalid")
|
|
}
|
|
if strings.TrimSpace(record.ClusterID) == "" || (strings.TrimSpace(policy.LocalClusterID) != "" && record.ClusterID != policy.LocalClusterID) {
|
|
return ErrClusterMismatch
|
|
}
|
|
if strings.TrimSpace(record.Service) == "" || strings.TrimSpace(record.Scope) == "" || strings.TrimSpace(record.IssuerNodeID) == "" || strings.TrimSpace(record.IssuerRole) == "" {
|
|
return fmt.Errorf("fabric registry gossip record is missing service, scope, or issuer")
|
|
}
|
|
if record.Epoch <= 0 || record.IssuedAt.IsZero() || record.ExpiresAt.IsZero() || !record.ExpiresAt.After(record.IssuedAt) {
|
|
return fmt.Errorf("fabric registry gossip record has invalid epoch or validity window")
|
|
}
|
|
now := registryNow(policy.Now)
|
|
skew := policy.MaxClockSkew
|
|
if skew <= 0 {
|
|
skew = time.Minute
|
|
}
|
|
if record.IssuedAt.After(now.Add(skew)) || !record.ExpiresAt.After(now) {
|
|
return fmt.Errorf("fabric registry gossip record is not currently valid")
|
|
}
|
|
if len(record.Endpoints) == 0 {
|
|
return fmt.Errorf("fabric registry gossip record has no endpoints")
|
|
}
|
|
for _, endpoint := range record.Endpoints {
|
|
if strings.TrimSpace(endpoint.EndpointID) == "" || strings.TrimSpace(endpoint.Address) == "" || strings.TrimSpace(endpoint.Transport) == "" {
|
|
return fmt.Errorf("fabric registry gossip record contains invalid endpoint")
|
|
}
|
|
if !isQUICOnlyCandidateTransport(endpoint.Transport) || hasUnsupportedEndpointScheme(endpoint.Address) {
|
|
return fmt.Errorf("fabric registry gossip endpoint must be QUIC-only")
|
|
}
|
|
if len(endpoint.Metadata) > 0 && !json.Valid(endpoint.Metadata) {
|
|
return fmt.Errorf("fabric registry gossip endpoint metadata is invalid")
|
|
}
|
|
}
|
|
if len(record.Metadata) > 0 && !json.Valid(record.Metadata) {
|
|
return fmt.Errorf("fabric registry gossip metadata is invalid")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func canonicalFabricRegistryPayload(record FabricRegistryGossipRecord) ([]byte, error) {
|
|
record = normalizeFabricRegistryRecord(record)
|
|
record.Signatures = nil
|
|
payload, err := json.Marshal(record)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var compact bytes.Buffer
|
|
if err := json.Compact(&compact, payload); err != nil {
|
|
return nil, err
|
|
}
|
|
return compact.Bytes(), nil
|
|
}
|
|
|
|
func normalizeFabricRegistryRecord(record FabricRegistryGossipRecord) FabricRegistryGossipRecord {
|
|
record.SchemaVersion = strings.TrimSpace(record.SchemaVersion)
|
|
record.ClusterID = strings.TrimSpace(record.ClusterID)
|
|
record.Service = strings.ToLower(strings.TrimSpace(record.Service))
|
|
record.Scope = strings.ToLower(strings.TrimSpace(record.Scope))
|
|
record.OrganizationID = strings.TrimSpace(record.OrganizationID)
|
|
record.IssuerNodeID = strings.TrimSpace(record.IssuerNodeID)
|
|
record.IssuerRole = strings.TrimSpace(record.IssuerRole)
|
|
record.Generation = strings.TrimSpace(record.Generation)
|
|
for i := range record.Endpoints {
|
|
record.Endpoints[i].EndpointID = strings.TrimSpace(record.Endpoints[i].EndpointID)
|
|
record.Endpoints[i].Address = strings.TrimSpace(record.Endpoints[i].Address)
|
|
record.Endpoints[i].Transport = strings.TrimSpace(record.Endpoints[i].Transport)
|
|
record.Endpoints[i].Reachability = strings.TrimSpace(record.Endpoints[i].Reachability)
|
|
record.Endpoints[i].ConnectivityMode = strings.TrimSpace(record.Endpoints[i].ConnectivityMode)
|
|
record.Endpoints[i].Region = strings.TrimSpace(record.Endpoints[i].Region)
|
|
record.Endpoints[i].PeerCertSHA256 = normalizeCertSHA256(record.Endpoints[i].PeerCertSHA256)
|
|
}
|
|
sort.SliceStable(record.Endpoints, func(i, j int) bool {
|
|
if record.Endpoints[i].Priority != record.Endpoints[j].Priority {
|
|
return record.Endpoints[i].Priority < record.Endpoints[j].Priority
|
|
}
|
|
return record.Endpoints[i].EndpointID < record.Endpoints[j].EndpointID
|
|
})
|
|
sort.SliceStable(record.Signatures, func(i, j int) bool {
|
|
if record.Signatures[i].IssuerID != record.Signatures[j].IssuerID {
|
|
return record.Signatures[i].IssuerID < record.Signatures[j].IssuerID
|
|
}
|
|
return record.Signatures[i].KeyID < record.Signatures[j].KeyID
|
|
})
|
|
return record
|
|
}
|
|
|
|
func fabricRegistryIssuerAllowed(issuer FabricRegistryTrustedIssuer, record FabricRegistryGossipRecord) bool {
|
|
if strings.TrimSpace(issuer.Role) != "" && issuer.Role != record.IssuerRole {
|
|
return false
|
|
}
|
|
if len(issuer.Scopes) > 0 && !stringInSlice(record.Scope, issuer.Scopes) {
|
|
return false
|
|
}
|
|
if len(issuer.Services) > 0 && !stringInSlice(record.Service, issuer.Services) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func fabricRegistryRecordKey(record FabricRegistryGossipRecord) string {
|
|
return fabricRegistryKey(record.ClusterID, record.Service, record.Scope, record.OrganizationID)
|
|
}
|
|
|
|
func fabricRegistryScopeResolutionOrder(scope string, organizationID string) []string {
|
|
scope = strings.ToLower(strings.TrimSpace(scope))
|
|
switch scope {
|
|
case FabricRegistryScopeOrganization:
|
|
if strings.TrimSpace(organizationID) != "" {
|
|
return []string{FabricRegistryScopeOrganization, FabricRegistryScopeCluster, FabricRegistryScopeFarm}
|
|
}
|
|
return []string{FabricRegistryScopeCluster, FabricRegistryScopeFarm}
|
|
case FabricRegistryScopeFarm:
|
|
return []string{FabricRegistryScopeFarm}
|
|
case FabricRegistryScopeCluster, "":
|
|
return []string{FabricRegistryScopeCluster, FabricRegistryScopeFarm}
|
|
default:
|
|
return []string{scope, FabricRegistryScopeCluster, FabricRegistryScopeFarm}
|
|
}
|
|
}
|
|
|
|
func selectFabricRegistryEndpoints(endpoints []FabricRegistryEndpoint, preferredRegion string) []FabricRegistryEndpoint {
|
|
preferredRegion = strings.TrimSpace(preferredRegion)
|
|
out := make([]FabricRegistryEndpoint, 0, len(endpoints))
|
|
for _, endpoint := range endpoints {
|
|
if strings.TrimSpace(endpoint.Address) == "" || !isQUICOnlyCandidateTransport(endpoint.Transport) || hasUnsupportedEndpointScheme(endpoint.Address) {
|
|
continue
|
|
}
|
|
out = append(out, endpoint)
|
|
}
|
|
sort.SliceStable(out, func(i, j int) bool {
|
|
if preferredRegion != "" {
|
|
iMatch := strings.EqualFold(out[i].Region, preferredRegion)
|
|
jMatch := strings.EqualFold(out[j].Region, preferredRegion)
|
|
if iMatch != jMatch {
|
|
return iMatch
|
|
}
|
|
}
|
|
if out[i].Priority != out[j].Priority {
|
|
return out[i].Priority < out[j].Priority
|
|
}
|
|
if out[i].Weight != out[j].Weight {
|
|
return out[i].Weight > out[j].Weight
|
|
}
|
|
return out[i].EndpointID < out[j].EndpointID
|
|
})
|
|
return out
|
|
}
|
|
|
|
func probeFabricRegistryEndpoint(ctx context.Context, transport FabricTransport, endpoint FabricRegistryEndpoint, timeout time.Duration) (time.Duration, error) {
|
|
if transport == nil {
|
|
return 0, fmt.Errorf("fabric registry live probe transport is unavailable")
|
|
}
|
|
if timeout <= 0 {
|
|
timeout = 2 * time.Second
|
|
}
|
|
target := FabricTransportTargetFromRegistryEndpoint(endpoint)
|
|
target.Timeout = timeout
|
|
target.InboundBuffer = 2
|
|
target.ErrorBuffer = 2
|
|
startedAt := time.Now()
|
|
session, err := transport.Connect(ctx, target)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer session.Close()
|
|
sequence := uint64(startedAt.UnixNano())
|
|
if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FramePing, TrafficClass: fabricproto.TrafficClassReliable, Sequence: sequence, Payload: []byte("fabric-registry-live-probe")}); err != nil {
|
|
return 0, err
|
|
}
|
|
for {
|
|
select {
|
|
case frame, ok := <-session.Frames():
|
|
if !ok {
|
|
return 0, fmt.Errorf("fabric registry live probe session closed")
|
|
}
|
|
if frame.Type == fabricproto.FramePong && frame.Sequence == sequence {
|
|
return time.Since(startedAt), nil
|
|
}
|
|
case err, ok := <-session.Errors():
|
|
if !ok {
|
|
return 0, fmt.Errorf("fabric registry live probe error channel closed")
|
|
}
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
case <-ctx.Done():
|
|
return 0, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func fabricRegistryKey(clusterID, service, scope, organizationID string) string {
|
|
return strings.TrimSpace(clusterID) + "\x00" + strings.ToLower(strings.TrimSpace(service)) + "\x00" + strings.ToLower(strings.TrimSpace(scope)) + "\x00" + strings.TrimSpace(organizationID)
|
|
}
|
|
|
|
func fabricRegistryRecordNewer(next, current FabricRegistryGossipRecord, now time.Time) bool {
|
|
if !current.ExpiresAt.After(now) {
|
|
return true
|
|
}
|
|
if next.Epoch != current.Epoch {
|
|
return next.Epoch > current.Epoch
|
|
}
|
|
if !next.IssuedAt.Equal(current.IssuedAt) {
|
|
return next.IssuedAt.After(current.IssuedAt)
|
|
}
|
|
return strings.TrimSpace(next.Generation) > strings.TrimSpace(current.Generation)
|
|
}
|
|
|
|
func registryNow(now time.Time) time.Time {
|
|
if now.IsZero() {
|
|
return time.Now().UTC()
|
|
}
|
|
return now.UTC()
|
|
}
|
|
|
|
func stringInSlice(value string, values []string) bool {
|
|
value = strings.TrimSpace(value)
|
|
for _, candidate := range values {
|
|
if strings.TrimSpace(candidate) == value {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|