Files

218 lines
6.5 KiB
Go

package mesh
import (
"errors"
"strings"
"time"
)
// FabricChannelRouteEventType labels the kind of routing event that a
// FabricChannelRouter reports for a channel.
type FabricChannelRouteEventType string
// Route event types reported by FabricChannelRouter.
const (
// FabricChannelRouteEventNone is reported by ObserveChannel when an
// observation does not trigger a reroute.
FabricChannelRouteEventNone FabricChannelRouteEventType = ""
// FabricChannelRouteEventOpened is reported by OpenChannel when a channel is
// bound to its initial route.
FabricChannelRouteEventOpened FabricChannelRouteEventType = "opened"
// FabricChannelRouteEventReroute is reported by ObserveChannel when the
// channel is moved onto a different route.
FabricChannelRouteEventReroute FabricChannelRouteEventType = "reroute"
)
// ErrFabricRouteRerouteSuppressed is a sentinel error describing a suppressed
// reroute.
// NOTE(review): nothing in the visible part of this file returns it;
// shouldReroute signals suppression by returning false. Confirm where this
// error is produced before matching on it with errors.Is.
var ErrFabricRouteRerouteSuppressed = errors.New("fabric route reroute suppressed")
// FabricChannelRouterConfig holds the thresholds a FabricChannelRouter uses to
// decide when an open channel should move to another route.
type FabricChannelRouterConfig struct {
// SchedulerConfig is handed to NewFabricRouteScheduler by
// NewFabricChannelRouter. Its ProjectedChannelCost is filled from the field
// below when it is not positive.
SchedulerConfig FabricRouteSchedulerConfig
// MaxAckLatencyMs triggers a reroute when an observation's AckLatencyMs
// exceeds it. Values <= 0 disable the latency check.
MaxAckLatencyMs int64
// MaxRoutePressure triggers a reroute when the current route's pressure
// percentage exceeds it. Values <= 0 are normalized to 90.
MaxRoutePressure int
// MinRerouteInterval is the minimum time that must pass after a channel's
// LastReroute before it may be rerouted again. Values <= 0 disable the limit.
MinRerouteInterval time.Duration
// ProjectedChannelCost is passed to fabricRoutePressurePercent when
// evaluating route pressure. Values <= 0 are normalized to 1.
ProjectedChannelCost int
}
// FabricChannelRouter opens channels on a scheduler-chosen route and reroutes
// them in response to observations. Its methods use value receivers and
// return the updated channel rather than mutating the caller's copy.
type FabricChannelRouter struct {
// Config holds the reroute thresholds. shouldReroute re-applies defaults on
// every call, so a router built without NewFabricChannelRouter still sees
// normalized thresholds there.
Config FabricChannelRouterConfig
// Scheduler selects a route from a route set for a channel spec.
Scheduler FabricRouteScheduler
}
// FabricChannelObservation is one health/traffic sample for a channel, fed to
// FabricChannelRouter.ObserveChannel.
type FabricChannelObservation struct {
// ChannelID and RouteID identify what was observed.
// NOTE(review): the router code visible here does not read either field; it
// uses the channel's own RouteID instead.
ChannelID string
RouteID string
// AckLatencyMs is compared against FabricChannelRouterConfig.MaxAckLatencyMs.
AckLatencyMs int64
// Failed requests a reroute regardless of the latency and pressure checks
// (still subject to MinRerouteInterval).
Failed bool
// The four counters below are added to the channel's running totals by
// ObserveChannel, so they are treated as deltas, not absolute values.
BytesSent uint64
BytesRecv uint64
FramesSent uint64
FramesRecv uint64
// Reason, when not blank, is used verbatim as the Reason of a reroute event;
// otherwise the router derives one from the triggering threshold.
Reason string
// ObservedAt defaults to ObserveChannel's notion of "now" when zero.
ObservedAt time.Time
}
// FabricChannelRouteEvent describes the outcome of OpenChannel or
// ObserveChannel. Which fields are populated depends on Type.
type FabricChannelRouteEvent struct {
// Type is the kind of event; the zero value means no routing change.
Type FabricChannelRouteEventType
// Reason is the scheduler's choice reason for opened events, and the
// observation's reason or a derived one for reroute events.
Reason string
// PreviousRoute is the route the channel was on before a reroute. It is the
// zero value for other event types and when the old route is no longer in
// the route set.
PreviousRoute FabricRoute
// NextRoute is the route selected for opened and reroute events.
NextRoute FabricRoute
// Choice is the scheduler result that produced NextRoute.
Choice FabricRouteChoice
// Observation is the sample that was processed; unset for opened events.
Observation FabricChannelObservation
// Channel is a copy of the channel after the event was applied.
Channel FabricChannel
// OccurredAt is the timestamp the router used while handling the call.
OccurredAt time.Time
}
// NewFabricChannelRouter returns a router whose Config is cfg with defaults
// applied and whose Scheduler is built from the normalized scheduler settings.
func NewFabricChannelRouter(cfg FabricChannelRouterConfig) FabricChannelRouter {
	normalized := normalizeFabricChannelRouterConfig(cfg)

	var router FabricChannelRouter
	router.Config = normalized
	router.Scheduler = NewFabricRouteScheduler(normalized.SchedulerConfig)
	return router
}
// OpenChannel asks the scheduler for a route in routeSet that suits spec and
// returns a channel in the open state bound to that route, together with an
// "opened" event describing the choice.
//
// A zero now is replaced with the current UTC time. If the scheduler fails,
// its error is returned unchanged along with zero-valued channel and event.
func (r FabricChannelRouter) OpenChannel(spec FabricChannelSpec, routeSet FabricRouteSet, now time.Time) (FabricChannel, FabricChannelRouteEvent, error) {
	openedAt := now
	if openedAt.IsZero() {
		openedAt = time.Now().UTC()
	}

	choice, err := r.Scheduler.ChooseRoute(spec, routeSet, openedAt)
	if err != nil {
		return FabricChannel{}, FabricChannelRouteEvent{}, err
	}

	var channel FabricChannel
	channel.Spec = spec
	channel.State = FabricChannelOpen
	channel.RouteID = choice.Route.RouteID
	channel.TargetNode = choice.Route.DestinationNodeID
	channel.OpenedAt = openedAt

	var event FabricChannelRouteEvent
	event.Type = FabricChannelRouteEventOpened
	event.Reason = choice.Reason
	event.NextRoute = choice.Route
	event.Choice = choice
	event.Channel = channel
	event.OccurredAt = openedAt

	return channel, event, nil
}
// ObserveChannel applies one observation to channel and reroutes it when the
// router's thresholds call for it.
//
// The observation's byte/frame counters are added to the channel's totals, an
// empty State is treated as open, and a zero now (or ObservedAt) falls back to
// the current UTC time. When no reroute is needed the returned event has type
// FabricChannelRouteEventNone. When a reroute is needed, an alternative to the
// current route is chosen; the channel's RouteID, TargetNode, LastReroute and
// RerouteCount are updated and a reroute event is returned.
//
// If no alternative can be chosen, the error is returned with a zero event;
// the returned channel still carries the updated counters but keeps its route.
func (r FabricChannelRouter) ObserveChannel(channel FabricChannel, routeSet FabricRouteSet, observation FabricChannelObservation, now time.Time) (FabricChannel, FabricChannelRouteEvent, error) {
	if now.IsZero() {
		now = time.Now().UTC()
	}
	if observation.ObservedAt.IsZero() {
		observation.ObservedAt = now
	}

	// Fold the observed traffic deltas into the running totals.
	channel.BytesSent += observation.BytesSent
	channel.FramesSent += observation.FramesSent
	channel.BytesRecv += observation.BytesRecv
	channel.FramesRecv += observation.FramesRecv
	if channel.State == "" {
		channel.State = FabricChannelOpen
	}

	// Fields shared by both the "no change" and the "reroute" outcome.
	event := FabricChannelRouteEvent{
		Type:        FabricChannelRouteEventNone,
		Observation: observation,
		OccurredAt:  now,
	}

	if !r.shouldReroute(channel, observation, routeSet, now) {
		event.Channel = channel
		return channel, event, nil
	}

	// A route that has vanished from the set is reported as the zero value,
	// so the lookup result flag is intentionally ignored.
	previous, _ := findFabricRoute(routeSet, channel.RouteID)

	choice, err := r.chooseAlternativeRoute(channel.Spec, routeSet, channel.RouteID, now)
	if err != nil {
		return channel, FabricChannelRouteEvent{}, err
	}

	channel.RouteID = choice.Route.RouteID
	channel.TargetNode = choice.Route.DestinationNodeID
	channel.LastReroute = now
	channel.RerouteCount++

	// Prefer the caller-supplied reason; derive one only when it is blank.
	reason := observation.Reason
	if strings.TrimSpace(reason) == "" {
		reason = rerouteReason(r.Config, observation, previous)
	}

	event.Type = FabricChannelRouteEventReroute
	event.Reason = reason
	event.PreviousRoute = previous
	event.NextRoute = choice.Route
	event.Choice = choice
	event.Channel = channel
	return channel, event, nil
}
// shouldReroute reports whether channel should leave its current route.
//
// Checks run in this order, using the router config with defaults applied:
//  1. a reroute within MinRerouteInterval of the last one is never allowed;
//  2. a failed observation always reroutes;
//  3. ack latency above MaxAckLatencyMs (when set) reroutes;
//  4. otherwise the channel reroutes only if its current route is present in
//     routeSet and its pressure percentage exceeds MaxRoutePressure.
func (r FabricChannelRouter) shouldReroute(channel FabricChannel, observation FabricChannelObservation, routeSet FabricRouteSet, now time.Time) bool {
	cfg := normalizeFabricChannelRouterConfig(r.Config)

	recentlyRerouted := cfg.MinRerouteInterval > 0 &&
		!channel.LastReroute.IsZero() &&
		now.Sub(channel.LastReroute) < cfg.MinRerouteInterval

	switch {
	case recentlyRerouted:
		return false
	case observation.Failed:
		return true
	case cfg.MaxAckLatencyMs > 0 && observation.AckLatencyMs > cfg.MaxAckLatencyMs:
		return true
	case cfg.MaxRoutePressure <= 0:
		return false
	}

	current, found := findFabricRoute(routeSet, channel.RouteID)
	return found && fabricRoutePressurePercent(current, cfg.ProjectedChannelCost) > cfg.MaxRoutePressure
}
// chooseAlternativeRoute asks the scheduler to pick a route for spec from
// every route in routeSet except the channel's current one.
//
// A route counts as the current one when its RouteID equals currentRouteID
// either exactly or after trimming surrounding whitespace from
// currentRouteID. The trimmed comparison mirrors findFabricRoute, which
// resolves the current route from the trimmed ID; without it a padded ID
// would be recognised as the current route for threshold checks yet remain
// eligible as its own "alternative".
//
// It returns ErrFabricRouteNotFound when no other route exists; scheduler
// errors are returned unchanged.
func (r FabricChannelRouter) chooseAlternativeRoute(spec FabricChannelSpec, routeSet FabricRouteSet, currentRouteID string, now time.Time) (FabricRouteChoice, error) {
	trimmedID := strings.TrimSpace(currentRouteID)

	routes := flattenFabricRouteSet(routeSet)
	alternatives := make([]FabricRoute, 0, len(routes))
	for _, route := range routes {
		// Keep the exact match so anything excluded before stays excluded.
		if route.RouteID == currentRouteID || route.RouteID == trimmedID {
			continue
		}
		alternatives = append(alternatives, route)
	}
	if len(alternatives) == 0 {
		return FabricRouteChoice{}, ErrFabricRouteNotFound
	}
	return r.Scheduler.ChooseRoute(spec, routeSetFromRoutes(routeSet, alternatives), now)
}
// normalizeFabricChannelRouterConfig returns a copy of cfg with defaults
// filled in: a non-positive ProjectedChannelCost becomes 1, a non-positive
// scheduler ProjectedChannelCost inherits the (already defaulted) router
// value, and a non-positive MaxRoutePressure becomes 90.
func normalizeFabricChannelRouterConfig(cfg FabricChannelRouterConfig) FabricChannelRouterConfig {
	const (
		defaultProjectedChannelCost = 1
		defaultMaxRoutePressure     = 90
	)

	normalized := cfg
	if normalized.ProjectedChannelCost < 1 {
		normalized.ProjectedChannelCost = defaultProjectedChannelCost
	}
	// Must run after the router-level cost is defaulted so the scheduler
	// inherits the effective value.
	if normalized.SchedulerConfig.ProjectedChannelCost < 1 {
		normalized.SchedulerConfig.ProjectedChannelCost = normalized.ProjectedChannelCost
	}
	if normalized.MaxRoutePressure < 1 {
		normalized.MaxRoutePressure = defaultMaxRoutePressure
	}
	return normalized
}
// rerouteReason names the first threshold, in priority order, that explains
// why a channel is leaving route: a failed observation, ack latency above the
// configured maximum, or route pressure above the configured maximum. When
// none applies it falls back to "route_degraded". cfg is normalized first.
func rerouteReason(cfg FabricChannelRouterConfig, observation FabricChannelObservation, route FabricRoute) string {
	limits := normalizeFabricChannelRouterConfig(cfg)

	if observation.Failed {
		return "route_failure"
	}
	if limits.MaxAckLatencyMs > 0 && observation.AckLatencyMs > limits.MaxAckLatencyMs {
		return "ack_latency_threshold"
	}
	if limits.MaxRoutePressure > 0 {
		if fabricRoutePressurePercent(route, limits.ProjectedChannelCost) > limits.MaxRoutePressure {
			return "route_capacity_pressure"
		}
	}
	return "route_degraded"
}
// findFabricRoute looks up the route in routeSet whose RouteID equals routeID
// after surrounding whitespace is trimmed from routeID. It returns the route
// and true on a match, or a zero FabricRoute and false when routeID is blank
// or no route matches.
func findFabricRoute(routeSet FabricRouteSet, routeID string) (FabricRoute, bool) {
	wanted := strings.TrimSpace(routeID)
	if wanted == "" {
		return FabricRoute{}, false
	}

	candidates := flattenFabricRouteSet(routeSet)
	for i := range candidates {
		if candidates[i].RouteID == wanted {
			return candidates[i], true
		}
	}
	return FabricRoute{}, false
}
// routeSetFromRoutes builds a route set that keeps template's target identity
// (TargetKind and TargetID) but contains only routes: the first becomes
// Primary and any remaining ones become WarmStandby, in order. With no routes
// the result carries just the target identity.
func routeSetFromRoutes(template FabricRouteSet, routes []FabricRoute) FabricRouteSet {
	var result FabricRouteSet
	result.TargetKind = template.TargetKind
	result.TargetID = template.TargetID

	if len(routes) == 0 {
		return result
	}

	first, rest := routes[0], routes[1:]
	result.Primary = first
	if len(rest) > 0 {
		// Appending to the empty field copies rest, so the result does not
		// alias the caller's slice.
		result.WarmStandby = append(result.WarmStandby, rest...)
	}
	return result
}