138 lines
3.4 KiB
Go
138 lines
3.4 KiB
Go
package mesh
|
|
|
|
import (
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
type FabricRoutePressureTracker struct {
|
|
mu sync.Mutex
|
|
active map[string]int
|
|
maxActive map[string]int
|
|
acquiredTotal uint64
|
|
releasedTotal uint64
|
|
maxActiveTotal int
|
|
lastAcquiredRoute string
|
|
lastReleasedRoute string
|
|
}
|
|
|
|
type FabricRoutePressureSnapshot struct {
|
|
Active map[string]int `json:"active"`
|
|
MaxActive map[string]int `json:"max_active"`
|
|
ActiveTotal int `json:"active_total"`
|
|
MaxActiveTotal int `json:"max_active_total"`
|
|
AcquiredTotal uint64 `json:"acquired_total"`
|
|
ReleasedTotal uint64 `json:"released_total"`
|
|
LastAcquiredRoute string `json:"last_acquired_route,omitempty"`
|
|
LastReleasedRoute string `json:"last_released_route,omitempty"`
|
|
}
|
|
|
|
func NewFabricRoutePressureTracker() *FabricRoutePressureTracker {
|
|
return &FabricRoutePressureTracker{
|
|
active: map[string]int{},
|
|
maxActive: map[string]int{},
|
|
}
|
|
}
|
|
|
|
func (t *FabricRoutePressureTracker) Apply(routeSet FabricRouteSet) FabricRouteSet {
|
|
if t == nil {
|
|
return routeSet
|
|
}
|
|
active := t.Snapshot()
|
|
if len(active) == 0 {
|
|
return routeSet
|
|
}
|
|
apply := func(route FabricRoute) FabricRoute {
|
|
if count := active[route.RouteID]; count > 0 {
|
|
route.ActiveChannels += count
|
|
}
|
|
return route
|
|
}
|
|
routeSet.Primary = apply(routeSet.Primary)
|
|
for i := range routeSet.WarmStandby {
|
|
routeSet.WarmStandby[i] = apply(routeSet.WarmStandby[i])
|
|
}
|
|
for i := range routeSet.ColdFallbacks {
|
|
routeSet.ColdFallbacks[i] = apply(routeSet.ColdFallbacks[i])
|
|
}
|
|
return routeSet
|
|
}
|
|
|
|
func (t *FabricRoutePressureTracker) Acquire(routeID string) func() {
|
|
routeID = strings.TrimSpace(routeID)
|
|
if t == nil || routeID == "" {
|
|
return func() {}
|
|
}
|
|
t.mu.Lock()
|
|
if t.active == nil {
|
|
t.active = map[string]int{}
|
|
}
|
|
if t.maxActive == nil {
|
|
t.maxActive = map[string]int{}
|
|
}
|
|
t.active[routeID]++
|
|
if t.active[routeID] > t.maxActive[routeID] {
|
|
t.maxActive[routeID] = t.active[routeID]
|
|
}
|
|
t.acquiredTotal++
|
|
t.lastAcquiredRoute = routeID
|
|
if activeTotal := activeTotalLocked(t.active); activeTotal > t.maxActiveTotal {
|
|
t.maxActiveTotal = activeTotal
|
|
}
|
|
t.mu.Unlock()
|
|
var released atomic.Bool
|
|
return func() {
|
|
if released.Swap(true) {
|
|
return
|
|
}
|
|
t.mu.Lock()
|
|
if t.active[routeID] <= 1 {
|
|
delete(t.active, routeID)
|
|
} else {
|
|
t.active[routeID]--
|
|
}
|
|
t.releasedTotal++
|
|
t.lastReleasedRoute = routeID
|
|
t.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (t *FabricRoutePressureTracker) Snapshot() map[string]int {
|
|
return t.SnapshotPressure().Active
|
|
}
|
|
|
|
func (t *FabricRoutePressureTracker) SnapshotPressure() FabricRoutePressureSnapshot {
|
|
if t == nil {
|
|
return FabricRoutePressureSnapshot{}
|
|
}
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
active := make(map[string]int, len(t.active))
|
|
for routeID, count := range t.active {
|
|
active[routeID] = count
|
|
}
|
|
maxActive := make(map[string]int, len(t.maxActive))
|
|
for routeID, count := range t.maxActive {
|
|
maxActive[routeID] = count
|
|
}
|
|
return FabricRoutePressureSnapshot{
|
|
Active: active,
|
|
MaxActive: maxActive,
|
|
ActiveTotal: activeTotalLocked(active),
|
|
MaxActiveTotal: t.maxActiveTotal,
|
|
AcquiredTotal: t.acquiredTotal,
|
|
ReleasedTotal: t.releasedTotal,
|
|
LastAcquiredRoute: t.lastAcquiredRoute,
|
|
LastReleasedRoute: t.lastReleasedRoute,
|
|
}
|
|
}
|
|
|
|
func activeTotalLocked(active map[string]int) int {
|
|
total := 0
|
|
for _, count := range active {
|
|
total += count
|
|
}
|
|
return total
|
|
}
|