269 lines
8.5 KiB
Go
269 lines
8.5 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
|
|
)
|
|
|
|
type QUICSyntheticTransport struct {
|
|
Targets map[string]FabricTransportTarget
|
|
RouteSets map[string]FabricRouteSet
|
|
Transport FabricTransport
|
|
Router FabricChannelRouter
|
|
Timeout time.Duration
|
|
Pressure *FabricRoutePressureTracker
|
|
Health *FabricRouteHealthTracker
|
|
sequence atomic.Uint64
|
|
}
|
|
|
|
type QUICSyntheticTransportSnapshot struct {
|
|
RoutePressure FabricRoutePressureSnapshot `json:"route_pressure"`
|
|
RouteHealth FabricRouteHealthSnapshot `json:"route_health,omitempty"`
|
|
}
|
|
|
|
func NewQUICSyntheticTransportFromRouteSets(routeSets map[string]FabricRouteSet, transport FabricTransport) *QUICSyntheticTransport {
|
|
normalizedRouteSets := make(map[string]FabricRouteSet, len(routeSets))
|
|
targets := make(map[string]FabricTransportTarget, len(routeSets))
|
|
for nodeID, routeSet := range routeSets {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
if nodeID == "" {
|
|
continue
|
|
}
|
|
normalizedRouteSets[nodeID] = routeSet
|
|
if target, err := FabricTransportTargetForRoute(routeSet.Primary); err == nil {
|
|
targets[nodeID] = target
|
|
}
|
|
}
|
|
if transport == nil {
|
|
transport = NewQUICFabricTransport(nil)
|
|
}
|
|
return &QUICSyntheticTransport{
|
|
Targets: targets,
|
|
RouteSets: normalizedRouteSets,
|
|
Transport: transport,
|
|
Router: NewFabricChannelRouter(FabricChannelRouterConfig{
|
|
MaxAckLatencyMs: 2000,
|
|
MinRerouteInterval: 50 * time.Millisecond,
|
|
}),
|
|
Timeout: 10 * time.Second,
|
|
Pressure: NewFabricRoutePressureTracker(),
|
|
Health: NewFabricRouteHealthTracker(30 * time.Second),
|
|
}
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) SendSynthetic(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
|
|
if t == nil || t.Transport == nil {
|
|
return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable
|
|
}
|
|
nextNodeID = strings.TrimSpace(nextNodeID)
|
|
routeSet, ok := t.RouteSets[nextNodeID]
|
|
if !ok {
|
|
target, targetOK := t.Targets[nextNodeID]
|
|
if !targetOK || strings.TrimSpace(target.Endpoint) == "" {
|
|
return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable
|
|
}
|
|
routeSet = FabricRouteSetForTransportTargets(envelope.ClusterID, envelope.From.NodeID, nextNodeID, []FabricTransportTarget{target})
|
|
}
|
|
spec := FabricChannelSpec{
|
|
ChannelID: fmt.Sprintf("synthetic-%d", t.sequence.Add(1)),
|
|
ClusterID: envelope.ClusterID,
|
|
SourceNodeID: envelope.From.NodeID,
|
|
TargetKind: FabricChannelTargetNode,
|
|
TargetID: nextNodeID,
|
|
TrafficClass: FabricServiceChannelReliable,
|
|
CreatedAt: time.Now().UTC(),
|
|
}
|
|
payload, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
return t.sendSyntheticWithRouteSet(ctx, spec, routeSet, payload)
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) sendSyntheticWithRouteSet(ctx context.Context, spec FabricChannelSpec, routeSet FabricRouteSet, payload []byte) (SyntheticEnvelope, error) {
|
|
router := t.Router
|
|
if router.Config.MaxRoutePressure == 0 {
|
|
router = NewFabricChannelRouter(FabricChannelRouterConfig{MaxAckLatencyMs: 2000, MinRerouteInterval: 50 * time.Millisecond})
|
|
}
|
|
routeSet = t.routeSetForScheduling(routeSet)
|
|
channel, _, err := router.OpenChannel(spec, routeSet, time.Now().UTC())
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
timeout := t.Timeout
|
|
if timeout <= 0 {
|
|
timeout = 10 * time.Second
|
|
}
|
|
for {
|
|
routeSet = t.routeSetForScheduling(routeSet)
|
|
route, ok := findFabricRoute(routeSet, channel.RouteID)
|
|
if !ok {
|
|
return SyntheticEnvelope{}, ErrFabricRouteNotFound
|
|
}
|
|
target, err := FabricTransportTargetForRoute(route)
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
target.PeerID = firstNonEmpty(strings.TrimSpace(target.PeerID), spec.TargetID)
|
|
target.MaxPayload = fabricproto.DefaultMaxPayload
|
|
releaseRoute := t.acquireSyntheticRoute(route.RouteID)
|
|
session, err := t.Transport.Connect(ctx, target)
|
|
if err != nil {
|
|
releaseRoute()
|
|
t.markSyntheticRouteFailure(route.RouteID, err)
|
|
updated, event, rerouteErr := router.ObserveChannel(channel, routeSet, FabricChannelObservation{
|
|
ChannelID: spec.ChannelID,
|
|
RouteID: route.RouteID,
|
|
Failed: true,
|
|
Reason: "connect_failed",
|
|
ObservedAt: time.Now().UTC(),
|
|
}, time.Now().UTC())
|
|
channel = updated
|
|
if event.Type == FabricChannelRouteEventReroute {
|
|
continue
|
|
}
|
|
if rerouteErr != nil {
|
|
return SyntheticEnvelope{}, rerouteErr
|
|
}
|
|
return SyntheticEnvelope{}, fmt.Errorf("%w: %v", ErrSyntheticPeerUnavailable, err)
|
|
}
|
|
response, ackMs, err := t.sendSyntheticOnSession(ctx, session, payload, timeout)
|
|
_ = session.Close()
|
|
releaseRoute()
|
|
if err == nil {
|
|
t.markSyntheticRouteSuccess(route.RouteID)
|
|
_, _, _ = router.ObserveChannel(channel, routeSet, FabricChannelObservation{
|
|
ChannelID: spec.ChannelID,
|
|
RouteID: route.RouteID,
|
|
AckLatencyMs: ackMs,
|
|
BytesSent: uint64(len(payload)),
|
|
FramesSent: 1,
|
|
BytesRecv: uint64(len(response.Payload)),
|
|
FramesRecv: 1,
|
|
ObservedAt: time.Now().UTC(),
|
|
}, time.Now().UTC())
|
|
return decodeQUICSyntheticForwardResponse(response.Payload)
|
|
}
|
|
t.markSyntheticRouteFailure(route.RouteID, err)
|
|
updated, event, rerouteErr := router.ObserveChannel(channel, routeSet, FabricChannelObservation{
|
|
ChannelID: spec.ChannelID,
|
|
RouteID: route.RouteID,
|
|
Failed: true,
|
|
Reason: "response_failed",
|
|
ObservedAt: time.Now().UTC(),
|
|
}, time.Now().UTC())
|
|
channel = updated
|
|
if event.Type == FabricChannelRouteEventReroute {
|
|
continue
|
|
}
|
|
if rerouteErr != nil {
|
|
return SyntheticEnvelope{}, rerouteErr
|
|
}
|
|
return SyntheticEnvelope{}, fmt.Errorf("%w: %v", ErrSyntheticPeerUnavailable, err)
|
|
}
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) routeSetForScheduling(routeSet FabricRouteSet) FabricRouteSet {
|
|
if t != nil && t.Health != nil {
|
|
routeSet = t.Health.Apply(routeSet, time.Now().UTC())
|
|
}
|
|
if t != nil && t.Pressure != nil {
|
|
routeSet = t.Pressure.Apply(routeSet)
|
|
}
|
|
return routeSet
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) acquireSyntheticRoute(routeID string) func() {
|
|
if t == nil || t.Pressure == nil {
|
|
return func() {}
|
|
}
|
|
return t.Pressure.Acquire(routeID)
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) markSyntheticRouteFailure(routeID string, err error) {
|
|
if t == nil || t.Health == nil || err == nil {
|
|
return
|
|
}
|
|
t.Health.MarkFailure(routeID, err.Error(), time.Now().UTC())
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) markSyntheticRouteSuccess(routeID string) {
|
|
if t == nil || t.Health == nil {
|
|
return
|
|
}
|
|
t.Health.MarkSuccess(routeID)
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) Snapshot() QUICSyntheticTransportSnapshot {
|
|
if t == nil {
|
|
return QUICSyntheticTransportSnapshot{}
|
|
}
|
|
var pressure FabricRoutePressureSnapshot
|
|
if t.Pressure != nil {
|
|
pressure = t.Pressure.SnapshotPressure()
|
|
}
|
|
var health FabricRouteHealthSnapshot
|
|
if t.Health != nil {
|
|
health = t.Health.Snapshot(time.Now().UTC())
|
|
}
|
|
return QUICSyntheticTransportSnapshot{RoutePressure: pressure, RouteHealth: health}
|
|
}
|
|
|
|
func (t *QUICSyntheticTransport) sendSyntheticOnSession(ctx context.Context, session FabricTransportSession, payload []byte, timeout time.Duration) (fabricproto.Frame, int64, error) {
|
|
sequence := t.sequence.Add(1)
|
|
if err := session.Send(ctx, fabricproto.Frame{
|
|
Type: fabricproto.FrameData,
|
|
TrafficClass: fabricproto.TrafficClassReliable,
|
|
StreamID: SyntheticForwardQUICStreamID,
|
|
Sequence: sequence,
|
|
Payload: payload,
|
|
}); err != nil {
|
|
return fabricproto.Frame{}, 0, err
|
|
}
|
|
waitCtx := ctx
|
|
if timeout > 0 {
|
|
var cancel context.CancelFunc
|
|
waitCtx, cancel = context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
}
|
|
started := time.Now()
|
|
for {
|
|
select {
|
|
case <-waitCtx.Done():
|
|
return fabricproto.Frame{}, 0, waitCtx.Err()
|
|
case err, ok := <-session.Errors():
|
|
if !ok {
|
|
return fabricproto.Frame{}, 0, ErrSyntheticPeerUnavailable
|
|
}
|
|
if err != nil {
|
|
return fabricproto.Frame{}, 0, err
|
|
}
|
|
case frame, ok := <-session.Frames():
|
|
if !ok {
|
|
return fabricproto.Frame{}, 0, ErrSyntheticPeerUnavailable
|
|
}
|
|
if frame.Type != fabricproto.FrameData || frame.StreamID != SyntheticForwardQUICStreamID || frame.Sequence != sequence {
|
|
continue
|
|
}
|
|
return frame, time.Since(started).Milliseconds(), nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func decodeQUICSyntheticForwardResponse(payload []byte) (SyntheticEnvelope, error) {
|
|
var response quicSyntheticForwardResponse
|
|
if err := json.Unmarshal(payload, &response); err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if strings.TrimSpace(response.Error) != "" {
|
|
return SyntheticEnvelope{}, fmt.Errorf("%w: %s", ErrSyntheticPeerUnavailable, response.Error)
|
|
}
|
|
return response.Envelope, nil
|
|
}
|