414 lines
15 KiB
Go
414 lines
15 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Link statuses reported in PeerConnectionProbeResult.LinkStatus and
// PeerConnectionCandidateProbeResult.LinkStatus.
const (
	// PeerConnectionProbeReachable marks a probe whose health request succeeded.
	PeerConnectionProbeReachable = "reachable"
	// PeerConnectionProbeUnreachable marks a probe for which no endpoint answered.
	PeerConnectionProbeUnreachable = "unreachable"
	// PeerConnectionProbeDeferred marks an intent that was not probed because it
	// still requires rendezvous or has no usable candidate.
	PeerConnectionProbeDeferred = "deferred"
	// PeerConnectionProbeSkipped marks an intent that was not probed because the
	// tracker reported that the peer should not be probed yet.
	PeerConnectionProbeSkipped = "skipped"
)
|
|
|
|
const (
	// DefaultPeerConnectionProbeTimeout is the per-endpoint probe deadline used
	// when PeerConnectionManagerConfig.ProbeTimeout is zero or negative.
	DefaultPeerConnectionProbeTimeout = 2 * time.Second
)
|
|
|
|
type PeerConnectionManagerConfig struct {
|
|
Local PeerIdentity
|
|
PeerCache *PeerCache
|
|
Tracker *PeerConnectionTracker
|
|
RendezvousLeases []PeerRendezvousLease
|
|
HTTPClient *http.Client
|
|
ProbeTimeout time.Duration
|
|
Now func() time.Time
|
|
}
|
|
|
|
type PeerConnectionManager struct {
|
|
local PeerIdentity
|
|
peerCache *PeerCache
|
|
tracker *PeerConnectionTracker
|
|
rendezvousLeases []PeerRendezvousLease
|
|
httpClient *http.Client
|
|
probeTimeout time.Duration
|
|
now func() time.Time
|
|
|
|
mu sync.Mutex
|
|
lastCycle PeerConnectionManagerCycle
|
|
}
|
|
|
|
type PeerConnectionManagerCycle struct {
|
|
Mode string `json:"mode"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
CompletedAt time.Time `json:"completed_at"`
|
|
ProbeTimeoutMs int `json:"probe_timeout_ms"`
|
|
IntentCount int `json:"intent_count"`
|
|
Attempted int `json:"attempted"`
|
|
Succeeded int `json:"succeeded"`
|
|
Failed int `json:"failed"`
|
|
Deferred int `json:"deferred"`
|
|
Skipped int `json:"skipped"`
|
|
RendezvousRequiredCount int `json:"rendezvous_required_count"`
|
|
RendezvousResolvedCount int `json:"rendezvous_resolved_count"`
|
|
RelayControlCount int `json:"relay_control_count"`
|
|
RecoveryPlan PeerRecoveryPlan `json:"recovery_plan"`
|
|
IntentPlan PeerConnectionIntentPlan `json:"intent_plan"`
|
|
Results []PeerConnectionProbeResult `json:"results,omitempty"`
|
|
}
|
|
|
|
type PeerConnectionManagerSnapshot struct {
|
|
LastCycle PeerConnectionManagerCycle `json:"last_cycle"`
|
|
}
|
|
|
|
type PeerConnectionProbeResult struct {
|
|
NodeID string `json:"node_id"`
|
|
LinkStatus string `json:"link_status"`
|
|
Action string `json:"action"`
|
|
Reason string `json:"reason"`
|
|
Endpoint string `json:"endpoint,omitempty"`
|
|
SelectedCandidateID string `json:"selected_candidate_id,omitempty"`
|
|
SelectedEndpoint string `json:"selected_endpoint,omitempty"`
|
|
ConnectionState PeerConnectionState `json:"connection_state"`
|
|
TransportMode string `json:"transport_mode"`
|
|
RequiresRendezvous bool `json:"requires_rendezvous"`
|
|
RendezvousResolved bool `json:"rendezvous_resolved"`
|
|
DirectCandidate bool `json:"direct_candidate"`
|
|
RelayCandidate bool `json:"relay_candidate"`
|
|
RendezvousLeaseID string `json:"rendezvous_lease_id,omitempty"`
|
|
RelayNodeID string `json:"relay_node_id,omitempty"`
|
|
RelayEndpoint string `json:"relay_endpoint,omitempty"`
|
|
LatencyMs int `json:"latency_ms,omitempty"`
|
|
FailureReason string `json:"failure_reason,omitempty"`
|
|
CandidateResults []PeerConnectionCandidateProbeResult `json:"candidate_results,omitempty"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
CompletedAt time.Time `json:"completed_at"`
|
|
}
|
|
|
|
// PeerConnectionCandidateProbeResult is the outcome of probing one endpoint
// candidate while handling a connection intent.
type PeerConnectionCandidateProbeResult struct {
	CandidateID string `json:"candidate_id,omitempty"`
	Endpoint    string `json:"endpoint"`
	Transport   string `json:"transport,omitempty"`
	// LinkStatus is PeerConnectionProbeReachable or PeerConnectionProbeUnreachable.
	LinkStatus string `json:"link_status"`
	// LatencyMs is set for reachable candidates only.
	LatencyMs int `json:"latency_ms,omitempty"`
	// FailureReason is the probe error text for unreachable candidates.
	FailureReason string    `json:"failure_reason,omitempty"`
	StartedAt     time.Time `json:"started_at"`
	CompletedAt   time.Time `json:"completed_at"`
}
|
|
|
|
// peerConnectionProbeTarget is one endpoint to try for an intent, together
// with the candidate ID and transport it is reported under.
type peerConnectionProbeTarget struct {
	CandidateID string
	Endpoint    string
	Transport   string
}
|
|
|
|
func NewPeerConnectionManager(cfg PeerConnectionManagerConfig) *PeerConnectionManager {
|
|
probeTimeout := cfg.ProbeTimeout
|
|
if probeTimeout <= 0 {
|
|
probeTimeout = DefaultPeerConnectionProbeTimeout
|
|
}
|
|
httpClient := cfg.HTTPClient
|
|
if httpClient == nil {
|
|
httpClient = &http.Client{
|
|
Transport: &http.Transport{
|
|
MaxIdleConns: 64,
|
|
MaxIdleConnsPerHost: 8,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
},
|
|
Timeout: probeTimeout + time.Second,
|
|
}
|
|
}
|
|
now := cfg.Now
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &PeerConnectionManager{
|
|
local: cfg.Local,
|
|
peerCache: cfg.PeerCache,
|
|
tracker: cfg.Tracker,
|
|
rendezvousLeases: append([]PeerRendezvousLease{}, cfg.RendezvousLeases...),
|
|
httpClient: httpClient,
|
|
probeTimeout: probeTimeout,
|
|
now: now,
|
|
}
|
|
}
|
|
|
|
func (m *PeerConnectionManager) ProbeOnce(ctx context.Context) PeerConnectionManagerCycle {
|
|
peerCache, rendezvousLeases := m.peerConfigSnapshot()
|
|
if m == nil || peerCache == nil || m.tracker == nil {
|
|
return PeerConnectionManagerCycle{}
|
|
}
|
|
startedAt := normalizedNow(m.now())
|
|
peerSnapshot := peerCache.Snapshot()
|
|
recoveryPlan := PlanPeerRecovery(PeerRecoveryPlanConfig{
|
|
PeerCache: peerSnapshot,
|
|
Connections: m.tracker.Snapshot(),
|
|
TargetReadyPeers: DefaultStablePeerTarget,
|
|
MaxProbeCandidates: DefaultRecoveryProbeLimit,
|
|
Now: startedAt,
|
|
})
|
|
intentPlan := PlanPeerConnectionIntents(PeerConnectionIntentPlanConfig{
|
|
PeerCache: peerSnapshot,
|
|
RecoveryPlan: recoveryPlan,
|
|
RendezvousLeases: rendezvousLeases,
|
|
Now: startedAt,
|
|
})
|
|
entriesByNode := map[string]PeerCacheEntry{}
|
|
for _, entry := range peerSnapshot.Entries {
|
|
entriesByNode[entry.NodeID] = entry
|
|
}
|
|
cycle := PeerConnectionManagerCycle{
|
|
Mode: recoveryPlan.Mode,
|
|
StartedAt: startedAt,
|
|
ProbeTimeoutMs: int(m.probeTimeout.Milliseconds()),
|
|
IntentCount: intentPlan.IntentCount,
|
|
RendezvousRequiredCount: intentPlan.RendezvousRequiredCount,
|
|
RendezvousResolvedCount: intentPlan.RendezvousResolvedCount,
|
|
RelayControlCount: intentPlan.RelayControlCount,
|
|
RecoveryPlan: recoveryPlan,
|
|
IntentPlan: intentPlan,
|
|
Results: make([]PeerConnectionProbeResult, 0, len(intentPlan.Intents)),
|
|
}
|
|
for _, intent := range intentPlan.Intents {
|
|
result := m.probeIntent(ctx, intent, entriesByNode[intent.NodeID])
|
|
cycle.Results = append(cycle.Results, result)
|
|
switch result.LinkStatus {
|
|
case PeerConnectionProbeReachable:
|
|
cycle.Attempted++
|
|
cycle.Succeeded++
|
|
case PeerConnectionProbeUnreachable:
|
|
cycle.Attempted++
|
|
cycle.Failed++
|
|
case PeerConnectionProbeDeferred:
|
|
cycle.Deferred++
|
|
case PeerConnectionProbeSkipped:
|
|
cycle.Skipped++
|
|
}
|
|
}
|
|
cycle.CompletedAt = normalizedNow(m.now())
|
|
m.mu.Lock()
|
|
m.lastCycle = cycle
|
|
m.mu.Unlock()
|
|
return cycle
|
|
}
|
|
|
|
func (m *PeerConnectionManager) Snapshot() PeerConnectionManagerSnapshot {
|
|
if m == nil {
|
|
return PeerConnectionManagerSnapshot{}
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return PeerConnectionManagerSnapshot{LastCycle: m.lastCycle}
|
|
}
|
|
|
|
func (m *PeerConnectionManager) UpdatePeerConfig(peerCache *PeerCache, rendezvousLeases []PeerRendezvousLease) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.peerCache = peerCache
|
|
m.rendezvousLeases = append([]PeerRendezvousLease{}, rendezvousLeases...)
|
|
}
|
|
|
|
func (m *PeerConnectionManager) peerConfigSnapshot() (*PeerCache, []PeerRendezvousLease) {
|
|
if m == nil {
|
|
return nil, nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.peerCache, append([]PeerRendezvousLease{}, m.rendezvousLeases...)
|
|
}
|
|
|
|
func (m *PeerConnectionManager) probeIntent(ctx context.Context, intent PeerConnectionIntent, cacheEntry PeerCacheEntry) PeerConnectionProbeResult {
|
|
startedAt := normalizedNow(m.now())
|
|
result := PeerConnectionProbeResult{
|
|
NodeID: intent.NodeID,
|
|
Action: intent.Action,
|
|
Reason: intent.Reason,
|
|
Endpoint: intent.Endpoint,
|
|
TransportMode: intent.TransportMode,
|
|
RequiresRendezvous: intent.RequiresRendezvous,
|
|
RendezvousResolved: intent.RendezvousResolved,
|
|
DirectCandidate: intent.DirectCandidate,
|
|
RelayCandidate: intent.RelayCandidate,
|
|
RendezvousLeaseID: intent.RendezvousLeaseID,
|
|
RelayNodeID: intent.RelayNodeID,
|
|
RelayEndpoint: intent.RelayEndpoint,
|
|
StartedAt: startedAt,
|
|
}
|
|
peer := PeerCacheEntry{
|
|
NodeID: intent.NodeID,
|
|
Endpoint: intent.Endpoint,
|
|
Warm: true,
|
|
WarmReason: intent.Reason,
|
|
RecoverySeed: intent.RecoverySeed,
|
|
BestCandidateID: intent.BestCandidateID,
|
|
BestTransport: intent.Transport,
|
|
RendezvousLeaseID: intent.RendezvousLeaseID,
|
|
RelayNodeID: intent.RelayNodeID,
|
|
RelayEndpoint: intent.RelayEndpoint,
|
|
RelayControl: intent.RelayCandidate,
|
|
}
|
|
if intent.RequiresRendezvous {
|
|
result.LinkStatus = PeerConnectionProbeDeferred
|
|
result.FailureReason = "rendezvous_required"
|
|
result.ConnectionState = m.tracker.RecordDeferred(peer, result.FailureReason, startedAt)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
if strings.TrimSpace(intent.Endpoint) == "" || (!intent.DirectCandidate && !intent.RelayCandidate) {
|
|
result.LinkStatus = PeerConnectionProbeDeferred
|
|
result.FailureReason = "direct_candidate_unavailable"
|
|
if intent.RelayCandidate {
|
|
result.FailureReason = "relay_candidate_unavailable"
|
|
}
|
|
result.ConnectionState = m.tracker.RecordDeferred(peer, result.FailureReason, startedAt)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
if !m.tracker.ShouldProbe(intent.NodeID, startedAt) {
|
|
result.LinkStatus = PeerConnectionProbeSkipped
|
|
result.FailureReason = "backoff_active"
|
|
result.ConnectionState = m.connectionState(intent.NodeID)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
target := PeerIdentity{
|
|
ClusterID: m.local.ClusterID,
|
|
NodeID: intent.NodeID,
|
|
}
|
|
if intent.RelayCandidate && intent.RelayNodeID != "" {
|
|
target.NodeID = intent.RelayNodeID
|
|
}
|
|
targets := []peerConnectionProbeTarget{{
|
|
CandidateID: intent.BestCandidateID,
|
|
Endpoint: intent.Endpoint,
|
|
Transport: intent.Transport,
|
|
}}
|
|
if intent.DirectCandidate {
|
|
targets = peerConnectionProbeTargets(intent, cacheEntry)
|
|
}
|
|
var lastFailure string
|
|
for _, probeTarget := range targets {
|
|
probePeer := peer
|
|
probePeer.Endpoint = strings.TrimRight(strings.TrimSpace(probeTarget.Endpoint), "/")
|
|
probePeer.BestCandidateID = strings.TrimSpace(probeTarget.CandidateID)
|
|
probePeer.BestCandidateAddr = probePeer.Endpoint
|
|
probePeer.BestTransport = strings.TrimSpace(probeTarget.Transport)
|
|
if probePeer.Endpoint == "" {
|
|
continue
|
|
}
|
|
candidateStartedAt := normalizedNow(m.now())
|
|
m.tracker.BeginProbe(probePeer, candidateStartedAt)
|
|
probeCtx, cancel := context.WithTimeout(ctx, m.probeTimeout)
|
|
_, err := NewClient(probePeer.Endpoint).withHTTPClient(m.httpClient).SendHealth(probeCtx, NewHealthMessage(m.local, target))
|
|
cancel()
|
|
completedAt := normalizedNow(m.now())
|
|
candidateResult := PeerConnectionCandidateProbeResult{
|
|
CandidateID: probePeer.BestCandidateID,
|
|
Endpoint: probePeer.Endpoint,
|
|
Transport: probePeer.BestTransport,
|
|
StartedAt: candidateStartedAt,
|
|
CompletedAt: completedAt,
|
|
}
|
|
if err != nil {
|
|
lastFailure = err.Error()
|
|
candidateResult.LinkStatus = PeerConnectionProbeUnreachable
|
|
candidateResult.FailureReason = lastFailure
|
|
result.CandidateResults = append(result.CandidateResults, candidateResult)
|
|
continue
|
|
}
|
|
latency := int(completedAt.Sub(candidateStartedAt).Milliseconds())
|
|
if latency < 0 {
|
|
latency = 0
|
|
}
|
|
candidateResult.LinkStatus = PeerConnectionProbeReachable
|
|
candidateResult.LatencyMs = latency
|
|
result.CandidateResults = append(result.CandidateResults, candidateResult)
|
|
result.LinkStatus = PeerConnectionProbeReachable
|
|
result.Endpoint = probePeer.Endpoint
|
|
result.SelectedCandidateID = probePeer.BestCandidateID
|
|
result.SelectedEndpoint = probePeer.Endpoint
|
|
result.LatencyMs = latency
|
|
if intent.RelayCandidate {
|
|
result.ConnectionState = m.tracker.RecordRelayReady(probePeer, latency, completedAt)
|
|
} else {
|
|
result.ConnectionState = m.tracker.RecordSuccessForPeer(probePeer, latency, completedAt)
|
|
}
|
|
result.CompletedAt = completedAt
|
|
return result
|
|
}
|
|
completedAt := normalizedNow(m.now())
|
|
if lastFailure == "" {
|
|
lastFailure = "no_probe_endpoint_available"
|
|
}
|
|
result.LinkStatus = PeerConnectionProbeUnreachable
|
|
result.FailureReason = lastFailure
|
|
result.ConnectionState = m.tracker.RecordFailure(intent.NodeID, lastFailure, completedAt)
|
|
result.CompletedAt = completedAt
|
|
return result
|
|
}
|
|
|
|
func peerConnectionProbeTargets(intent PeerConnectionIntent, cacheEntry PeerCacheEntry) []peerConnectionProbeTarget {
|
|
seen := map[string]struct{}{}
|
|
out := make([]peerConnectionProbeTarget, 0, len(cacheEntry.EndpointCandidates)+1)
|
|
add := func(candidateID, endpoint, transport string) {
|
|
endpoint = strings.TrimRight(strings.TrimSpace(endpoint), "/")
|
|
if endpoint == "" {
|
|
return
|
|
}
|
|
key := candidateID + "|" + endpoint
|
|
if _, ok := seen[key]; ok {
|
|
return
|
|
}
|
|
seen[key] = struct{}{}
|
|
out = append(out, peerConnectionProbeTarget{
|
|
CandidateID: strings.TrimSpace(candidateID),
|
|
Endpoint: endpoint,
|
|
Transport: strings.TrimSpace(transport),
|
|
})
|
|
}
|
|
for _, candidate := range cacheEntry.EndpointCandidates {
|
|
if !candidateUsableForDirectProbe(candidate) {
|
|
continue
|
|
}
|
|
add(candidate.EndpointID, candidate.Address, candidate.Transport)
|
|
}
|
|
add(intent.BestCandidateID, intent.Endpoint, intent.Transport)
|
|
return out
|
|
}
|
|
|
|
func candidateUsableForDirectProbe(candidate PeerEndpointCandidate) bool {
|
|
endpoint := strings.TrimSpace(candidate.Address)
|
|
if endpoint == "" || strings.HasPrefix(endpoint, "relay://") || strings.HasPrefix(endpoint, "outbound://") {
|
|
return false
|
|
}
|
|
connectivity := strings.ToLower(strings.TrimSpace(candidate.ConnectivityMode))
|
|
reachability := strings.ToLower(strings.TrimSpace(candidate.Reachability))
|
|
transport := strings.ToLower(strings.TrimSpace(candidate.Transport))
|
|
if connectivity == "outbound_only" || connectivity == "relay_required" || reachability == "outbound_only" || reachability == "relay" {
|
|
return false
|
|
}
|
|
return transport == "" || strings.Contains(transport, "direct") || transport == "wss" || strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://")
|
|
}
|
|
|
|
func (m *PeerConnectionManager) connectionState(nodeID string) PeerConnectionState {
|
|
snapshot := m.tracker.Snapshot()
|
|
for _, entry := range snapshot.Entries {
|
|
if entry.NodeID == nodeID {
|
|
return entry
|
|
}
|
|
}
|
|
return PeerConnectionState{NodeID: nodeID, State: PeerConnectionDisconnected}
|
|
}
|
|
|
|
func (c Client) withHTTPClient(httpClient *http.Client) Client {
|
|
c.HTTPClient = httpClient
|
|
return c
|
|
}
|