305 lines
11 KiB
Go
305 lines
11 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Link statuses reported in PeerConnectionProbeResult.LinkStatus and tallied
// by ProbeOnce.
const (
	// PeerConnectionProbeReachable marks a probe whose health message was sent
	// without error.
	PeerConnectionProbeReachable = "reachable"

	// PeerConnectionProbeUnreachable marks a probe whose health message
	// returned an error.
	PeerConnectionProbeUnreachable = "unreachable"

	// PeerConnectionProbeDeferred marks an intent that was not dialled because
	// rendezvous is required or no usable endpoint/candidate is available.
	PeerConnectionProbeDeferred = "deferred"

	// PeerConnectionProbeSkipped marks an intent that was not dialled, e.g.
	// because the tracker reports that backoff is active.
	PeerConnectionProbeSkipped = "skipped"
)
|
|
|
|
// DefaultPeerConnectionProbeTimeout bounds a single health probe when
// PeerConnectionManagerConfig.ProbeTimeout is zero or negative.
const DefaultPeerConnectionProbeTimeout = 2 * time.Second
|
|
|
|
// PeerConnectionManagerConfig carries the inputs for NewPeerConnectionManager.
// The constructor substitutes defaults for ProbeTimeout (<= 0), HTTPClient
// (nil) and Now (nil).
type PeerConnectionManagerConfig struct {
	// Local is this node's identity. When probing, it is passed to
	// NewHealthMessage and its ClusterID is used for the probe target.
	Local PeerIdentity

	// PeerCache supplies the peers that recovery planning works from.
	// ProbeOnce returns an empty cycle while it is nil.
	PeerCache *PeerCache

	// Tracker holds per-peer connection state and receives probe outcomes.
	// ProbeOnce returns an empty cycle while it is nil.
	Tracker *PeerConnectionTracker

	// RendezvousLeases are handed to intent planning. The constructor copies
	// the slice.
	RendezvousLeases []PeerRendezvousLease

	// HTTPClient sends the health probes. When nil, the constructor builds a
	// dedicated client whose timeout is the probe timeout plus one second.
	HTTPClient *http.Client

	// ProbeTimeout bounds each individual health probe. Values <= 0 select
	// DefaultPeerConnectionProbeTimeout.
	ProbeTimeout time.Duration

	// Now is the clock used for cycle and probe timestamps. When nil, the
	// wall clock converted to UTC is used.
	Now func() time.Time
}
|
|
|
|
// PeerConnectionManager plans and runs peer connection probe cycles. It
// records each probe outcome in a PeerConnectionTracker and keeps the most
// recent cycle for Snapshot. Construct it with NewPeerConnectionManager.
type PeerConnectionManager struct {
	// local is this node's identity; probeIntent passes it to
	// NewHealthMessage and uses its ClusterID for the probe target.
	local PeerIdentity

	// peerCache is replaced by UpdatePeerConfig; access it only while
	// holding mu.
	peerCache *PeerCache

	// tracker receives probe outcomes. In this file it is assigned only by
	// NewPeerConnectionManager.
	tracker *PeerConnectionTracker

	// rendezvousLeases is replaced by UpdatePeerConfig; access it only while
	// holding mu.
	rendezvousLeases []PeerRendezvousLease

	// httpClient sends health probes. In this file it is assigned only by
	// NewPeerConnectionManager.
	httpClient *http.Client

	// probeTimeout bounds each health probe (always > 0 after construction).
	probeTimeout time.Duration

	// now is the clock used for all timestamps.
	now func() time.Time

	// mu guards peerCache, rendezvousLeases and lastCycle.
	mu sync.Mutex

	// lastCycle is the most recent cycle completed by ProbeOnce.
	lastCycle PeerConnectionManagerCycle
}
|
|
|
|
// PeerConnectionManagerCycle summarises one ProbeOnce run: the plans it was
// derived from, per-status tallies, and the individual probe results.
type PeerConnectionManagerCycle struct {
	// Mode is copied from the recovery plan's Mode.
	Mode string `json:"mode"`

	// StartedAt and CompletedAt are the manager clock's readings (passed
	// through normalizedNow) at the start and end of the cycle.
	StartedAt time.Time `json:"started_at"`

	CompletedAt time.Time `json:"completed_at"`

	// ProbeTimeoutMs is the per-probe timeout in milliseconds.
	ProbeTimeoutMs int `json:"probe_timeout_ms"`

	// IntentCount is copied from the intent plan.
	IntentCount int `json:"intent_count"`

	// Attempted counts probes that were actually dialled
	// (Succeeded + Failed).
	Attempted int `json:"attempted"`

	// Succeeded counts results with LinkStatus "reachable".
	Succeeded int `json:"succeeded"`

	// Failed counts results with LinkStatus "unreachable".
	Failed int `json:"failed"`

	// Deferred counts results with LinkStatus "deferred".
	Deferred int `json:"deferred"`

	// Skipped counts results with LinkStatus "skipped".
	Skipped int `json:"skipped"`

	// RendezvousRequiredCount, RendezvousResolvedCount and RelayControlCount
	// are copied from the intent plan.
	RendezvousRequiredCount int `json:"rendezvous_required_count"`

	RendezvousResolvedCount int `json:"rendezvous_resolved_count"`

	RelayControlCount int `json:"relay_control_count"`

	// RecoveryPlan is the recovery plan computed at the start of the cycle.
	RecoveryPlan PeerRecoveryPlan `json:"recovery_plan"`

	// IntentPlan is the connection-intent plan the probes were driven from.
	IntentPlan PeerConnectionIntentPlan `json:"intent_plan"`

	// Results holds one entry per intent, in intent-plan order.
	Results []PeerConnectionProbeResult `json:"results,omitempty"`
}
|
|
|
|
// PeerConnectionManagerSnapshot is the externally visible state returned by
// PeerConnectionManager.Snapshot.
type PeerConnectionManagerSnapshot struct {
	// LastCycle is the most recently completed probe cycle; it is the zero
	// value until ProbeOnce has stored a cycle.
	LastCycle PeerConnectionManagerCycle `json:"last_cycle"`
}
|
|
|
|
// PeerConnectionProbeResult is the outcome of handling one connection intent.
// Most descriptive fields are copied verbatim from the intent; LinkStatus,
// FailureReason, LatencyMs, ConnectionState and the timestamps describe what
// happened.
type PeerConnectionProbeResult struct {
	// NodeID is the peer the intent targets.
	NodeID string `json:"node_id"`

	// LinkStatus is one of the PeerConnectionProbe* status constants.
	LinkStatus string `json:"link_status"`

	// Action and Reason are copied from the intent.
	Action string `json:"action"`

	Reason string `json:"reason"`

	// Endpoint is the intent's endpoint as given (not trimmed).
	Endpoint string `json:"endpoint,omitempty"`

	// ConnectionState is the tracker state returned by the tracker call that
	// recorded (or, for skipped results, looked up) this outcome.
	ConnectionState PeerConnectionState `json:"connection_state"`

	// The following fields are copied from the intent.
	TransportMode string `json:"transport_mode"`

	RequiresRendezvous bool `json:"requires_rendezvous"`

	RendezvousResolved bool `json:"rendezvous_resolved"`

	DirectCandidate bool `json:"direct_candidate"`

	RelayCandidate bool `json:"relay_candidate"`

	RendezvousLeaseID string `json:"rendezvous_lease_id,omitempty"`

	RelayNodeID string `json:"relay_node_id,omitempty"`

	RelayEndpoint string `json:"relay_endpoint,omitempty"`

	// LatencyMs is set only for reachable results: milliseconds between
	// StartedAt and completion, clamped at zero.
	LatencyMs int `json:"latency_ms,omitempty"`

	// FailureReason is a short reason code for deferred/skipped results, or
	// the probe error's text for unreachable ones.
	FailureReason string `json:"failure_reason,omitempty"`

	// StartedAt and CompletedAt bracket the handling of this intent.
	StartedAt time.Time `json:"started_at"`

	CompletedAt time.Time `json:"completed_at"`
}
|
|
|
|
func NewPeerConnectionManager(cfg PeerConnectionManagerConfig) *PeerConnectionManager {
|
|
probeTimeout := cfg.ProbeTimeout
|
|
if probeTimeout <= 0 {
|
|
probeTimeout = DefaultPeerConnectionProbeTimeout
|
|
}
|
|
httpClient := cfg.HTTPClient
|
|
if httpClient == nil {
|
|
httpClient = &http.Client{
|
|
Transport: &http.Transport{
|
|
MaxIdleConns: 64,
|
|
MaxIdleConnsPerHost: 8,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
},
|
|
Timeout: probeTimeout + time.Second,
|
|
}
|
|
}
|
|
now := cfg.Now
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &PeerConnectionManager{
|
|
local: cfg.Local,
|
|
peerCache: cfg.PeerCache,
|
|
tracker: cfg.Tracker,
|
|
rendezvousLeases: append([]PeerRendezvousLease{}, cfg.RendezvousLeases...),
|
|
httpClient: httpClient,
|
|
probeTimeout: probeTimeout,
|
|
now: now,
|
|
}
|
|
}
|
|
|
|
func (m *PeerConnectionManager) ProbeOnce(ctx context.Context) PeerConnectionManagerCycle {
|
|
peerCache, rendezvousLeases := m.peerConfigSnapshot()
|
|
if m == nil || peerCache == nil || m.tracker == nil {
|
|
return PeerConnectionManagerCycle{}
|
|
}
|
|
startedAt := normalizedNow(m.now())
|
|
peerSnapshot := peerCache.Snapshot()
|
|
recoveryPlan := PlanPeerRecovery(PeerRecoveryPlanConfig{
|
|
PeerCache: peerSnapshot,
|
|
Connections: m.tracker.Snapshot(),
|
|
TargetReadyPeers: DefaultStablePeerTarget,
|
|
MaxProbeCandidates: DefaultRecoveryProbeLimit,
|
|
Now: startedAt,
|
|
})
|
|
intentPlan := PlanPeerConnectionIntents(PeerConnectionIntentPlanConfig{
|
|
PeerCache: peerSnapshot,
|
|
RecoveryPlan: recoveryPlan,
|
|
RendezvousLeases: rendezvousLeases,
|
|
Now: startedAt,
|
|
})
|
|
cycle := PeerConnectionManagerCycle{
|
|
Mode: recoveryPlan.Mode,
|
|
StartedAt: startedAt,
|
|
ProbeTimeoutMs: int(m.probeTimeout.Milliseconds()),
|
|
IntentCount: intentPlan.IntentCount,
|
|
RendezvousRequiredCount: intentPlan.RendezvousRequiredCount,
|
|
RendezvousResolvedCount: intentPlan.RendezvousResolvedCount,
|
|
RelayControlCount: intentPlan.RelayControlCount,
|
|
RecoveryPlan: recoveryPlan,
|
|
IntentPlan: intentPlan,
|
|
Results: make([]PeerConnectionProbeResult, 0, len(intentPlan.Intents)),
|
|
}
|
|
for _, intent := range intentPlan.Intents {
|
|
result := m.probeIntent(ctx, intent)
|
|
cycle.Results = append(cycle.Results, result)
|
|
switch result.LinkStatus {
|
|
case PeerConnectionProbeReachable:
|
|
cycle.Attempted++
|
|
cycle.Succeeded++
|
|
case PeerConnectionProbeUnreachable:
|
|
cycle.Attempted++
|
|
cycle.Failed++
|
|
case PeerConnectionProbeDeferred:
|
|
cycle.Deferred++
|
|
case PeerConnectionProbeSkipped:
|
|
cycle.Skipped++
|
|
}
|
|
}
|
|
cycle.CompletedAt = normalizedNow(m.now())
|
|
m.mu.Lock()
|
|
m.lastCycle = cycle
|
|
m.mu.Unlock()
|
|
return cycle
|
|
}
|
|
|
|
func (m *PeerConnectionManager) Snapshot() PeerConnectionManagerSnapshot {
|
|
if m == nil {
|
|
return PeerConnectionManagerSnapshot{}
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return PeerConnectionManagerSnapshot{LastCycle: m.lastCycle}
|
|
}
|
|
|
|
func (m *PeerConnectionManager) UpdatePeerConfig(peerCache *PeerCache, rendezvousLeases []PeerRendezvousLease) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.peerCache = peerCache
|
|
m.rendezvousLeases = append([]PeerRendezvousLease{}, rendezvousLeases...)
|
|
}
|
|
|
|
func (m *PeerConnectionManager) peerConfigSnapshot() (*PeerCache, []PeerRendezvousLease) {
|
|
if m == nil {
|
|
return nil, nil
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.peerCache, append([]PeerRendezvousLease{}, m.rendezvousLeases...)
|
|
}
|
|
|
|
func (m *PeerConnectionManager) probeIntent(ctx context.Context, intent PeerConnectionIntent) PeerConnectionProbeResult {
|
|
startedAt := normalizedNow(m.now())
|
|
result := PeerConnectionProbeResult{
|
|
NodeID: intent.NodeID,
|
|
Action: intent.Action,
|
|
Reason: intent.Reason,
|
|
Endpoint: intent.Endpoint,
|
|
TransportMode: intent.TransportMode,
|
|
RequiresRendezvous: intent.RequiresRendezvous,
|
|
RendezvousResolved: intent.RendezvousResolved,
|
|
DirectCandidate: intent.DirectCandidate,
|
|
RelayCandidate: intent.RelayCandidate,
|
|
RendezvousLeaseID: intent.RendezvousLeaseID,
|
|
RelayNodeID: intent.RelayNodeID,
|
|
RelayEndpoint: intent.RelayEndpoint,
|
|
StartedAt: startedAt,
|
|
}
|
|
peer := PeerCacheEntry{
|
|
NodeID: intent.NodeID,
|
|
Endpoint: intent.Endpoint,
|
|
Warm: true,
|
|
WarmReason: intent.Reason,
|
|
RecoverySeed: intent.RecoverySeed,
|
|
BestCandidateID: intent.BestCandidateID,
|
|
BestTransport: intent.Transport,
|
|
RendezvousLeaseID: intent.RendezvousLeaseID,
|
|
RelayNodeID: intent.RelayNodeID,
|
|
RelayEndpoint: intent.RelayEndpoint,
|
|
RelayControl: intent.RelayCandidate,
|
|
}
|
|
if intent.RequiresRendezvous {
|
|
result.LinkStatus = PeerConnectionProbeDeferred
|
|
result.FailureReason = "rendezvous_required"
|
|
result.ConnectionState = m.tracker.RecordDeferred(peer, result.FailureReason, startedAt)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
if strings.TrimSpace(intent.Endpoint) == "" || (!intent.DirectCandidate && !intent.RelayCandidate) {
|
|
result.LinkStatus = PeerConnectionProbeDeferred
|
|
result.FailureReason = "direct_candidate_unavailable"
|
|
if intent.RelayCandidate {
|
|
result.FailureReason = "relay_candidate_unavailable"
|
|
}
|
|
result.ConnectionState = m.tracker.RecordDeferred(peer, result.FailureReason, startedAt)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
if !m.tracker.ShouldProbe(intent.NodeID, startedAt) {
|
|
result.LinkStatus = PeerConnectionProbeSkipped
|
|
result.FailureReason = "backoff_active"
|
|
result.ConnectionState = m.connectionState(intent.NodeID)
|
|
result.CompletedAt = normalizedNow(m.now())
|
|
return result
|
|
}
|
|
m.tracker.BeginProbe(peer, startedAt)
|
|
probeCtx, cancel := context.WithTimeout(ctx, m.probeTimeout)
|
|
defer cancel()
|
|
target := PeerIdentity{
|
|
ClusterID: m.local.ClusterID,
|
|
NodeID: intent.NodeID,
|
|
}
|
|
if intent.RelayCandidate && intent.RelayNodeID != "" {
|
|
target.NodeID = intent.RelayNodeID
|
|
}
|
|
_, err := NewClient(strings.TrimRight(intent.Endpoint, "/")).withHTTPClient(m.httpClient).SendHealth(probeCtx, NewHealthMessage(m.local, target))
|
|
completedAt := normalizedNow(m.now())
|
|
if err != nil {
|
|
result.LinkStatus = PeerConnectionProbeUnreachable
|
|
result.FailureReason = err.Error()
|
|
result.ConnectionState = m.tracker.RecordFailure(intent.NodeID, err.Error(), completedAt)
|
|
result.CompletedAt = completedAt
|
|
return result
|
|
}
|
|
latency := int(completedAt.Sub(startedAt).Milliseconds())
|
|
if latency < 0 {
|
|
latency = 0
|
|
}
|
|
result.LinkStatus = PeerConnectionProbeReachable
|
|
result.LatencyMs = latency
|
|
if intent.RelayCandidate {
|
|
result.ConnectionState = m.tracker.RecordRelayReady(peer, latency, completedAt)
|
|
} else {
|
|
result.ConnectionState = m.tracker.RecordSuccess(intent.NodeID, latency, completedAt)
|
|
}
|
|
result.CompletedAt = completedAt
|
|
return result
|
|
}
|
|
|
|
func (m *PeerConnectionManager) connectionState(nodeID string) PeerConnectionState {
|
|
snapshot := m.tracker.Snapshot()
|
|
for _, entry := range snapshot.Entries {
|
|
if entry.NodeID == nodeID {
|
|
return entry
|
|
}
|
|
}
|
|
return PeerConnectionState{NodeID: nodeID, State: PeerConnectionDisconnected}
|
|
}
|
|
|
|
func (c Client) withHTTPClient(httpClient *http.Client) Client {
|
|
c.HTTPClient = httpClient
|
|
return c
|
|
}
|