Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 Мбит/с
2026-05-22 21:46:49 +03:00

391 lines
10 KiB
Go

package mesh
import (
"errors"
"sort"
"strings"
"time"
)
// FabricChannelTargetKind identifies what a fabric channel is addressed to.
// ValidateFabricChannelSpec accepts only the values declared below.
type FabricChannelTargetKind string
const (
// FabricChannelTargetNode addresses a single node; for this kind
// fabricRouteUsable compares the spec's TargetID with
// FabricRoute.DestinationNodeID.
FabricChannelTargetNode FabricChannelTargetKind = "node"
// FabricChannelTargetPool addresses a pool; for this kind fabricRouteUsable
// compares the spec's TargetID with FabricRoute.PoolID.
FabricChannelTargetPool FabricChannelTargetKind = "pool"
)
// FabricChannelLifecycleState is the string-typed lifecycle state stored in
// FabricChannel.State.
// NOTE(review): the allowed transitions between these states are not defined
// in this part of the file — confirm against the code that mutates
// FabricChannel.
type FabricChannelLifecycleState string
const (
// FabricChannelOpening is the "opening" state.
FabricChannelOpening FabricChannelLifecycleState = "opening"
// FabricChannelOpen is the "open" state.
FabricChannelOpen FabricChannelLifecycleState = "open"
// FabricChannelDraining is the "draining" state.
FabricChannelDraining FabricChannelLifecycleState = "draining"
// FabricChannelClosed is the "closed" state.
FabricChannelClosed FabricChannelLifecycleState = "closed"
)
// FabricRouteMode is the string-typed transport mode recorded on a
// FabricRouteHop and on a FabricAdjacency.
// NOTE(review): the descriptions below are inferred from the constant names
// and values only — confirm against the transport implementation.
type FabricRouteMode string
const (
// FabricRouteDirect is the "direct_quic" mode (presumably a direct QUIC path).
FabricRouteDirect FabricRouteMode = "direct_quic"
// FabricRouteLAN is the "lan_quic" mode (presumably QUIC over a local network).
FabricRouteLAN FabricRouteMode = "lan_quic"
// FabricRouteReverse is the "reverse_quic" mode (presumably a reverse-dialled QUIC path).
FabricRouteReverse FabricRouteMode = "reverse_quic"
// FabricRouteRelay is the "relay_quic" mode (presumably QUIC through a relay node).
FabricRouteRelay FabricRouteMode = "relay_quic"
// FabricRouteICE is the "ice_quic" mode (presumably QUIC over an ICE-negotiated path).
FabricRouteICE FabricRouteMode = "ice_quic"
)
// Sentinel errors returned by the channel validation and route selection code.
var (
// ErrFabricChannelInvalid is returned by ValidateFabricChannelSpec when a
// required identifier is blank or the target kind is not a declared kind.
ErrFabricChannelInvalid = errors.New("fabric channel request is invalid")
// ErrFabricRouteNotFound is returned by FabricRouteScheduler.ChooseRoute when
// the route set is empty or every route in it is filtered out.
ErrFabricRouteNotFound = errors.New("fabric route not found")
)
// FabricChannelSpec describes a channel that needs a route. For
// ValidateFabricChannelSpec to accept it, ChannelID, ClusterID, SourceNodeID
// and TargetID must be non-blank and TargetKind must be a declared kind.
type FabricChannelSpec struct {
ChannelID string
ClusterID string
SourceNodeID string
// TargetKind decides whether TargetID is matched against a route's
// DestinationNodeID (node) or PoolID (pool).
TargetKind FabricChannelTargetKind
TargetID string
// TrafficClass is filled by FabricChannelSpecFromServiceRequest from the
// request's TrafficClass, falling back to a default derived from its
// ServiceClass.
TrafficClass string
// MinBandwidth is not read by the scheduler code visible here.
// NOTE(review): the unit is not stated in this file — confirm.
MinBandwidth int64
// StickyKey is set to the trimmed request ResourceID by
// FabricChannelSpecFromServiceRequest.
StickyKey string
CreatedAt time.Time
// ForbiddenHops lists node IDs; ChooseRoute skips every route whose Hops
// include one of them.
ForbiddenHops []string
}
// FabricServiceChannelTarget is the target section of a
// FabricServiceChannelRequest. FabricChannelSpecFromServiceRequest resolves a
// single target ID from Kind, PoolIDs, SelectedNodeID and NodeIDs.
type FabricServiceChannelTarget struct {
// Kind defaults to FabricChannelTargetPool when left empty.
Kind FabricChannelTargetKind
// PoolIDs: the first non-blank entry is the preferred target ID for pool targets.
PoolIDs []string
// NodeIDs: the first non-blank entry is used when SelectedNodeID is blank.
NodeIDs []string
// SelectedNodeID is the preferred target ID for node targets.
SelectedNodeID string
// ServiceRole, SelectionPolicy and SingleMemberPool are not read by the code
// visible here.
ServiceRole string
SelectionPolicy string
SingleMemberPool bool
}
// FabricServiceChannelRequest is the service-level request that
// FabricChannelSpecFromServiceRequest converts into a FabricChannelSpec.
// Only ChannelID, ClusterID, ResourceID, SourceNodeID, ServiceClass, Target and
// TrafficClass are read by that conversion; the remaining fields are not read
// by the code visible here.
type FabricServiceChannelRequest struct {
SchemaVersion string
// ChannelID becomes the spec's ChannelID; when blank, ResourceID is used instead.
ChannelID string
ClusterID string
OrganizationID string
UserID string
// ResourceID is the ChannelID fallback and also becomes the spec's StickyKey.
ResourceID string
// SourceNodeID falls back to the caller-supplied local node ID when blank.
SourceNodeID string
SourceRole string
// ServiceClass selects the default traffic class when TrafficClass is blank.
ServiceClass string
Target FabricServiceChannelTarget
TrafficClass string
// CreatedAt is not copied into the spec; the spec's CreatedAt comes from the
// now argument of FabricChannelSpecFromServiceRequest.
CreatedAt time.Time
}
// FabricChannel pairs a FabricChannelSpec with its lifecycle state, assigned
// route and traffic counters.
// NOTE(review): no function in the visible part of this file reads or updates
// these fields — confirm their exact semantics against the channel manager.
type FabricChannel struct {
Spec FabricChannelSpec
State FabricChannelLifecycleState
RouteID string
TargetNode string
OpenedAt time.Time
LastReroute time.Time
// Byte, frame and reroute counters; presumably cumulative over the channel's
// lifetime — confirm.
BytesSent uint64
BytesRecv uint64
FramesSent uint64
FramesRecv uint64
RerouteCount uint64
}
// FabricRouteHop is one hop of a FabricRoute.
type FabricRouteHop struct {
// NodeID is checked against the spec's ForbiddenHops when a route is filtered.
NodeID string
Mode FabricRouteMode
// EndpointID, Address and PeerCertSHA256 are not read by the code visible here.
// NOTE(review): PeerCertSHA256 presumably holds a certificate fingerprint —
// confirm its encoding.
EndpointID string
Address string
PeerCertSHA256 string
}
// FabricRoute is one candidate path together with the metrics that
// FabricRouteScheduler filters and scores.
type FabricRoute struct {
// RouteID must be non-blank for the route to be usable; it is also the final
// tie-breaker when choices are sorted.
RouteID string
// ClusterID, when non-empty on both the route and the spec, must match the
// spec's ClusterID.
ClusterID string
// SourceNodeID, when non-empty, must equal the spec's SourceNodeID.
SourceNodeID string
// DestinationNodeID, when non-empty, must equal the spec's TargetID for node targets.
DestinationNodeID string
// PoolID, when non-empty, must equal the spec's TargetID for pool targets.
PoolID string
// Hops: the hop count is multiplied by HopPenalty, and each hop's NodeID is
// checked against the spec's ForbiddenHops.
Hops []FabricRouteHop
// BaseLatencyMs, JitterMs and LossPermille are each multiplied by the matching
// scheduler weight and added to the score.
BaseLatencyMs int
JitterMs int
LossPermille int
// Capacity and ActiveChannels define route pressure as
// ActiveChannels*100/Capacity, capped at 100; a non-positive Capacity is
// treated as pressure 100.
Capacity int
ActiveChannels int
// RelayCount is multiplied by RelayPenalty; a positive value also sets the
// choice reason to "relay_fallback_available".
RelayCount int
// LastUpdatedAt is not read by the code visible here.
LastUpdatedAt time.Time
// Healthy must be true for the route to be considered at all.
Healthy bool
// Degraded adds DegradedPenalty to the score.
Degraded bool
}
// FabricRouteSet groups the candidate routes for one target.
// flattenFabricRouteSet lists them in the order Primary, WarmStandby,
// ColdFallbacks before ChooseRoute scores them.
type FabricRouteSet struct {
// TargetKind and TargetID are not read by ChooseRoute; filtering uses the
// spec's target fields instead.
TargetKind FabricChannelTargetKind
TargetID string
// Primary is skipped when its RouteID is blank.
Primary FabricRoute
WarmStandby []FabricRoute
ColdFallbacks []FabricRoute
}
// FabricAdjacency records link metrics between FromNodeID and ToNodeID.
// NOTE(review): no function in the visible part of this file reads this type;
// the field notes below are inferred from names only — confirm against the
// code that produces and consumes adjacencies.
type FabricAdjacency struct {
FromNodeID string
ToNodeID string
Mode FabricRouteMode
// RTTMs and JitterMs are presumably milliseconds; LossPermille presumably
// parts per thousand — confirm.
RTTMs int
JitterMs int
LossPermille int
Capacity int
ActiveChannels int
// ThroughputBps: presumably bits or bytes per second — the unit is not stated
// here, confirm.
ThroughputBps int64
PressurePercent int
Healthy bool
PassiveOutbound bool
LocalityGroupID string
NATGroupID string
LastObservedAt time.Time
LastFailureReason string
}
// FabricRouteChoice is the result of scoring one route.
type FabricRouteChoice struct {
// Route is the scored route; ChooseRoute fills it in after scoring.
Route FabricRoute
// Score is the weighted cost of the route; ChooseRoute returns the choice with
// the lowest score.
Score int
// Reason is one of "latency_load_score", "capacity_pressure_avoidance" or
// "relay_fallback_available".
Reason string
// PressureBefore is the route pressure percent with its current channels;
// PressureAfter also counts the configured ProjectedChannelCost.
PressureBefore int
PressureAfter int
}
// FabricRouteSchedulerConfig holds the weights and penalties used to score
// routes. normalizeFabricRouteSchedulerConfig replaces every non-positive
// weight, penalty or projected cost with the default noted on the field.
type FabricRouteSchedulerConfig struct {
// LatencyWeight multiplies FabricRoute.BaseLatencyMs (default 10).
LatencyWeight int
// JitterWeight multiplies FabricRoute.JitterMs (default 4).
JitterWeight int
// LossWeight multiplies FabricRoute.LossPermille (default 8).
LossWeight int
// PressureWeight multiplies the projected pressure percent (default 12).
PressureWeight int
// HopPenalty is added once per hop (default 5).
HopPenalty int
// RelayPenalty is added once per counted relay (default 25).
RelayPenalty int
// DegradedPenalty is added when the route is marked Degraded (default 500).
DegradedPenalty int
// ProjectedChannelCost is the number of channels added to ActiveChannels when
// projecting pressure (default 1).
ProjectedChannelCost int
// HardMaxRoutePressure, when positive, makes ChooseRoute drop routes whose
// projected pressure exceeds it; zero or negative disables the limit.
HardMaxRoutePressure int
}
// FabricRouteScheduler picks the lowest-scoring usable route for a channel.
// scoreRoute normalizes Config on every call, so a zero-value scheduler scores
// with the default weights.
type FabricRouteScheduler struct {
Config FabricRouteSchedulerConfig
}
// NewFabricRouteScheduler returns a scheduler whose Config is cfg after
// normalizeFabricRouteSchedulerConfig has replaced non-positive weights and
// penalties with their defaults.
func NewFabricRouteScheduler(cfg FabricRouteSchedulerConfig) FabricRouteScheduler {
	var scheduler FabricRouteScheduler
	scheduler.Config = normalizeFabricRouteSchedulerConfig(cfg)
	return scheduler
}
// ChooseRoute validates spec, filters the routes in routeSet and returns the
// best remaining candidate.
//
// A route is dropped when fabricRouteUsable rejects it, or when
// Config.HardMaxRoutePressure is positive and the route's projected pressure
// exceeds it. Survivors are ordered by ascending Score, then PressureAfter,
// then BaseLatencyMs, then RouteID, and the first one is returned.
//
// It returns the validation error for an invalid spec, and
// ErrFabricRouteNotFound when the set is empty or nothing survives filtering.
func (s FabricRouteScheduler) ChooseRoute(spec FabricChannelSpec, routeSet FabricRouteSet, now time.Time) (FabricRouteChoice, error) {
	var none FabricRouteChoice

	if err := ValidateFabricChannelSpec(spec); err != nil {
		return none, err
	}

	candidates := flattenFabricRouteSet(routeSet)
	if len(candidates) == 0 {
		return none, ErrFabricRouteNotFound
	}

	blockedHops := stringSet(spec.ForbiddenHops)
	pressureCap := s.Config.HardMaxRoutePressure
	ranked := make([]FabricRouteChoice, 0, len(candidates))
	for idx := range candidates {
		candidate := candidates[idx]
		if !fabricRouteUsable(spec, candidate, blockedHops, now) {
			continue
		}
		scored := s.scoreRoute(candidate)
		// A positive cap is a hard limit on the projected pressure.
		if pressureCap > 0 && scored.PressureAfter > pressureCap {
			continue
		}
		scored.Route = candidate
		ranked = append(ranked, scored)
	}
	if len(ranked) == 0 {
		return none, ErrFabricRouteNotFound
	}

	sort.SliceStable(ranked, func(a, b int) bool {
		left, right := &ranked[a], &ranked[b]
		switch {
		case left.Score != right.Score:
			return left.Score < right.Score
		case left.PressureAfter != right.PressureAfter:
			return left.PressureAfter < right.PressureAfter
		case left.Route.BaseLatencyMs != right.Route.BaseLatencyMs:
			return left.Route.BaseLatencyMs < right.Route.BaseLatencyMs
		default:
			return left.Route.RouteID < right.Route.RouteID
		}
	})
	return ranked[0], nil
}
// ValidateFabricChannelSpec reports whether spec can be routed. It returns
// ErrFabricChannelInvalid when ChannelID, ClusterID, SourceNodeID or TargetID
// is blank after trimming whitespace, or when TargetKind is neither
// FabricChannelTargetNode nor FabricChannelTargetPool; otherwise nil.
func ValidateFabricChannelSpec(spec FabricChannelSpec) error {
	required := [...]string{spec.ChannelID, spec.ClusterID, spec.SourceNodeID, spec.TargetID}
	for _, field := range required {
		if strings.TrimSpace(field) == "" {
			return ErrFabricChannelInvalid
		}
	}
	if spec.TargetKind != FabricChannelTargetNode && spec.TargetKind != FabricChannelTargetPool {
		return ErrFabricChannelInvalid
	}
	return nil
}
// FabricChannelSpecFromServiceRequest converts a service-level request into a
// validated FabricChannelSpec.
//
//   - now becomes CreatedAt; a zero now is replaced with the current UTC time.
//   - The source node is req.SourceNodeID, falling back to localNodeID.
//   - An empty target kind defaults to FabricChannelTargetPool.
//   - The target ID prefers the first pool ID, then the selected node, then the
//     first node ID; for node targets the node candidates are tried first and
//     the pool-first result is the last resort.
//   - ChannelID falls back to ResourceID, which also becomes StickyKey.
//   - TrafficClass falls back to the default for req.ServiceClass.
//
// It returns the zero spec and the validation error when the result does not
// pass ValidateFabricChannelSpec.
func FabricChannelSpecFromServiceRequest(req FabricServiceChannelRequest, localNodeID string, now time.Time) (FabricChannelSpec, error) {
	createdAt := now
	if createdAt.IsZero() {
		createdAt = time.Now().UTC()
	}

	kind := req.Target.Kind
	if kind == "" {
		kind = FabricChannelTargetPool
	}

	poolID := firstString(req.Target.PoolIDs)
	selectedNodeID := strings.TrimSpace(req.Target.SelectedNodeID)
	nodeID := firstString(req.Target.NodeIDs)

	resolvedTarget := firstNonEmpty(poolID, selectedNodeID, nodeID)
	if kind == FabricChannelTargetNode {
		resolvedTarget = firstNonEmpty(selectedNodeID, nodeID, resolvedTarget)
	}

	resourceID := strings.TrimSpace(req.ResourceID)

	var spec FabricChannelSpec
	spec.ChannelID = firstNonEmpty(strings.TrimSpace(req.ChannelID), resourceID)
	spec.ClusterID = strings.TrimSpace(req.ClusterID)
	spec.SourceNodeID = firstNonEmpty(strings.TrimSpace(req.SourceNodeID), strings.TrimSpace(localNodeID))
	spec.TargetKind = kind
	spec.TargetID = resolvedTarget
	spec.TrafficClass = firstNonEmpty(strings.TrimSpace(req.TrafficClass), serviceClassDefaultTrafficClass(req.ServiceClass))
	spec.StickyKey = resourceID
	spec.CreatedAt = createdAt

	if err := ValidateFabricChannelSpec(spec); err != nil {
		return FabricChannelSpec{}, err
	}
	return spec, nil
}
// serviceClassDefaultTrafficClass maps a service class name, compared after
// lower-casing and trimming, to its default traffic class: VPN packets use the
// bulk class, remote workspace uses the interactive class, and anything else
// uses the reliable class.
func serviceClassDefaultTrafficClass(serviceClass string) string {
	normalized := strings.TrimSpace(strings.ToLower(serviceClass))
	if normalized == FabricServiceClassVPNPackets {
		return FabricServiceChannelBulk
	}
	if normalized == FabricServiceClassRemoteWorkspace {
		return FabricServiceChannelInteractive
	}
	return FabricServiceChannelReliable
}
// firstString returns the first element of values that is non-empty after
// trimming surrounding whitespace, in its trimmed form. It returns "" when
// values is empty or every element is blank.
func firstString(values []string) string {
	for i := range values {
		if trimmed := strings.TrimSpace(values[i]); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
// scoreRoute computes the weighted cost of route; lower is better.
//
// The score is the sum of latency, jitter, loss and projected pressure, each
// multiplied by its configured weight, plus a penalty per hop and per relay,
// plus DegradedPenalty when the route is degraded. The config is normalized on
// every call, so a zero-value scheduler still scores with default weights.
//
// Reason is "relay_fallback_available" when the route uses a relay, otherwise
// "capacity_pressure_avoidance" when projected pressure is at least 90, and
// "latency_load_score" in all other cases. The returned choice leaves Route
// unset.
func (s FabricRouteScheduler) scoreRoute(route FabricRoute) FabricRouteChoice {
	weights := normalizeFabricRouteSchedulerConfig(s.Config)

	result := FabricRouteChoice{
		PressureBefore: fabricRoutePressurePercent(route, 0),
		PressureAfter:  fabricRoutePressurePercent(route, weights.ProjectedChannelCost),
	}

	total := route.BaseLatencyMs * weights.LatencyWeight
	total += route.JitterMs * weights.JitterWeight
	total += route.LossPermille * weights.LossWeight
	total += result.PressureAfter * weights.PressureWeight
	total += len(route.Hops) * weights.HopPenalty
	total += route.RelayCount * weights.RelayPenalty
	if route.Degraded {
		total += weights.DegradedPenalty
	}
	result.Score = total

	// Relay usage takes precedence over pressure when naming the reason.
	switch {
	case route.RelayCount > 0:
		result.Reason = "relay_fallback_available"
	case result.PressureAfter >= 90:
		result.Reason = "capacity_pressure_avoidance"
	default:
		result.Reason = "latency_load_score"
	}
	return result
}
// normalizeFabricRouteSchedulerConfig returns cfg with every non-positive
// weight, penalty and projected channel cost replaced by its default
// (latency 10, jitter 4, loss 8, pressure 12, hop 5, relay 25, degraded 500,
// projected cost 1). A negative HardMaxRoutePressure is clamped to 0.
func normalizeFabricRouteSchedulerConfig(cfg FabricRouteSchedulerConfig) FabricRouteSchedulerConfig {
	positiveOr := func(value, fallback int) int {
		if value > 0 {
			return value
		}
		return fallback
	}

	cfg.LatencyWeight = positiveOr(cfg.LatencyWeight, 10)
	cfg.JitterWeight = positiveOr(cfg.JitterWeight, 4)
	cfg.LossWeight = positiveOr(cfg.LossWeight, 8)
	cfg.PressureWeight = positiveOr(cfg.PressureWeight, 12)
	cfg.HopPenalty = positiveOr(cfg.HopPenalty, 5)
	cfg.RelayPenalty = positiveOr(cfg.RelayPenalty, 25)
	cfg.DegradedPenalty = positiveOr(cfg.DegradedPenalty, 500)
	cfg.ProjectedChannelCost = positiveOr(cfg.ProjectedChannelCost, 1)

	if cfg.HardMaxRoutePressure < 0 {
		cfg.HardMaxRoutePressure = 0
	}
	return cfg
}
// flattenFabricRouteSet returns the routes of routeSet as one slice in the
// order Primary, WarmStandby, ColdFallbacks. Primary is left out when its
// RouteID is blank.
func flattenFabricRouteSet(routeSet FabricRouteSet) []FabricRoute {
	total := 1 + len(routeSet.WarmStandby) + len(routeSet.ColdFallbacks)
	flat := make([]FabricRoute, 0, total)
	if primary := routeSet.Primary; strings.TrimSpace(primary.RouteID) != "" {
		flat = append(flat, primary)
	}
	for _, tier := range [][]FabricRoute{routeSet.WarmStandby, routeSet.ColdFallbacks} {
		flat = append(flat, tier...)
	}
	return flat
}
// fabricRouteUsable reports whether route may carry the channel described by
// spec.
//
// A route is rejected when:
//   - its RouteID is blank or it is not Healthy;
//   - both the route and the spec name a cluster and the two differ;
//   - the route names a source node that differs from the spec's;
//   - for node targets, the route names a destination node other than TargetID;
//   - for pool targets, the route names a pool other than TargetID;
//   - any hop's node ID is in forbidden.
//
// forbidden is expected to hold trimmed node IDs (as produced by stringSet),
// so hop IDs are trimmed before the lookup; otherwise a hop ID carrying stray
// whitespace would slip past the forbidden-hop filter.
//
// now is accepted for interface stability but is not consulted by the current
// checks.
func fabricRouteUsable(spec FabricChannelSpec, route FabricRoute, forbidden map[string]struct{}, now time.Time) bool {
	if strings.TrimSpace(route.RouteID) == "" || !route.Healthy {
		return false
	}
	if route.ClusterID != "" && spec.ClusterID != "" && route.ClusterID != spec.ClusterID {
		return false
	}
	if route.SourceNodeID != "" && route.SourceNodeID != spec.SourceNodeID {
		return false
	}
	switch spec.TargetKind {
	case FabricChannelTargetNode:
		if route.DestinationNodeID != "" && route.DestinationNodeID != spec.TargetID {
			return false
		}
	case FabricChannelTargetPool:
		if route.PoolID != "" && route.PoolID != spec.TargetID {
			return false
		}
	}
	for _, hop := range route.Hops {
		// Normalize the key the same way the forbidden set was built.
		if _, blocked := forbidden[strings.TrimSpace(hop.NodeID)]; blocked {
			return false
		}
	}
	return true
}
// fabricRoutePressurePercent returns how full route would be, as a percentage,
// with projected extra channels added to its ActiveChannels.
//
// A route with non-positive Capacity is treated as fully loaded (100). A
// non-positive channel total yields 0. The result uses integer division and is
// capped at 100.
func fabricRoutePressurePercent(route FabricRoute, projected int) int {
	capacity := route.Capacity
	if capacity <= 0 {
		return 100
	}
	load := route.ActiveChannels + projected
	if load <= 0 {
		return 0
	}
	if percent := load * 100 / capacity; percent <= 100 {
		return percent
	}
	return 100
}
// stringSet builds a set from values, trimming surrounding whitespace from
// each entry and skipping entries that are blank after trimming. The returned
// map is never nil.
func stringSet(values []string) map[string]struct{} {
	set := make(map[string]struct{}, len(values))
	for i := range values {
		key := strings.TrimSpace(values[i])
		if key == "" {
			continue
		}
		set[key] = struct{}{}
	}
	return set
}