311 lines
9.1 KiB
Go
311 lines
9.1 KiB
Go
package mesh
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Connection states a tracked peer can be in. The string values are stored
// in PeerConnectionState.State and used as keys of
// PeerConnectionSnapshot.StateCounts.
const (
	// PeerConnectionDisconnected is the state given to a newly tracked peer.
	PeerConnectionDisconnected = "disconnected"
	// PeerConnectionConnecting is set by BeginProbe unless the peer is
	// currently ready or degraded.
	PeerConnectionConnecting = "connecting"
	// PeerConnectionReady is set after a successful probe with latency
	// below 500 ms.
	PeerConnectionReady = "ready"
	// PeerConnectionRelayReady is set by RecordRelayReady.
	PeerConnectionRelayReady = "relay_ready"
	// PeerConnectionDegraded is set after a successful probe with latency of
	// 500 ms or more, or after a failure that has not yet triggered backoff.
	PeerConnectionDegraded = "degraded"
	// PeerConnectionBackoff is set once three or more consecutive failures
	// have been recorded.
	PeerConnectionBackoff = "backoff"
	// PeerConnectionWaiting is set by RecordDeferred.
	PeerConnectionWaiting = "waiting_rendezvous"
)
|
|
|
|
// Backoff tuning read by peerConnectionBackoffDuration.
const (
	// peerConnectionBackoffBase is the increment added to the backoff for
	// each consecutive failure from the third one onwards.
	peerConnectionBackoffBase = 5 * time.Second
	// peerConnectionBackoffMax is the upper bound on the computed backoff.
	peerConnectionBackoffMax = time.Minute
)
|
|
|
|
type PeerConnectionTracker struct {
|
|
mu sync.Mutex
|
|
entries map[string]PeerConnectionState
|
|
}
|
|
|
|
// PeerConnectionState is the per-peer record kept by PeerConnectionTracker
// and exposed through PeerConnectionSnapshot.Entries.
//
// Note on JSON encoding: encoding/json never considers a struct value empty,
// so the omitempty option has no effect on the time.Time fields below; they
// are always emitted, even when zero.
type PeerConnectionState struct {
	// NodeID identifies the peer; State is one of the PeerConnection*
	// state constants.
	NodeID string `json:"node_id"`
	State  string `json:"state"`

	// Attributes copied from the peer cache entry whenever the peer is
	// passed to the tracker.
	Warm              bool   `json:"warm"`
	WarmReason        string `json:"warm_reason,omitempty"`
	Endpoint          string `json:"endpoint,omitempty"`
	BestCandidateID   string `json:"best_candidate_id,omitempty"`
	RendezvousLeaseID string `json:"rendezvous_lease_id,omitempty"`
	RelayNodeID       string `json:"relay_node_id,omitempty"`
	RelayEndpoint     string `json:"relay_endpoint,omitempty"`
	RelayQUIC         bool   `json:"relay_quic"`

	// Probe bookkeeping. A recorded success resets ConsecutiveFailures and
	// clears LastFailureReason; a recorded failure resets
	// ConsecutiveSuccesses.
	ConsecutiveSuccesses int    `json:"consecutive_successes"`
	ConsecutiveFailures  int    `json:"consecutive_failures"`
	LastLatencyMs        int    `json:"last_latency_ms,omitempty"`
	LastFailureReason    string `json:"last_failure_reason,omitempty"`

	// LastTransitionAt is when State was last written as a transition,
	// LastProbeAt when a probe was last started or recorded, and
	// BackoffUntil the time before which ShouldProbe reports false while
	// the peer is in the backoff state.
	LastTransitionAt time.Time `json:"last_transition_at"`
	LastProbeAt      time.Time `json:"last_probe_at,omitempty"`
	BackoffUntil     time.Time `json:"backoff_until,omitempty"`
}
|
|
|
|
type PeerConnectionSnapshot struct {
|
|
Total int `json:"total"`
|
|
Ready int `json:"ready"`
|
|
RelayReady int `json:"relay_ready"`
|
|
Degraded int `json:"degraded"`
|
|
Backoff int `json:"backoff"`
|
|
Waiting int `json:"waiting_rendezvous"`
|
|
Connecting int `json:"connecting"`
|
|
Disconnected int `json:"disconnected"`
|
|
StateCounts map[string]int `json:"state_counts"`
|
|
Entries []PeerConnectionState `json:"entries"`
|
|
LastTransitionAt time.Time `json:"last_transition_at,omitempty"`
|
|
}
|
|
|
|
func NewPeerConnectionTracker(peerSnapshot PeerCacheSnapshot, now time.Time) *PeerConnectionTracker {
|
|
now = normalizedNow(now)
|
|
tracker := &PeerConnectionTracker{entries: map[string]PeerConnectionState{}}
|
|
for _, peer := range peerSnapshot.Entries {
|
|
if !peer.Warm || peer.NodeID == "" {
|
|
continue
|
|
}
|
|
tracker.entries[peer.NodeID] = PeerConnectionState{
|
|
NodeID: peer.NodeID,
|
|
State: PeerConnectionDisconnected,
|
|
Warm: peer.Warm,
|
|
WarmReason: peer.WarmReason,
|
|
Endpoint: peer.Endpoint,
|
|
BestCandidateID: peer.BestCandidateID,
|
|
LastTransitionAt: now,
|
|
}
|
|
}
|
|
return tracker
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) ShouldProbe(nodeID string, now time.Time) bool {
|
|
if t == nil {
|
|
return true
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
entry, ok := t.entries[nodeID]
|
|
if !ok {
|
|
return true
|
|
}
|
|
now = normalizedNow(now)
|
|
return entry.State != PeerConnectionBackoff || entry.BackoffUntil.IsZero() || !entry.BackoffUntil.After(now)
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) BeginProbe(peer PeerCacheEntry, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entry(peer, now)
|
|
if entry.State != PeerConnectionReady && entry.State != PeerConnectionDegraded {
|
|
entry.State = PeerConnectionConnecting
|
|
entry.LastTransitionAt = now
|
|
}
|
|
entry.LastProbeAt = now
|
|
t.entries[peer.NodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) RecordSuccess(nodeID string, latencyMs int, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entries[nodeID]
|
|
entry.NodeID = nodeID
|
|
entry.ConsecutiveSuccesses++
|
|
entry.ConsecutiveFailures = 0
|
|
entry.LastLatencyMs = latencyMs
|
|
entry.LastFailureReason = ""
|
|
entry.LastProbeAt = now
|
|
entry.BackoffUntil = time.Time{}
|
|
nextState := PeerConnectionReady
|
|
if latencyMs >= 500 {
|
|
nextState = PeerConnectionDegraded
|
|
}
|
|
if entry.State != nextState {
|
|
entry.State = nextState
|
|
entry.LastTransitionAt = now
|
|
}
|
|
t.entries[nodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) RecordSuccessForPeer(peer PeerCacheEntry, latencyMs int, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entry(peer, now)
|
|
entry.ConsecutiveSuccesses++
|
|
entry.ConsecutiveFailures = 0
|
|
entry.LastLatencyMs = latencyMs
|
|
entry.LastFailureReason = ""
|
|
entry.LastProbeAt = now
|
|
entry.BackoffUntil = time.Time{}
|
|
nextState := PeerConnectionReady
|
|
if latencyMs >= 500 {
|
|
nextState = PeerConnectionDegraded
|
|
}
|
|
if entry.State != nextState {
|
|
entry.State = nextState
|
|
entry.LastTransitionAt = now
|
|
}
|
|
t.entries[peer.NodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) RecordRelayReady(peer PeerCacheEntry, latencyMs int, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entry(peer, now)
|
|
entry.ConsecutiveSuccesses++
|
|
entry.ConsecutiveFailures = 0
|
|
entry.LastLatencyMs = latencyMs
|
|
entry.LastFailureReason = ""
|
|
entry.LastProbeAt = now
|
|
entry.BackoffUntil = time.Time{}
|
|
if entry.State != PeerConnectionRelayReady {
|
|
entry.State = PeerConnectionRelayReady
|
|
entry.LastTransitionAt = now
|
|
}
|
|
t.entries[peer.NodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) RecordFailure(nodeID string, reason string, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entries[nodeID]
|
|
entry.NodeID = nodeID
|
|
entry.ConsecutiveFailures++
|
|
entry.ConsecutiveSuccesses = 0
|
|
entry.LastFailureReason = reason
|
|
entry.LastProbeAt = now
|
|
nextState := PeerConnectionDegraded
|
|
if entry.ConsecutiveFailures >= 3 {
|
|
nextState = PeerConnectionBackoff
|
|
entry.BackoffUntil = now.Add(peerConnectionBackoffDuration(entry.ConsecutiveFailures))
|
|
}
|
|
if entry.State != nextState {
|
|
entry.State = nextState
|
|
entry.LastTransitionAt = now
|
|
}
|
|
t.entries[nodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) RecordDeferred(peer PeerCacheEntry, reason string, now time.Time) PeerConnectionState {
|
|
if t == nil {
|
|
return PeerConnectionState{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
now = normalizedNow(now)
|
|
entry := t.entry(peer, now)
|
|
entry.State = PeerConnectionWaiting
|
|
entry.LastFailureReason = reason
|
|
entry.LastProbeAt = time.Time{}
|
|
entry.LastTransitionAt = now
|
|
entry.BackoffUntil = time.Time{}
|
|
t.entries[peer.NodeID] = entry
|
|
return entry
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) Snapshot() PeerConnectionSnapshot {
|
|
if t == nil {
|
|
return PeerConnectionSnapshot{StateCounts: map[string]int{}}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
entries := make([]PeerConnectionState, 0, len(t.entries))
|
|
counts := map[string]int{
|
|
PeerConnectionDisconnected: 0,
|
|
PeerConnectionConnecting: 0,
|
|
PeerConnectionReady: 0,
|
|
PeerConnectionRelayReady: 0,
|
|
PeerConnectionDegraded: 0,
|
|
PeerConnectionBackoff: 0,
|
|
PeerConnectionWaiting: 0,
|
|
}
|
|
var lastTransition time.Time
|
|
for _, entry := range t.entries {
|
|
entries = append(entries, entry)
|
|
counts[entry.State]++
|
|
if entry.LastTransitionAt.After(lastTransition) {
|
|
lastTransition = entry.LastTransitionAt
|
|
}
|
|
}
|
|
sort.SliceStable(entries, func(i, j int) bool {
|
|
return entries[i].NodeID < entries[j].NodeID
|
|
})
|
|
return PeerConnectionSnapshot{
|
|
Total: len(entries),
|
|
Ready: counts[PeerConnectionReady],
|
|
RelayReady: counts[PeerConnectionRelayReady],
|
|
Degraded: counts[PeerConnectionDegraded],
|
|
Backoff: counts[PeerConnectionBackoff],
|
|
Waiting: counts[PeerConnectionWaiting],
|
|
Connecting: counts[PeerConnectionConnecting],
|
|
Disconnected: counts[PeerConnectionDisconnected],
|
|
StateCounts: counts,
|
|
Entries: entries,
|
|
LastTransitionAt: lastTransition,
|
|
}
|
|
}
|
|
|
|
func (t *PeerConnectionTracker) entry(peer PeerCacheEntry, now time.Time) PeerConnectionState {
|
|
entry, ok := t.entries[peer.NodeID]
|
|
if !ok {
|
|
entry = PeerConnectionState{
|
|
NodeID: peer.NodeID,
|
|
State: PeerConnectionDisconnected,
|
|
LastTransitionAt: now,
|
|
}
|
|
}
|
|
entry.Warm = peer.Warm
|
|
entry.WarmReason = peer.WarmReason
|
|
entry.Endpoint = peer.Endpoint
|
|
entry.BestCandidateID = peer.BestCandidateID
|
|
entry.RendezvousLeaseID = peer.RendezvousLeaseID
|
|
entry.RelayNodeID = peer.RelayNodeID
|
|
entry.RelayEndpoint = peer.RelayEndpoint
|
|
entry.RelayQUIC = peer.RelayQUIC
|
|
return entry
|
|
}
|
|
|
|
func peerConnectionBackoffDuration(failures int) time.Duration {
|
|
if failures < 3 {
|
|
return 0
|
|
}
|
|
backoff := peerConnectionBackoffBase * time.Duration(failures-2)
|
|
if backoff > peerConnectionBackoffMax {
|
|
return peerConnectionBackoffMax
|
|
}
|
|
return backoff
|
|
}
|
|
|
|
// normalizedNow returns now converted to UTC, substituting the current wall
// clock time when now is the zero time.
func normalizedNow(now time.Time) time.Time {
	if !now.IsZero() {
		return now.UTC()
	}
	return time.Now().UTC()
}
|