Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

743 lines
26 KiB
Go

package mesh
import (
"bytes"
"context"
"crypto/ed25519"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
)
// Well-known string values used by fabric registry gossip records: the schema
// identifier, record scopes, service names, and authority role names.
const (
// FabricRegistryGossipRecordSchema is the only schema_version value that
// record validation accepts.
FabricRegistryGossipRecordSchema = "rap.fabric.registry.gossip_record.v1"
// Record scopes. Service resolution walks from the requested scope toward
// the farm scope (see fabricRegistryScopeResolutionOrder).
FabricRegistryScopeFarm = "farm"
FabricRegistryScopeCluster = "cluster"
FabricRegistryScopeOrganization = "organization"
// Service names. They are not referenced elsewhere in this part of the file;
// presumably they are the values callers place in a record's service field.
FabricRegistryServiceControlAPI = "control-api"
FabricRegistryServiceUpdateStore = "update-store"
FabricRegistryServiceUpdateCache = "update-cache"
FabricRegistryServiceWebAdmin = "web-admin"
FabricRegistryServiceVPNExitPool = "vpn-egress-pool"
// Authority role names. They are not referenced elsewhere in this part of
// the file; presumably they are issuer role values -- confirm against callers.
FabricRegistryAuthorityControl = "control-authority"
FabricRegistryAuthorityUpdate = "update-authority"
FabricRegistryAuthorityStorage = "storage-authority"
FabricRegistryAuthorityRoute = "route-authority"
)
// FabricRegistryEndpoint is one endpoint advertised inside a gossip record.
// Validation requires EndpointID, Address, and Transport to be non-blank and
// the transport/address pair to pass the QUIC-only checks.
type FabricRegistryEndpoint struct {
EndpointID string `json:"endpoint_id"`
Address string `json:"address"`
Transport string `json:"transport"`
Reachability string `json:"reachability,omitempty"`
ConnectivityMode string `json:"connectivity_mode,omitempty"`
// Region is compared case-insensitively with the caller's preferred region
// when endpoints are ordered for selection.
Region string `json:"region,omitempty"`
// Priority sorts ascending (lower value first); Weight breaks priority ties
// in descending order (higher value first).
Priority int `json:"priority,omitempty"`
Weight int `json:"weight,omitempty"`
// PeerCertSHA256 is passed through normalizeCertSHA256 during record
// normalization.
PeerCertSHA256 string `json:"peer_cert_sha256,omitempty"`
LastVerifiedAt *time.Time `json:"last_verified_at,omitempty"`
// Metadata, when present, must be valid JSON or the record is rejected.
Metadata json.RawMessage `json:"metadata,omitempty"`
}
// FabricRegistrySignature is one detached signature attached to a gossip
// record. Signatures are excluded from the canonical payload that is signed.
type FabricRegistrySignature struct {
KeyID string `json:"key_id"`
// IssuerID and Role select the trusted issuer whose public key is used to
// check the signature.
IssuerID string `json:"issuer_id"`
Role string `json:"role"`
// Alg must be "ed25519" (case-insensitive); other algorithms are skipped
// during verification.
Alg string `json:"alg"`
// Value is the hex-encoded signature bytes.
Value string `json:"value"`
}
// FabricRegistryGossipRecord is a signed announcement of the endpoints that
// serve one service within a cluster, scope, and (optionally) organization.
// Those four fields together form the registry key of the record.
type FabricRegistryGossipRecord struct {
// SchemaVersion must equal FabricRegistryGossipRecordSchema.
SchemaVersion string `json:"schema_version"`
ClusterID string `json:"cluster_id"`
// Service and Scope are lower-cased during normalization.
Service string `json:"service"`
Scope string `json:"scope"`
OrganizationID string `json:"organization_id,omitempty"`
// Epoch must be positive. Epoch, then IssuedAt, then Generation decide which
// of two records for the same key is newer.
Epoch int64 `json:"epoch"`
Generation string `json:"generation,omitempty"`
// IssuedAt and ExpiresAt bound the validity window; ExpiresAt must be after
// IssuedAt.
IssuedAt time.Time `json:"issued_at"`
ExpiresAt time.Time `json:"expires_at"`
IssuerNodeID string `json:"issuer_node_id"`
IssuerRole string `json:"issuer_role"`
// Endpoints must contain at least one entry for the record to validate.
Endpoints []FabricRegistryEndpoint `json:"endpoints"`
// Metadata, when present, must be valid JSON.
Metadata json.RawMessage `json:"metadata,omitempty"`
Signatures []FabricRegistrySignature `json:"signatures,omitempty"`
}
// FabricRegistryTrustedIssuer describes a signer whose signatures may be
// accepted on gossip records.
type FabricRegistryTrustedIssuer struct {
IssuerID string
// Role, when non-blank, must equal the record's issuer role.
Role string
// PublicKey must be exactly ed25519.PublicKeySize bytes to be used.
PublicKey ed25519.PublicKey
// Scopes and Services restrict which records the issuer may sign; an empty
// list places no restriction on that dimension.
Scopes []string
Services []string
}
// FabricRegistryVerificationPolicy carries the inputs used to validate and
// verify a gossip record.
type FabricRegistryVerificationPolicy struct {
// LocalClusterID, when non-blank, must match the record's cluster ID.
LocalClusterID string
TrustedIssuers []FabricRegistryTrustedIssuer
// RequiredSignatures is the number of distinct accepted issuers needed;
// values <= 0 are treated as 1.
RequiredSignatures int
// MaxClockSkew is how far in the future IssuedAt may lie; values <= 0 are
// treated as one minute.
MaxClockSkew time.Duration
// Now is the evaluation time; the zero value means the current time.
Now time.Time
}
// FabricRegistryVerificationResult reports the outcome of signature
// verification for one record.
type FabricRegistryVerificationResult struct {
// AcceptedSignatureCount is the number of distinct accepted issuers.
AcceptedSignatureCount int `json:"accepted_signature_count"`
// AcceptedIssuers lists the accepted issuer IDs in sorted order.
AcceptedIssuers []string `json:"accepted_issuers,omitempty"`
// RecordHash is the hex SHA-256 of the record's canonical payload.
RecordHash string `json:"record_hash"`
}
// FabricRegistryEntryState is the lifecycle state of a registry entry.
type FabricRegistryEntryState string
// Entry states. Within this part of the file only the candidate and active
// states are ever assigned; expired and rejected are only tested for.
const (
FabricRegistryCandidate FabricRegistryEntryState = "candidate"
FabricRegistryActive FabricRegistryEntryState = "active"
FabricRegistryExpired FabricRegistryEntryState = "expired"
FabricRegistryRejected FabricRegistryEntryState = "rejected"
)
// FabricRegistryEntry is a verified gossip record together with its registry
// bookkeeping.
type FabricRegistryEntry struct {
// Record is stored in normalized form.
Record FabricRegistryGossipRecord `json:"record"`
State FabricRegistryEntryState `json:"state"`
// AcceptedAt is the policy time at which the record was applied.
AcceptedAt time.Time `json:"accepted_at"`
// PromotedAt is set when the entry becomes active and is nil for candidates.
PromotedAt *time.Time `json:"promoted_at,omitempty"`
VerifyResult FabricRegistryVerificationResult `json:"verify_result"`
}
// FabricRegistryBootstrapReport summarises a bootstrap load. Records that
// verify but do not change the registry are counted in Total only.
type FabricRegistryBootstrapReport struct {
// Total is the number of records decoded from the input.
Total int `json:"total"`
Active int `json:"active"`
Candidate int `json:"candidate"`
Rejected int `json:"rejected"`
// Rejects holds the error text of each rejected record.
Rejects []string `json:"rejects,omitempty"`
// RecordKeys holds the registry key of every record that was stored.
RecordKeys []string `json:"record_keys,omitempty"`
}
// FabricRegistryResolveRequest selects the service to resolve and how the
// resulting endpoints should be ordered.
type FabricRegistryResolveRequest struct {
ClusterID string
// Service is required; it is trimmed and lower-cased before lookup.
Service string
// Scope chooses where resolution starts; blank behaves like the cluster scope.
Scope string
// OrganizationID is only used for lookups in the organization scope.
OrganizationID string
// PreferredRegion, when set, sorts endpoints of that region first.
PreferredRegion string
// Now is the evaluation time; the zero value means the current time.
Now time.Time
}
// FabricRegistryResolvedService is the outcome of a service resolution.
type FabricRegistryResolvedService struct {
Found bool `json:"found"`
Service string `json:"service"`
// Scope and OrganizationID identify the record that was consulted.
Scope string `json:"scope,omitempty"`
OrganizationID string `json:"organization_id,omitempty"`
RecordEpoch int64 `json:"record_epoch,omitempty"`
// RecordHash is the hex SHA-256 of the record's canonical payload.
RecordHash string `json:"record_hash,omitempty"`
// Endpoints are the usable endpoints in selection order.
Endpoints []FabricRegistryEndpoint `json:"endpoints,omitempty"`
// Reason explains a miss ("service_required", "no_active_record",
// "no_usable_endpoints") or flags a hit served from a candidate record.
Reason string `json:"reason,omitempty"`
}
// FabricRegistryLiveProbeRequest configures a live-verification pass over the
// candidate records.
type FabricRegistryLiveProbeRequest struct {
// ClusterID, when non-blank, limits probing to candidates of that cluster.
ClusterID string
PreferredRegion string
// Timeout bounds each endpoint probe; values <= 0 are treated as 2 seconds.
Timeout time.Duration
// Now is the evaluation time; the zero value means the current time.
Now time.Time
// MaxCandidates caps how many candidates are probed; values <= 0 are
// treated as 16.
MaxCandidates int
}
// FabricRegistryLiveProbeResult reports the probe outcome for one candidate
// record.
type FabricRegistryLiveProbeResult struct {
Service string `json:"service"`
Scope string `json:"scope"`
OrganizationID string `json:"organization_id,omitempty"`
// EndpointID and Address describe the last endpoint that was probed.
EndpointID string `json:"endpoint_id,omitempty"`
Address string `json:"address,omitempty"`
// Status is "reachable" or "unreachable".
Status string `json:"status"`
LatencyMs int64 `json:"latency_ms,omitempty"`
// Promoted reports whether the candidate was moved to the active set.
Promoted bool `json:"promoted"`
Error string `json:"error,omitempty"`
}
// FabricRegistrySnapshot counts the unexpired active and candidate entries and
// lists their registry keys in sorted order.
type FabricRegistrySnapshot struct {
Active int `json:"active"`
Candidate int `json:"candidate"`
ActiveKeys []string `json:"active_keys,omitempty"`
CandidateKeys []string `json:"candidate_keys,omitempty"`
}
// FabricRegistry stores verified gossip records keyed by cluster, service,
// scope, and organization.
// NOTE(review): the type carries no lock; callers presumably serialise
// access -- confirm.
type FabricRegistry struct {
// entries holds records that have been promoted to active.
entries map[string]FabricRegistryEntry
// candidates holds verified records that still await live verification.
candidates map[string]FabricRegistryEntry
}
// NewFabricRegistry returns an empty registry with both of its internal maps
// allocated.
func NewFabricRegistry() *FabricRegistry {
	registry := new(FabricRegistry)
	registry.entries = make(map[string]FabricRegistryEntry)
	registry.candidates = make(map[string]FabricRegistryEntry)
	return registry
}
// LoadFabricRegistryBootstrapRecords builds a fresh registry from a JSON array
// of gossip records.
//
// Blank input yields an empty registry and an empty report. Input that does
// not decode yields a nil registry and a wrapped error. Each decoded record is
// applied with the given policy and liveVerified flag: rejected records are
// tallied with their error text, records that leave the registry unchanged are
// only reflected in Total, and stored records contribute their key plus an
// Active or Candidate count.
func LoadFabricRegistryBootstrapRecords(recordsJSON string, policy FabricRegistryVerificationPolicy, liveVerified bool) (*FabricRegistry, FabricRegistryBootstrapReport, error) {
	registry := NewFabricRegistry()
	var report FabricRegistryBootstrapReport

	trimmed := strings.TrimSpace(recordsJSON)
	if len(trimmed) == 0 {
		return registry, report, nil
	}

	var decoded []FabricRegistryGossipRecord
	if decodeErr := json.Unmarshal([]byte(trimmed), &decoded); decodeErr != nil {
		return nil, report, fmt.Errorf("decode fabric registry bootstrap records: %w", decodeErr)
	}

	report.Total = len(decoded)
	for i := range decoded {
		item := decoded[i]
		entry, stored, applyErr := registry.ApplyGossipRecord(item, policy, liveVerified)
		if applyErr != nil {
			report.Rejected++
			report.Rejects = append(report.Rejects, applyErr.Error())
			continue
		}
		if !stored {
			continue
		}
		report.RecordKeys = append(report.RecordKeys, fabricRegistryRecordKey(item))
		if entry.State == FabricRegistryActive {
			report.Active++
		} else if entry.State == FabricRegistryCandidate {
			report.Candidate++
		}
	}
	return registry, report, nil
}
// ApplyGossipRecord verifies record against policy and stores it when it
// supersedes what the registry already holds for the same key.
//
// It returns the entry now associated with the key, whether the registry
// changed, and an error when the receiver is nil or verification fails.
//
// A record that is not newer than the current active entry is ignored and the
// active entry is returned. A record that is not live-verified is additionally
// compared with the pending candidate for the key, so that an older (but still
// validly signed) record cannot roll back a newer candidate. With liveVerified
// set the record becomes active immediately and any pending candidate for the
// key is dropped; otherwise it is stored as a candidate.
func (r *FabricRegistry) ApplyGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy, liveVerified bool) (FabricRegistryEntry, bool, error) {
	if r == nil {
		return FabricRegistryEntry{}, false, fmt.Errorf("fabric registry is nil")
	}
	result, err := VerifyFabricRegistryGossipRecord(record, policy)
	if err != nil {
		return FabricRegistryEntry{}, false, err
	}

	now := registryNow(policy.Now)
	key := fabricRegistryRecordKey(record)

	if current, exists := r.entries[key]; exists && !fabricRegistryRecordNewer(record, current.Record, now) {
		return current, false, nil
	}
	if !liveVerified {
		// Reads from a nil map are safe, so no allocation is needed yet.
		if pending, exists := r.candidates[key]; exists && !fabricRegistryRecordNewer(record, pending.Record, now) {
			return pending, false, nil
		}
	}

	entry := FabricRegistryEntry{
		Record:       normalizeFabricRegistryRecord(record),
		State:        FabricRegistryCandidate,
		AcceptedAt:   now,
		VerifyResult: result,
	}

	if liveVerified {
		promotedAt := now
		entry.State = FabricRegistryActive
		entry.PromotedAt = &promotedAt
		// A zero-value registry has nil maps; allocate before writing.
		if r.entries == nil {
			r.entries = map[string]FabricRegistryEntry{}
		}
		r.entries[key] = entry
		delete(r.candidates, key)
		return entry, true, nil
	}

	if r.candidates == nil {
		r.candidates = map[string]FabricRegistryEntry{}
	}
	r.candidates[key] = entry
	return entry, true, nil
}
// MarkLiveVerified promotes the pending candidate for the given key to the
// active set and reports whether a promotion happened.
//
// It returns false when the receiver is nil, when no candidate exists for the
// key, when the candidate is in the expired or rejected state, or when the
// candidate's record has already expired at the evaluation time (a zero now
// means the current time). Refusing expired candidates keeps a stale record
// from displacing an active entry that is still valid.
func (r *FabricRegistry) MarkLiveVerified(clusterID, service, scope, organizationID string, now time.Time) bool {
	if r == nil {
		return false
	}
	key := fabricRegistryKey(clusterID, service, scope, organizationID)
	entry, ok := r.candidates[key]
	if !ok || entry.State == FabricRegistryExpired || entry.State == FabricRegistryRejected {
		return false
	}
	promotedAt := registryNow(now)
	if !entry.Record.ExpiresAt.After(promotedAt) {
		return false
	}
	entry.State = FabricRegistryActive
	entry.PromotedAt = &promotedAt
	// A zero-value registry has a nil entries map; allocate before writing.
	if r.entries == nil {
		r.entries = map[string]FabricRegistryEntry{}
	}
	r.entries[key] = entry
	delete(r.candidates, key)
	return true
}
// Active returns the active record stored for the given key, provided it has
// not expired at the evaluation time (a zero now means the current time).
// The boolean is false for a nil receiver, a missing entry, a non-active
// entry, or an expired record.
func (r *FabricRegistry) Active(clusterID, service, scope, organizationID string, now time.Time) (FabricRegistryGossipRecord, bool) {
	var none FabricRegistryGossipRecord
	if r == nil {
		return none, false
	}
	key := fabricRegistryKey(clusterID, service, scope, organizationID)
	entry, found := r.entries[key]
	if !found {
		return none, false
	}
	if entry.State != FabricRegistryActive {
		return none, false
	}
	if deadline := entry.Record.ExpiresAt; !deadline.After(registryNow(now)) {
		return none, false
	}
	return entry.Record, true
}
// ResolveService looks up the endpoints for the requested service.
//
// The service name is trimmed and lower-cased; a blank name yields the
// "service_required" reason. Active records are consulted first and candidate
// records second, each along the scope resolution order. The first pass that
// either finds a record or reports "no_usable_endpoints" decides the result;
// if neither pass does, the reason is "no_active_record".
func (r *FabricRegistry) ResolveService(req FabricRegistryResolveRequest) FabricRegistryResolvedService {
	service := strings.ToLower(strings.TrimSpace(req.Service))
	if len(service) == 0 {
		return FabricRegistryResolvedService{Found: false, Reason: "service_required"}
	}

	scopeOrder := fabricRegistryScopeResolutionOrder(req.Scope, req.OrganizationID)
	for _, candidateOnly := range [...]bool{false, true} {
		resolved := r.resolveServiceFromRecords(req, service, scopeOrder, candidateOnly)
		if resolved.Found {
			return resolved
		}
		if resolved.Reason == "no_usable_endpoints" {
			return resolved
		}
	}
	return FabricRegistryResolvedService{Found: false, Service: service, Reason: "no_active_record"}
}
// resolveServiceFromRecords walks scopeOrder and resolves the service from the
// first scope that has an unexpired record, reading from the candidate set
// when candidateOnly is true and from the active set otherwise.
//
// The organization ID from the request is only used for the organization
// scope. If the first matching record has no usable endpoints the walk stops
// with the "no_usable_endpoints" reason. If no scope has a record the result
// has Found false and no reason.
//
// RecordHash is the hex SHA-256 of the record's canonical payload. Should the
// payload fail to encode, the hash is left empty rather than reporting the
// digest of empty input as if it identified the record.
func (r *FabricRegistry) resolveServiceFromRecords(req FabricRegistryResolveRequest, service string, scopeOrder []string, candidateOnly bool) FabricRegistryResolvedService {
	requestedOrganization := strings.TrimSpace(req.OrganizationID)
	for _, scope := range scopeOrder {
		organizationID := ""
		if scope == FabricRegistryScopeOrganization {
			organizationID = requestedOrganization
		}

		var record FabricRegistryGossipRecord
		var ok bool
		if candidateOnly {
			record, ok = r.Candidate(req.ClusterID, service, scope, organizationID, req.Now)
		} else {
			record, ok = r.Active(req.ClusterID, service, scope, organizationID, req.Now)
		}
		if !ok {
			continue
		}

		endpoints := selectFabricRegistryEndpoints(record.Endpoints, req.PreferredRegion)
		if len(endpoints) == 0 {
			return FabricRegistryResolvedService{Found: false, Service: service, Scope: scope, OrganizationID: organizationID, Reason: "no_usable_endpoints"}
		}

		resolved := FabricRegistryResolvedService{
			Found:          true,
			Service:        service,
			Scope:          scope,
			OrganizationID: organizationID,
			RecordEpoch:    record.Epoch,
			Endpoints:      endpoints,
			Reason:         fabricRegistryResolveReason(candidateOnly),
		}
		if payload, err := canonicalFabricRegistryPayload(record); err == nil {
			sum := sha256.Sum256(payload)
			resolved.RecordHash = hex.EncodeToString(sum[:])
		}
		return resolved
	}
	return FabricRegistryResolvedService{Found: false, Service: service}
}
// Candidate returns the pending candidate record stored for the given key,
// provided it has not expired at the evaluation time (a zero now means the
// current time). The boolean is false for a nil receiver, a missing entry, an
// entry that is not in the candidate state, or an expired record.
func (r *FabricRegistry) Candidate(clusterID, service, scope, organizationID string, now time.Time) (FabricRegistryGossipRecord, bool) {
	if r != nil {
		pending, found := r.candidates[fabricRegistryKey(clusterID, service, scope, organizationID)]
		if found && pending.State == FabricRegistryCandidate && pending.Record.ExpiresAt.After(registryNow(now)) {
			return pending.Record, true
		}
	}
	return FabricRegistryGossipRecord{}, false
}
// fabricRegistryResolveReason returns the reason string attached to a
// successful resolution: a pending-verification notice when the result came
// from a candidate record, and the empty string otherwise.
func fabricRegistryResolveReason(candidateOnly bool) string {
	reason := ""
	if candidateOnly {
		reason = "candidate_record_pending_live_verification"
	}
	return reason
}
// Snapshot counts the active and candidate entries whose records are still
// unexpired at the evaluation time (a zero now means the current time) and
// returns their registry keys in sorted order. A nil receiver yields an empty
// snapshot.
func (r *FabricRegistry) Snapshot(now time.Time) FabricRegistrySnapshot {
	var snapshot FabricRegistrySnapshot
	if r == nil {
		return snapshot
	}
	at := registryNow(now)

	// collect returns the keys of entries in the wanted state that are unexpired.
	collect := func(source map[string]FabricRegistryEntry, want FabricRegistryEntryState) []string {
		var keys []string
		for key, entry := range source {
			if entry.State != want || !entry.Record.ExpiresAt.After(at) {
				continue
			}
			keys = append(keys, key)
		}
		sort.Strings(keys)
		return keys
	}

	snapshot.ActiveKeys = collect(r.entries, FabricRegistryActive)
	snapshot.CandidateKeys = collect(r.candidates, FabricRegistryCandidate)
	snapshot.Active = len(snapshot.ActiveKeys)
	snapshot.Candidate = len(snapshot.CandidateKeys)
	return snapshot
}
// VerifyCandidates probes the endpoints of pending candidate records and
// promotes each candidate whose probe succeeds.
//
// Only unexpired candidates are considered, optionally limited to
// req.ClusterID. They are ordered by service, scope, and descending epoch,
// with organization ID and cluster ID as final tie-breakers so that the
// selection is deterministic when the list is cut to req.MaxCandidates
// (default 16). For every selected candidate the usable endpoints are probed
// in selection order, each with its own req.Timeout (default 2 seconds), until
// one answers. One result per candidate is returned; it names the last
// endpoint probed and, on success, the latency and whether promotion happened.
// A nil receiver returns nil.
func (r *FabricRegistry) VerifyCandidates(ctx context.Context, transport FabricTransport, req FabricRegistryLiveProbeRequest) []FabricRegistryLiveProbeResult {
	if r == nil {
		return nil
	}
	now := registryNow(req.Now)
	timeout := req.Timeout
	if timeout <= 0 {
		timeout = 2 * time.Second
	}
	maxCandidates := req.MaxCandidates
	if maxCandidates <= 0 {
		maxCandidates = 16
	}
	clusterID := strings.TrimSpace(req.ClusterID)

	candidates := make([]FabricRegistryEntry, 0, len(r.candidates))
	for _, entry := range r.candidates {
		if entry.State != FabricRegistryCandidate || !entry.Record.ExpiresAt.After(now) {
			continue
		}
		if clusterID != "" && entry.Record.ClusterID != clusterID {
			continue
		}
		candidates = append(candidates, entry)
	}

	// Map iteration order is random, so the comparator must be a total order
	// for the truncation below to pick the same candidates every time.
	sort.SliceStable(candidates, func(i, j int) bool {
		a, b := &candidates[i].Record, &candidates[j].Record
		switch {
		case a.Service != b.Service:
			return a.Service < b.Service
		case a.Scope != b.Scope:
			return a.Scope < b.Scope
		case a.Epoch != b.Epoch:
			return a.Epoch > b.Epoch
		case a.OrganizationID != b.OrganizationID:
			return a.OrganizationID < b.OrganizationID
		default:
			return a.ClusterID < b.ClusterID
		}
	})
	if len(candidates) > maxCandidates {
		candidates = candidates[:maxCandidates]
	}

	results := make([]FabricRegistryLiveProbeResult, 0, len(candidates))
	for _, entry := range candidates {
		record := entry.Record
		result := FabricRegistryLiveProbeResult{
			Service:        record.Service,
			Scope:          record.Scope,
			OrganizationID: record.OrganizationID,
			Status:         "unreachable",
		}
		endpoints := selectFabricRegistryEndpoints(record.Endpoints, req.PreferredRegion)
		if len(endpoints) == 0 {
			result.Error = "no_usable_endpoints"
			results = append(results, result)
			continue
		}
		for _, endpoint := range endpoints {
			probeCtx, cancel := context.WithTimeout(ctx, timeout)
			latency, err := probeFabricRegistryEndpoint(probeCtx, transport, endpoint, timeout)
			// Release the timer right away; a defer would pile up across iterations.
			cancel()
			result.EndpointID = endpoint.EndpointID
			result.Address = endpoint.Address
			if err != nil {
				result.Error = err.Error()
				continue
			}
			result.Status = "reachable"
			result.LatencyMs = latency.Milliseconds()
			result.Promoted = r.MarkLiveVerified(record.ClusterID, record.Service, record.Scope, record.OrganizationID, now)
			result.Error = ""
			break
		}
		results = append(results, result)
	}
	return results
}
// SignFabricRegistryGossipRecord signs the canonical payload of record with
// privateKey and returns a copy of the record with the new signature appended.
//
// The signature's key ID and issuer ID come from issuer.IssuerID, falling back
// to the record's issuer node ID; its role comes from issuer.Role, falling
// back to the record's issuer role. An error is returned, together with the
// unmodified record, when privateKey does not have the Ed25519 private key
// length (ed25519.Sign would otherwise panic) or when the payload cannot be
// encoded. The caller's Signatures slice is never written to.
func SignFabricRegistryGossipRecord(record FabricRegistryGossipRecord, issuer FabricRegistryTrustedIssuer, privateKey ed25519.PrivateKey) (FabricRegistryGossipRecord, error) {
	if len(privateKey) != ed25519.PrivateKeySize {
		return record, fmt.Errorf("fabric registry signing key has invalid length %d", len(privateKey))
	}
	payload, err := canonicalFabricRegistryPayload(record)
	if err != nil {
		return record, err
	}
	sig := ed25519.Sign(privateKey, payload)

	issuerID := firstNonEmpty(issuer.IssuerID, record.IssuerNodeID)
	// Copy before appending so spare capacity in the caller's slice is not
	// overwritten.
	signatures := make([]FabricRegistrySignature, len(record.Signatures), len(record.Signatures)+1)
	copy(signatures, record.Signatures)
	record.Signatures = append(signatures, FabricRegistrySignature{
		KeyID:    issuerID,
		IssuerID: issuerID,
		Role:     firstNonEmpty(issuer.Role, record.IssuerRole),
		Alg:      "ed25519",
		Value:    hex.EncodeToString(sig),
	})
	return record, nil
}
// VerifyFabricRegistryGossipRecord validates record against policy and checks
// its Ed25519 signatures against the policy's trusted issuers.
//
// The record is normalized and validated first. Each signature is matched to a
// trusted issuer by issuer ID and role, falling back to issuer ID alone, and
// is counted only if the issuer is allowed to sign this record and the
// signature verifies over the canonical payload. Accepted signatures are
// de-duplicated by the trusted issuer's own ID, not by the ID text carried in
// the signature, so that one valid signature repeated under differently
// spelled issuer IDs cannot satisfy a multi-signature requirement.
//
// On success the result lists the distinct accepted issuers (sorted) and the
// hex SHA-256 of the canonical payload. When fewer than
// policy.RequiredSignatures (minimum 1) issuers are accepted, an error is
// returned together with a result carrying only the record hash.
func VerifyFabricRegistryGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy) (FabricRegistryVerificationResult, error) {
	record = normalizeFabricRegistryRecord(record)
	if err := validateFabricRegistryGossipRecord(record, policy); err != nil {
		return FabricRegistryVerificationResult{}, err
	}
	payload, err := canonicalFabricRegistryPayload(record)
	if err != nil {
		return FabricRegistryVerificationResult{}, err
	}
	sum := sha256.Sum256(payload)
	recordHash := hex.EncodeToString(sum[:])

	// Index issuers by trimmed ID and by trimmed ID plus role, matching the
	// trimmed keys used for lookups below.
	trusted := make(map[string]FabricRegistryTrustedIssuer, 2*len(policy.TrustedIssuers))
	for _, issuer := range policy.TrustedIssuers {
		id := strings.TrimSpace(issuer.IssuerID)
		if id == "" {
			continue
		}
		trusted[id] = issuer
		if role := strings.TrimSpace(issuer.Role); role != "" {
			trusted[id+"\x00"+role] = issuer
		}
	}

	accepted := map[string]struct{}{}
	for _, signature := range record.Signatures {
		if !strings.EqualFold(strings.TrimSpace(signature.Alg), "ed25519") {
			continue
		}
		signerID := strings.TrimSpace(signature.IssuerID)
		issuer, ok := trusted[signerID+"\x00"+strings.TrimSpace(signature.Role)]
		if !ok {
			issuer, ok = trusted[signerID]
		}
		if !ok || !fabricRegistryIssuerAllowed(issuer, record) {
			continue
		}
		rawSig, err := hex.DecodeString(strings.TrimSpace(signature.Value))
		if err != nil || len(rawSig) != ed25519.SignatureSize || len(issuer.PublicKey) != ed25519.PublicKeySize {
			continue
		}
		if ed25519.Verify(issuer.PublicKey, payload, rawSig) {
			// Key by the trusted issuer's identity: the ID inside the signature
			// is attacker-controlled and not covered by the signed payload.
			accepted[strings.TrimSpace(issuer.IssuerID)] = struct{}{}
		}
	}

	required := policy.RequiredSignatures
	if required <= 0 {
		required = 1
	}
	if len(accepted) < required {
		return FabricRegistryVerificationResult{RecordHash: recordHash}, fmt.Errorf("fabric registry gossip record lacks required trusted signatures")
	}

	issuers := make([]string, 0, len(accepted))
	for issuer := range accepted {
		issuers = append(issuers, issuer)
	}
	sort.Strings(issuers)
	return FabricRegistryVerificationResult{
		AcceptedSignatureCount: len(accepted),
		AcceptedIssuers:        issuers,
		RecordHash:             recordHash,
	}, nil
}
// validateFabricRegistryGossipRecord checks the structural and temporal
// validity of record and returns the first problem found, or nil.
//
// Checks, in order: schema version; cluster ID present and, when the policy
// names a local cluster, equal to it (both sides compared after trimming, so
// surrounding whitespace in the configured ID cannot reject every record);
// service, scope, and issuer fields present; positive epoch and a non-empty
// validity window; IssuedAt not further in the future than the allowed clock
// skew (default one minute) and ExpiresAt still ahead of the evaluation time;
// at least one endpoint, each with ID, address, and transport, passing the
// QUIC-only checks and carrying valid JSON metadata if any; valid JSON record
// metadata if any.
func validateFabricRegistryGossipRecord(record FabricRegistryGossipRecord, policy FabricRegistryVerificationPolicy) error {
	if record.SchemaVersion != FabricRegistryGossipRecordSchema {
		return fmt.Errorf("fabric registry gossip record schema_version is invalid")
	}
	clusterID := strings.TrimSpace(record.ClusterID)
	localClusterID := strings.TrimSpace(policy.LocalClusterID)
	if clusterID == "" || (localClusterID != "" && clusterID != localClusterID) {
		return ErrClusterMismatch
	}
	if strings.TrimSpace(record.Service) == "" || strings.TrimSpace(record.Scope) == "" || strings.TrimSpace(record.IssuerNodeID) == "" || strings.TrimSpace(record.IssuerRole) == "" {
		return fmt.Errorf("fabric registry gossip record is missing service, scope, or issuer")
	}
	if record.Epoch <= 0 || record.IssuedAt.IsZero() || record.ExpiresAt.IsZero() || !record.ExpiresAt.After(record.IssuedAt) {
		return fmt.Errorf("fabric registry gossip record has invalid epoch or validity window")
	}

	now := registryNow(policy.Now)
	skew := policy.MaxClockSkew
	if skew <= 0 {
		skew = time.Minute
	}
	if record.IssuedAt.After(now.Add(skew)) || !record.ExpiresAt.After(now) {
		return fmt.Errorf("fabric registry gossip record is not currently valid")
	}

	if len(record.Endpoints) == 0 {
		return fmt.Errorf("fabric registry gossip record has no endpoints")
	}
	for i := range record.Endpoints {
		endpoint := &record.Endpoints[i]
		if strings.TrimSpace(endpoint.EndpointID) == "" || strings.TrimSpace(endpoint.Address) == "" || strings.TrimSpace(endpoint.Transport) == "" {
			return fmt.Errorf("fabric registry gossip record contains invalid endpoint")
		}
		if !isQUICOnlyCandidateTransport(endpoint.Transport) || hasUnsupportedEndpointScheme(endpoint.Address) {
			return fmt.Errorf("fabric registry gossip endpoint must be QUIC-only")
		}
		if len(endpoint.Metadata) > 0 && !json.Valid(endpoint.Metadata) {
			return fmt.Errorf("fabric registry gossip endpoint metadata is invalid")
		}
	}
	if len(record.Metadata) > 0 && !json.Valid(record.Metadata) {
		return fmt.Errorf("fabric registry gossip metadata is invalid")
	}
	return nil
}
// canonicalFabricRegistryPayload returns the byte sequence that is signed and
// hashed for a record: the normalized record with its signatures removed,
// encoded as compact JSON. Encoding or compaction failures are returned as-is.
func canonicalFabricRegistryPayload(record FabricRegistryGossipRecord) ([]byte, error) {
	unsigned := normalizeFabricRegistryRecord(record)
	unsigned.Signatures = nil

	encoded, err := json.Marshal(unsigned)
	if err != nil {
		return nil, err
	}

	compacted := new(bytes.Buffer)
	if err = json.Compact(compacted, encoded); err != nil {
		return nil, err
	}
	return compacted.Bytes(), nil
}
// normalizeFabricRegistryRecord returns a normalized copy of record.
//
// String fields are trimmed, service and scope are lower-cased, endpoint peer
// certificate hashes go through normalizeCertSHA256, endpoints are sorted by
// priority then endpoint ID, and signatures by issuer ID then key ID.
//
// The endpoint and signature slices are copied before being edited and sorted,
// so the caller's slices (which share a backing array with the by-value
// argument) are left untouched. Nil slices stay nil and empty slices stay
// empty, which keeps the JSON encoding of the record unchanged.
func normalizeFabricRegistryRecord(record FabricRegistryGossipRecord) FabricRegistryGossipRecord {
	record.SchemaVersion = strings.TrimSpace(record.SchemaVersion)
	record.ClusterID = strings.TrimSpace(record.ClusterID)
	record.Service = strings.ToLower(strings.TrimSpace(record.Service))
	record.Scope = strings.ToLower(strings.TrimSpace(record.Scope))
	record.OrganizationID = strings.TrimSpace(record.OrganizationID)
	record.IssuerNodeID = strings.TrimSpace(record.IssuerNodeID)
	record.IssuerRole = strings.TrimSpace(record.IssuerRole)
	record.Generation = strings.TrimSpace(record.Generation)

	if record.Endpoints != nil {
		endpoints := make([]FabricRegistryEndpoint, len(record.Endpoints))
		copy(endpoints, record.Endpoints)
		record.Endpoints = endpoints
	}
	if record.Signatures != nil {
		signatures := make([]FabricRegistrySignature, len(record.Signatures))
		copy(signatures, record.Signatures)
		record.Signatures = signatures
	}

	for i := range record.Endpoints {
		endpoint := &record.Endpoints[i]
		endpoint.EndpointID = strings.TrimSpace(endpoint.EndpointID)
		endpoint.Address = strings.TrimSpace(endpoint.Address)
		endpoint.Transport = strings.TrimSpace(endpoint.Transport)
		endpoint.Reachability = strings.TrimSpace(endpoint.Reachability)
		endpoint.ConnectivityMode = strings.TrimSpace(endpoint.ConnectivityMode)
		endpoint.Region = strings.TrimSpace(endpoint.Region)
		endpoint.PeerCertSHA256 = normalizeCertSHA256(endpoint.PeerCertSHA256)
	}
	sort.SliceStable(record.Endpoints, func(i, j int) bool {
		if record.Endpoints[i].Priority != record.Endpoints[j].Priority {
			return record.Endpoints[i].Priority < record.Endpoints[j].Priority
		}
		return record.Endpoints[i].EndpointID < record.Endpoints[j].EndpointID
	})
	sort.SliceStable(record.Signatures, func(i, j int) bool {
		if record.Signatures[i].IssuerID != record.Signatures[j].IssuerID {
			return record.Signatures[i].IssuerID < record.Signatures[j].IssuerID
		}
		return record.Signatures[i].KeyID < record.Signatures[j].KeyID
	})
	return record
}
// fabricRegistryIssuerAllowed reports whether issuer may sign record. A
// non-blank issuer role must equal the record's issuer role, and non-empty
// scope and service lists must contain the record's scope and service.
func fabricRegistryIssuerAllowed(issuer FabricRegistryTrustedIssuer, record FabricRegistryGossipRecord) bool {
	switch {
	case strings.TrimSpace(issuer.Role) != "" && issuer.Role != record.IssuerRole:
		return false
	case len(issuer.Scopes) > 0 && !stringInSlice(record.Scope, issuer.Scopes):
		return false
	case len(issuer.Services) > 0 && !stringInSlice(record.Service, issuer.Services):
		return false
	default:
		return true
	}
}
// fabricRegistryRecordKey returns the registry key of a record, built from its
// cluster ID, service, scope, and organization ID.
func fabricRegistryRecordKey(record FabricRegistryGossipRecord) string {
	cluster, service := record.ClusterID, record.Service
	scope, organization := record.Scope, record.OrganizationID
	return fabricRegistryKey(cluster, service, scope, organization)
}
func fabricRegistryScopeResolutionOrder(scope string, organizationID string) []string {
scope = strings.ToLower(strings.TrimSpace(scope))
switch scope {
case FabricRegistryScopeOrganization:
if strings.TrimSpace(organizationID) != "" {
return []string{FabricRegistryScopeOrganization, FabricRegistryScopeCluster, FabricRegistryScopeFarm}
}
return []string{FabricRegistryScopeCluster, FabricRegistryScopeFarm}
case FabricRegistryScopeFarm:
return []string{FabricRegistryScopeFarm}
case FabricRegistryScopeCluster, "":
return []string{FabricRegistryScopeCluster, FabricRegistryScopeFarm}
default:
return []string{scope, FabricRegistryScopeCluster, FabricRegistryScopeFarm}
}
}
// selectFabricRegistryEndpoints filters endpoints down to the usable ones and
// orders them for connection attempts.
//
// An endpoint is usable when its address is non-blank, its transport passes
// the QUIC-only check, and its address has no unsupported scheme. Ordering:
// endpoints in the preferred region (case-insensitive, if one is given) first,
// then ascending priority, then descending weight, then ascending endpoint ID.
// The result is always a non-nil slice.
func selectFabricRegistryEndpoints(endpoints []FabricRegistryEndpoint, preferredRegion string) []FabricRegistryEndpoint {
	region := strings.TrimSpace(preferredRegion)

	usable := make([]FabricRegistryEndpoint, 0, len(endpoints))
	for _, candidate := range endpoints {
		if strings.TrimSpace(candidate.Address) == "" {
			continue
		}
		if !isQUICOnlyCandidateTransport(candidate.Transport) {
			continue
		}
		if hasUnsupportedEndpointScheme(candidate.Address) {
			continue
		}
		usable = append(usable, candidate)
	}

	sort.SliceStable(usable, func(i, j int) bool {
		left, right := usable[i], usable[j]
		if region != "" {
			leftLocal := strings.EqualFold(left.Region, region)
			rightLocal := strings.EqualFold(right.Region, region)
			if leftLocal != rightLocal {
				return leftLocal
			}
		}
		switch {
		case left.Priority != right.Priority:
			return left.Priority < right.Priority
		case left.Weight != right.Weight:
			return left.Weight > right.Weight
		default:
			return left.EndpointID < right.EndpointID
		}
	})
	return usable
}
// probeFabricRegistryEndpoint connects to endpoint over transport, sends one
// ping frame, and waits for the pong carrying the same sequence number.
//
// It returns the time elapsed since just before the connect call. It fails
// when transport is nil, when connecting or sending fails, when the session
// reports an error, when either session channel is closed, or when ctx ends
// first. A timeout <= 0 is replaced by 2 seconds for the transport target.
// Frames that are not the matching pong, and nil errors, are ignored.
func probeFabricRegistryEndpoint(ctx context.Context, transport FabricTransport, endpoint FabricRegistryEndpoint, timeout time.Duration) (time.Duration, error) {
	if transport == nil {
		return 0, fmt.Errorf("fabric registry live probe transport is unavailable")
	}
	effectiveTimeout := timeout
	if effectiveTimeout <= 0 {
		effectiveTimeout = 2 * time.Second
	}

	target := FabricTransportTargetFromRegistryEndpoint(endpoint)
	target.Timeout = effectiveTimeout
	target.InboundBuffer = 2
	target.ErrorBuffer = 2

	began := time.Now()
	session, err := transport.Connect(ctx, target)
	if err != nil {
		return 0, err
	}
	defer session.Close()

	// The start timestamp doubles as the sequence number matched against the pong.
	probeID := uint64(began.UnixNano())
	ping := fabricproto.Frame{Type: fabricproto.FramePing, TrafficClass: fabricproto.TrafficClassReliable, Sequence: probeID, Payload: []byte("fabric-registry-live-probe")}
	if sendErr := session.Send(ctx, ping); sendErr != nil {
		return 0, sendErr
	}

	for {
		select {
		case frame, open := <-session.Frames():
			if !open {
				return 0, fmt.Errorf("fabric registry live probe session closed")
			}
			if frame.Type != fabricproto.FramePong || frame.Sequence != probeID {
				continue
			}
			return time.Since(began), nil
		case sessionErr, open := <-session.Errors():
			if !open {
				return 0, fmt.Errorf("fabric registry live probe error channel closed")
			}
			if sessionErr == nil {
				continue
			}
			return 0, sessionErr
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}
}
// fabricRegistryKey builds the registry map key from the four identifying
// fields, joined by NUL bytes. Every part is trimmed; service and scope are
// also lower-cased.
func fabricRegistryKey(clusterID, service, scope, organizationID string) string {
	parts := [...]string{
		strings.TrimSpace(clusterID),
		strings.ToLower(strings.TrimSpace(service)),
		strings.ToLower(strings.TrimSpace(scope)),
		strings.TrimSpace(organizationID),
	}
	return strings.Join(parts[:], "\x00")
}
// fabricRegistryRecordNewer reports whether next should replace current.
//
// An expired current record is always replaced. Otherwise the higher epoch
// wins; with equal epochs the later IssuedAt wins; with equal IssuedAt the
// lexically greater trimmed Generation wins. Fully equal records are not
// considered newer.
func fabricRegistryRecordNewer(next, current FabricRegistryGossipRecord, now time.Time) bool {
	switch {
	case !current.ExpiresAt.After(now):
		return true
	case next.Epoch > current.Epoch:
		return true
	case next.Epoch < current.Epoch:
		return false
	case next.IssuedAt.After(current.IssuedAt):
		return true
	case !next.IssuedAt.Equal(current.IssuedAt):
		return false
	}
	nextGeneration := strings.TrimSpace(next.Generation)
	currentGeneration := strings.TrimSpace(current.Generation)
	return nextGeneration > currentGeneration
}
// registryNow returns now converted to UTC, substituting the current time when
// now is the zero value.
func registryNow(now time.Time) time.Time {
	effective := now
	if effective.IsZero() {
		effective = time.Now()
	}
	return effective.UTC()
}
// stringInSlice reports whether value equals any element of values, comparing
// both sides after trimming surrounding whitespace. The comparison is
// case-sensitive.
func stringInSlice(value string, values []string) bool {
	needle := strings.TrimSpace(value)
	for i := 0; i < len(values); i++ {
		if needle == strings.TrimSpace(values[i]) {
			return true
		}
	}
	return false
}