380 lines
12 KiB
Go
380 lines
12 KiB
Go
package mesh
|
|
|
|
import (
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// DefaultWarmPeerLimit is the number of peers NewPeerCache marks as warm when
// PeerCacheConfig.WarmPeerLimit is zero or negative.
const DefaultWarmPeerLimit = 8
|
|
|
|
type PeerCacheConfig struct {
|
|
Local PeerIdentity
|
|
PeerEndpoints map[string]string
|
|
PeerEndpointCandidates map[string][]PeerEndpointCandidate
|
|
PeerDirectory []PeerDirectoryEntry
|
|
RecoverySeeds []PeerRecoverySeed
|
|
RendezvousLeases []PeerRendezvousLease
|
|
Routes []SyntheticRoute
|
|
WarmPeerLimit int
|
|
PreferredRegion string
|
|
Now time.Time
|
|
}
|
|
|
|
type PeerCache struct {
|
|
snapshot PeerCacheSnapshot
|
|
}
|
|
|
|
type PeerCacheSnapshot struct {
|
|
ClusterID string `json:"cluster_id"`
|
|
LocalNodeID string `json:"local_node_id"`
|
|
PeerCount int `json:"peer_count"`
|
|
WarmPeerCount int `json:"warm_peer_count"`
|
|
RecoverySeedCount int `json:"recovery_seed_count"`
|
|
RendezvousLeaseCount int `json:"rendezvous_lease_count"`
|
|
BuiltAt time.Time `json:"built_at"`
|
|
Entries []PeerCacheEntry `json:"entries"`
|
|
}
|
|
|
|
type PeerCacheEntry struct {
|
|
NodeID string `json:"node_id"`
|
|
RouteIDs []string `json:"route_ids,omitempty"`
|
|
Endpoint string `json:"endpoint,omitempty"`
|
|
EndpointCount int `json:"endpoint_count"`
|
|
CandidateCount int `json:"candidate_count"`
|
|
ConnectivityModes []string `json:"connectivity_modes,omitempty"`
|
|
RecoverySeed bool `json:"recovery_seed"`
|
|
Warm bool `json:"warm"`
|
|
WarmReason string `json:"warm_reason,omitempty"`
|
|
BestCandidateID string `json:"best_candidate_id,omitempty"`
|
|
BestCandidateAddr string `json:"best_candidate_addr,omitempty"`
|
|
BestTransport string `json:"best_transport,omitempty"`
|
|
BestReachability string `json:"best_reachability,omitempty"`
|
|
BestConnectivity string `json:"best_connectivity,omitempty"`
|
|
BestNATType string `json:"best_nat_type,omitempty"`
|
|
BestPolicyTags []string `json:"best_policy_tags,omitempty"`
|
|
BestCandidateScore int `json:"best_candidate_score,omitempty"`
|
|
EndpointCandidates []PeerEndpointCandidate `json:"endpoint_candidates,omitempty"`
|
|
RendezvousLeaseID string `json:"rendezvous_lease_id,omitempty"`
|
|
RelayNodeID string `json:"relay_node_id,omitempty"`
|
|
RelayEndpoint string `json:"relay_endpoint,omitempty"`
|
|
RelayControl bool `json:"relay_control"`
|
|
}
|
|
|
|
type peerCacheBuildEntry struct {
|
|
PeerCacheEntry
|
|
adjacentRoutePeer bool
|
|
bestScore int
|
|
}
|
|
|
|
func NewPeerCache(cfg PeerCacheConfig) *PeerCache {
|
|
now := cfg.Now.UTC()
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
}
|
|
limit := cfg.WarmPeerLimit
|
|
if limit <= 0 {
|
|
limit = DefaultWarmPeerLimit
|
|
}
|
|
entries := map[string]*peerCacheBuildEntry{}
|
|
for _, item := range cfg.PeerDirectory {
|
|
nodeID := strings.TrimSpace(item.NodeID)
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID {
|
|
continue
|
|
}
|
|
entry := peerCacheEntry(entries, nodeID)
|
|
entry.RouteIDs = mergeStrings(entry.RouteIDs, item.RouteIDs)
|
|
entry.EndpointCount = maxInt(entry.EndpointCount, item.EndpointCount)
|
|
entry.CandidateCount = maxInt(entry.CandidateCount, item.CandidateCount)
|
|
entry.ConnectivityModes = mergeStrings(entry.ConnectivityModes, item.ConnectivityModes)
|
|
entry.RecoverySeed = entry.RecoverySeed || item.RecoverySeed
|
|
}
|
|
for nodeID, endpoint := range cfg.PeerEndpoints {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
endpoint = strings.TrimSpace(endpoint)
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID || endpoint == "" {
|
|
continue
|
|
}
|
|
entry := peerCacheEntry(entries, nodeID)
|
|
entry.Endpoint = endpoint
|
|
entry.EndpointCount = maxInt(entry.EndpointCount, 1)
|
|
}
|
|
for nodeID, candidates := range cfg.PeerEndpointCandidates {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID || len(candidates) == 0 {
|
|
continue
|
|
}
|
|
entry := peerCacheEntry(entries, nodeID)
|
|
entry.CandidateCount = maxInt(entry.CandidateCount, len(candidates))
|
|
for _, candidate := range candidates {
|
|
if strings.TrimSpace(candidate.ConnectivityMode) != "" {
|
|
entry.ConnectivityModes = mergeStrings(entry.ConnectivityModes, []string{candidate.ConnectivityMode})
|
|
}
|
|
}
|
|
scored := RankPeerEndpointCandidates(candidates, EndpointCandidateScoreOptions{
|
|
ChannelClass: SyntheticChannelFabricControl,
|
|
PreferredRegion: cfg.PreferredRegion,
|
|
Now: now,
|
|
MaxVerificationAge: time.Hour,
|
|
})
|
|
if len(scored) > 0 {
|
|
entry.EndpointCandidates = make([]PeerEndpointCandidate, 0, len(scored))
|
|
for _, scoredCandidate := range scored {
|
|
entry.EndpointCandidates = append(entry.EndpointCandidates, scoredCandidate.Candidate)
|
|
}
|
|
entry.BestCandidateID = scored[0].Candidate.EndpointID
|
|
entry.BestCandidateAddr = scored[0].Candidate.Address
|
|
entry.BestTransport = scored[0].Candidate.Transport
|
|
entry.BestReachability = scored[0].Candidate.Reachability
|
|
entry.BestConnectivity = scored[0].Candidate.ConnectivityMode
|
|
entry.BestNATType = scored[0].Candidate.NATType
|
|
entry.BestPolicyTags = append([]string{}, scored[0].Candidate.PolicyTags...)
|
|
entry.BestCandidateScore = scored[0].Score
|
|
entry.bestScore = scored[0].Score
|
|
if strings.TrimSpace(scored[0].Candidate.Address) != "" {
|
|
entry.Endpoint = strings.TrimSpace(scored[0].Candidate.Address)
|
|
}
|
|
}
|
|
}
|
|
for _, route := range cfg.Routes {
|
|
path := routePath(route)
|
|
localIndex := indexOf(path, cfg.Local.NodeID)
|
|
if localIndex < 0 {
|
|
continue
|
|
}
|
|
for _, nodeID := range path {
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID {
|
|
continue
|
|
}
|
|
entry := peerCacheEntry(entries, nodeID)
|
|
entry.RouteIDs = mergeStrings(entry.RouteIDs, []string{route.RouteID})
|
|
}
|
|
for _, adjacentIndex := range []int{localIndex - 1, localIndex + 1} {
|
|
if adjacentIndex < 0 || adjacentIndex >= len(path) {
|
|
continue
|
|
}
|
|
nodeID := path[adjacentIndex]
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID {
|
|
continue
|
|
}
|
|
peerCacheEntry(entries, nodeID).adjacentRoutePeer = true
|
|
}
|
|
}
|
|
for _, seed := range cfg.RecoverySeeds {
|
|
nodeID := strings.TrimSpace(seed.NodeID)
|
|
if nodeID == "" || nodeID == cfg.Local.NodeID {
|
|
continue
|
|
}
|
|
entry := peerCacheEntry(entries, nodeID)
|
|
entry.RecoverySeed = true
|
|
if entry.Endpoint == "" {
|
|
entry.Endpoint = strings.TrimSpace(seed.Endpoint)
|
|
}
|
|
if strings.TrimSpace(seed.ConnectivityMode) != "" {
|
|
entry.ConnectivityModes = mergeStrings(entry.ConnectivityModes, []string{seed.ConnectivityMode})
|
|
}
|
|
}
|
|
rendezvousLeases := 0
|
|
for _, lease := range cfg.RendezvousLeases {
|
|
if !leaseUsableForPeerCache(lease, cfg.Local.NodeID, now) {
|
|
continue
|
|
}
|
|
rendezvousLeases++
|
|
if lease.PeerNodeID != cfg.Local.NodeID {
|
|
entry := peerCacheEntry(entries, lease.PeerNodeID)
|
|
useLeaseEndpoint := shouldUseRendezvousEndpoint(*entry)
|
|
entry.RendezvousLeaseID = lease.LeaseID
|
|
entry.RelayNodeID = lease.RelayNodeID
|
|
entry.RelayEndpoint = strings.TrimRight(strings.TrimSpace(lease.RelayEndpoint), "/")
|
|
entry.RelayControl = true
|
|
entry.CandidateCount = maxInt(entry.CandidateCount, 1)
|
|
entry.ConnectivityModes = mergeStrings(entry.ConnectivityModes, []string{firstNonEmpty(lease.ConnectivityMode, "relay_required"), "relay_control"})
|
|
if useLeaseEndpoint {
|
|
entry.BestTransport = firstNonEmpty(lease.Transport, "relay_control")
|
|
entry.BestReachability = "relay"
|
|
entry.BestConnectivity = firstNonEmpty(lease.ConnectivityMode, "relay_required")
|
|
entry.Endpoint = entry.RelayEndpoint
|
|
entry.BestCandidateID = lease.LeaseID
|
|
entry.BestCandidateAddr = entry.RelayEndpoint
|
|
entry.bestScore = maxInt(entry.bestScore, 500)
|
|
}
|
|
}
|
|
if lease.PeerNodeID == cfg.Local.NodeID && lease.RelayNodeID != "" && lease.RelayNodeID != cfg.Local.NodeID {
|
|
entry := peerCacheEntry(entries, lease.RelayNodeID)
|
|
if entry.Endpoint == "" {
|
|
entry.Endpoint = strings.TrimRight(strings.TrimSpace(lease.RelayEndpoint), "/")
|
|
}
|
|
entry.EndpointCount = maxInt(entry.EndpointCount, 1)
|
|
entry.ConnectivityModes = mergeStrings(entry.ConnectivityModes, []string{"relay_control"})
|
|
}
|
|
}
|
|
out := make([]peerCacheBuildEntry, 0, len(entries))
|
|
recoverySeeds := 0
|
|
for _, entry := range entries {
|
|
sort.Strings(entry.RouteIDs)
|
|
sort.Strings(entry.ConnectivityModes)
|
|
if entry.RecoverySeed {
|
|
recoverySeeds++
|
|
}
|
|
out = append(out, *entry)
|
|
}
|
|
sort.SliceStable(out, func(i, j int) bool {
|
|
left := warmPeerPriority(out[i])
|
|
right := warmPeerPriority(out[j])
|
|
if left != right {
|
|
return left > right
|
|
}
|
|
return out[i].NodeID < out[j].NodeID
|
|
})
|
|
warm := 0
|
|
for i := range out {
|
|
if warm >= limit {
|
|
break
|
|
}
|
|
if warmPeerPriority(out[i]) <= 0 {
|
|
continue
|
|
}
|
|
out[i].Warm = true
|
|
out[i].WarmReason = warmPeerReason(out[i])
|
|
warm++
|
|
}
|
|
sort.SliceStable(out, func(i, j int) bool {
|
|
return out[i].NodeID < out[j].NodeID
|
|
})
|
|
snapshotEntries := make([]PeerCacheEntry, 0, len(out))
|
|
for _, entry := range out {
|
|
snapshotEntries = append(snapshotEntries, entry.PeerCacheEntry)
|
|
}
|
|
return &PeerCache{snapshot: PeerCacheSnapshot{
|
|
ClusterID: cfg.Local.ClusterID,
|
|
LocalNodeID: cfg.Local.NodeID,
|
|
PeerCount: len(snapshotEntries),
|
|
WarmPeerCount: warm,
|
|
RecoverySeedCount: recoverySeeds,
|
|
RendezvousLeaseCount: rendezvousLeases,
|
|
BuiltAt: now,
|
|
Entries: snapshotEntries,
|
|
}}
|
|
}
|
|
|
|
func (c *PeerCache) Snapshot() PeerCacheSnapshot {
|
|
if c == nil {
|
|
return PeerCacheSnapshot{}
|
|
}
|
|
snapshot := c.snapshot
|
|
snapshot.Entries = append([]PeerCacheEntry{}, c.snapshot.Entries...)
|
|
return snapshot
|
|
}
|
|
|
|
func (c *PeerCache) WarmPeerIDs() []string {
|
|
snapshot := c.Snapshot()
|
|
out := make([]string, 0, snapshot.WarmPeerCount)
|
|
for _, entry := range snapshot.Entries {
|
|
if entry.Warm {
|
|
out = append(out, entry.NodeID)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func peerCacheEntry(entries map[string]*peerCacheBuildEntry, nodeID string) *peerCacheBuildEntry {
|
|
if entry, ok := entries[nodeID]; ok {
|
|
return entry
|
|
}
|
|
entry := &peerCacheBuildEntry{PeerCacheEntry: PeerCacheEntry{NodeID: nodeID}}
|
|
entries[nodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func warmPeerPriority(entry peerCacheBuildEntry) int {
|
|
score := 0
|
|
if entry.adjacentRoutePeer {
|
|
score += 1000
|
|
}
|
|
if entry.RecoverySeed {
|
|
score += 500
|
|
}
|
|
if entry.Endpoint != "" {
|
|
score += 100
|
|
}
|
|
if entry.bestScore > 0 {
|
|
score += entry.bestScore
|
|
}
|
|
if entry.RelayControl {
|
|
score += 300
|
|
}
|
|
score += entry.CandidateCount
|
|
return score
|
|
}
|
|
|
|
func warmPeerReason(entry peerCacheBuildEntry) string {
|
|
if entry.adjacentRoutePeer {
|
|
return "route_adjacent"
|
|
}
|
|
if entry.RecoverySeed {
|
|
return "recovery_seed"
|
|
}
|
|
if entry.RelayControl {
|
|
return "rendezvous_lease"
|
|
}
|
|
if entry.BestCandidateID != "" {
|
|
return "endpoint_candidate"
|
|
}
|
|
if entry.Endpoint != "" {
|
|
return "peer_endpoint"
|
|
}
|
|
return "scoped_peer"
|
|
}
|
|
|
|
func leaseUsableForPeerCache(lease PeerRendezvousLease, localNodeID string, now time.Time) bool {
|
|
if strings.TrimSpace(lease.LeaseID) == "" ||
|
|
strings.TrimSpace(lease.PeerNodeID) == "" ||
|
|
strings.TrimSpace(lease.RelayNodeID) == "" ||
|
|
strings.TrimSpace(lease.RelayEndpoint) == "" ||
|
|
lease.ExpiresAt.IsZero() ||
|
|
!lease.ExpiresAt.After(now) ||
|
|
!lease.ControlPlaneOnly {
|
|
return false
|
|
}
|
|
return lease.PeerNodeID != localNodeID || lease.RelayNodeID != localNodeID
|
|
}
|
|
|
|
func shouldUseRendezvousEndpoint(entry peerCacheBuildEntry) bool {
|
|
if strings.TrimSpace(entry.Endpoint) == "" {
|
|
return true
|
|
}
|
|
transport := strings.ToLower(strings.TrimSpace(entry.BestTransport))
|
|
reachability := strings.ToLower(strings.TrimSpace(entry.BestReachability))
|
|
connectivity := strings.ToLower(strings.TrimSpace(entry.BestConnectivity))
|
|
return strings.Contains(transport, "relay") ||
|
|
strings.Contains(transport, "outbound") ||
|
|
reachability == "relay" ||
|
|
reachability == "outbound_only" ||
|
|
connectivity == "relay_required" ||
|
|
connectivity == "outbound_only"
|
|
}
|
|
|
|
// mergeStrings returns the whitespace-trimmed, non-empty values of existing
// followed by those of incoming, keeping only the first occurrence of each
// value. The result is always a newly allocated, non-nil slice.
//
// Neither argument is modified. The inputs are walked separately rather than
// joined with append(existing, incoming...), which would write incoming's
// values into any spare capacity of the caller's existing backing array.
func mergeStrings(existing []string, incoming []string) []string {
	total := len(existing) + len(incoming)
	seen := make(map[string]struct{}, total)
	out := make([]string, 0, total)
	for _, values := range [][]string{existing, incoming} {
		for _, value := range values {
			value = strings.TrimSpace(value)
			if value == "" {
				continue
			}
			if _, dup := seen[value]; dup {
				continue
			}
			seen[value] = struct{}{}
			out = append(out, value)
		}
	}
	return out
}
|
|
|
|
// maxInt returns the larger of left and right.
func maxInt(left, right int) int {
	if right >= left {
		return right
	}
	return left
}
|