package mesh import ( "context" "fmt" "strings" "time" "github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto" ) type FabricChannelRuntimeConfig struct { RouterConfig FabricChannelRouterConfig StreamID uint64 TrafficClass fabricproto.TrafficClass Timeout time.Duration MaxPayload int RouteHealthTTL time.Duration } type FabricChannelRuntime struct { Transport FabricTransport Router FabricChannelRouter Pressure *FabricRoutePressureTracker Health *FabricRouteHealthTracker Config FabricChannelRuntimeConfig } type FabricChannelRuntimeResult struct { Channel FabricChannel BytesSent uint64 BytesRecv uint64 FramesSent uint64 FramesRecv uint64 AcksReceived uint64 RouteEvents []FabricChannelRouteEvent RouteAttempts []string MigrationEvents int RoutePressure FabricRoutePressureSnapshot RouteHealth FabricRouteHealthSnapshot } type FabricChannelRequestResponseResult struct { FabricChannelRuntimeResult ResponsePayload []byte } func NewFabricChannelRuntime(transport FabricTransport, cfg FabricChannelRuntimeConfig) *FabricChannelRuntime { if cfg.StreamID == 0 { cfg.StreamID = 2 } if cfg.TrafficClass == 0 { cfg.TrafficClass = fabricproto.TrafficClassBulk } if cfg.Timeout <= 0 { cfg.Timeout = 30 * time.Second } if cfg.MaxPayload <= 0 { cfg.MaxPayload = fabricproto.DefaultMaxPayload } return &FabricChannelRuntime{ Transport: transport, Router: NewFabricChannelRouter(cfg.RouterConfig), Pressure: NewFabricRoutePressureTracker(), Health: NewFabricRouteHealthTracker(cfg.RouteHealthTTL), Config: cfg, } } func (r *FabricChannelRuntime) SendReliable(ctx context.Context, spec FabricChannelSpec, routeSet FabricRouteSet, payloads [][]byte) (FabricChannelRuntimeResult, error) { if r == nil || r.Transport == nil { return FabricChannelRuntimeResult{}, ErrForwardRuntimeUnavailable } now := time.Now().UTC() routeSet = r.routeSetForScheduling(routeSet) channel, event, err := r.Router.OpenChannel(spec, routeSet, now) if err != nil { return FabricChannelRuntimeResult{}, err } result := FabricChannelRuntimeResult{Channel: channel, RouteEvents: []FabricChannelRouteEvent{event}} sequence := uint64(0) index := 0 for index < len(payloads) { routeSet = r.routeSetForScheduling(routeSet) route, ok := findFabricRoute(routeSet, channel.RouteID) if !ok { return result, ErrFabricRouteNotFound } result.RouteAttempts = append(result.RouteAttempts, route.RouteID) target, err := FabricTransportTargetForRoute(route) if err != nil { return result, err } releaseRoute := r.acquireRoute(route.RouteID) session, err := r.Transport.Connect(ctx, target) if err != nil { releaseRoute() r.markRouteFailure(route.RouteID, err) updated, event, rerouteErr := r.Router.ObserveChannel(channel, routeSet, FabricChannelObservation{ ChannelID: spec.ChannelID, RouteID: route.RouteID, Failed: true, Reason: "connect_failed", ObservedAt: time.Now().UTC(), }, time.Now().UTC()) channel = updated result.Channel = channel if event.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, event) result.MigrationEvents++ continue } if rerouteErr != nil { return result, rerouteErr } return result, err } migrated, sendErr := r.sendOnSession(ctx, session, &channel, routeSet, route, payloads, &index, &sequence, &result) _ = session.Close() releaseRoute() result.Channel = channel if sendErr != nil { return result, sendErr } if !migrated { break } } result.Channel = channel result.RoutePressure = r.snapshotRoutePressure() result.RouteHealth = r.snapshotRouteHealth() return result, nil } func (r *FabricChannelRuntime) SendRequestResponse(ctx context.Context, spec FabricChannelSpec, routeSet FabricRouteSet, payload []byte) (FabricChannelRequestResponseResult, error) { if r == nil || r.Transport == nil { return FabricChannelRequestResponseResult{}, ErrForwardRuntimeUnavailable } if len(payload) > r.Config.MaxPayload { return FabricChannelRequestResponseResult{}, fmt.Errorf("%w: %d > %d", fabricproto.ErrInvalidPayloadLen, len(payload), r.Config.MaxPayload) } now := time.Now().UTC() routeSet = r.routeSetForScheduling(routeSet) channel, event, err := r.Router.OpenChannel(spec, routeSet, now) if err != nil { return FabricChannelRequestResponseResult{}, err } result := FabricChannelRequestResponseResult{ FabricChannelRuntimeResult: FabricChannelRuntimeResult{Channel: channel, RouteEvents: []FabricChannelRouteEvent{event}}, } sequence := uint64(1) for { routeSet = r.routeSetForScheduling(routeSet) route, ok := findFabricRoute(routeSet, channel.RouteID) if !ok { return result, ErrFabricRouteNotFound } result.RouteAttempts = append(result.RouteAttempts, route.RouteID) target, err := FabricTransportTargetForRoute(route) if err != nil { return result, err } releaseRoute := r.acquireRoute(route.RouteID) session, err := r.Transport.Connect(ctx, target) if err != nil { releaseRoute() r.markRouteFailure(route.RouteID, err) updated, routeEvent, rerouteErr := r.Router.ObserveChannel(channel, routeSet, FabricChannelObservation{ ChannelID: spec.ChannelID, RouteID: route.RouteID, Failed: true, Reason: "connect_failed", ObservedAt: time.Now().UTC(), }, time.Now().UTC()) channel = updated result.Channel = channel if routeEvent.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, routeEvent) result.MigrationEvents++ continue } if rerouteErr != nil { return result, rerouteErr } return result, err } response, ackMs, sendErr := r.sendRequestResponseOnSession(ctx, session, route.RouteID, spec.ChannelID, payload, sequence) _ = session.Close() releaseRoute() result.Channel = channel if sendErr == nil { r.markRouteSuccess(route.RouteID) result.BytesSent += uint64(len(payload)) result.FramesSent++ result.BytesRecv += uint64(len(response)) result.FramesRecv++ result.AcksReceived++ updated, routeEvent, observeErr := r.Router.ObserveChannel(channel, routeSet, FabricChannelObservation{ ChannelID: spec.ChannelID, RouteID: route.RouteID, AckLatencyMs: ackMs, BytesSent: uint64(len(payload)), FramesSent: 1, BytesRecv: uint64(len(response)), FramesRecv: 1, ObservedAt: time.Now().UTC(), }, time.Now().UTC()) channel = updated result.Channel = channel if observeErr != nil { return result, observeErr } if routeEvent.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, routeEvent) result.MigrationEvents++ } result.ResponsePayload = response result.RoutePressure = r.snapshotRoutePressure() result.RouteHealth = r.snapshotRouteHealth() return result, nil } r.markRouteFailure(route.RouteID, sendErr) updated, routeEvent, rerouteErr := r.Router.ObserveChannel(channel, routeSet, FabricChannelObservation{ ChannelID: spec.ChannelID, RouteID: route.RouteID, Failed: true, Reason: "response_failed", ObservedAt: time.Now().UTC(), }, time.Now().UTC()) channel = updated result.Channel = channel if routeEvent.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, routeEvent) result.MigrationEvents++ continue } if rerouteErr != nil { return result, rerouteErr } return result, sendErr } } func (r *FabricChannelRuntime) routeSetForScheduling(routeSet FabricRouteSet) FabricRouteSet { if r != nil && r.Health != nil { routeSet = r.Health.Apply(routeSet, time.Now().UTC()) } return r.routeSetWithActiveChannels(routeSet) } func (r *FabricChannelRuntime) routeSetWithActiveChannels(routeSet FabricRouteSet) FabricRouteSet { if r == nil || r.Pressure == nil { return routeSet } return r.Pressure.Apply(routeSet) } func (r *FabricChannelRuntime) acquireRoute(routeID string) func() { if r == nil || r.Pressure == nil { return func() {} } return r.Pressure.Acquire(routeID) } func (r *FabricChannelRuntime) snapshotRoutePressure() FabricRoutePressureSnapshot { if r == nil || r.Pressure == nil { return FabricRoutePressureSnapshot{} } return r.Pressure.SnapshotPressure() } func (r *FabricChannelRuntime) snapshotRouteHealth() FabricRouteHealthSnapshot { if r == nil || r.Health == nil { return FabricRouteHealthSnapshot{} } return r.Health.Snapshot(time.Now().UTC()) } func (r *FabricChannelRuntime) markRouteFailure(routeID string, err error) { if r == nil || r.Health == nil || err == nil { return } r.Health.MarkFailure(routeID, err.Error(), time.Now().UTC()) } func (r *FabricChannelRuntime) markRouteSuccess(routeID string) { if r == nil || r.Health == nil { return } r.Health.MarkSuccess(routeID) } func (r *FabricChannelRuntime) sendOnSession(ctx context.Context, session FabricTransportSession, channel *FabricChannel, routeSet FabricRouteSet, route FabricRoute, payloads [][]byte, index *int, sequence *uint64, result *FabricChannelRuntimeResult) (bool, error) { cfg := r.Config if err := session.Send(ctx, fabricproto.Frame{ Type: fabricproto.FrameOpenStream, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, }); err != nil { r.markRouteFailure(route.RouteID, err) return false, err } for *index < len(payloads) { payload := payloads[*index] if len(payload) > cfg.MaxPayload { return false, fmt.Errorf("%w: %d > %d", fabricproto.ErrInvalidPayloadLen, len(payload), cfg.MaxPayload) } (*sequence)++ if err := session.Send(ctx, fabricproto.Frame{ Type: fabricproto.FrameData, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, Sequence: *sequence, Payload: payload, }); err != nil { r.markRouteFailure(route.RouteID, err) return false, err } ackOK, ackMs := waitForFabricRuntimeAck(ctx, session, cfg.StreamID, *sequence, cfg.Timeout) if !ackOK { r.markRouteFailure(route.RouteID, fmt.Errorf("ack_failed")) updated, event, err := r.Router.ObserveChannel(*channel, routeSet, FabricChannelObservation{ ChannelID: channel.Spec.ChannelID, RouteID: route.RouteID, Failed: true, Reason: "ack_failed", ObservedAt: time.Now().UTC(), }, time.Now().UTC()) *channel = updated if event.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, event) result.MigrationEvents++ return true, nil } return false, err } r.markRouteSuccess(route.RouteID) *index++ result.BytesSent += uint64(len(payload)) result.FramesSent++ result.AcksReceived++ updated, event, err := r.Router.ObserveChannel(*channel, routeSet, FabricChannelObservation{ ChannelID: channel.Spec.ChannelID, RouteID: route.RouteID, AckLatencyMs: ackMs, BytesSent: uint64(len(payload)), FramesSent: 1, ObservedAt: time.Now().UTC(), }, time.Now().UTC()) *channel = updated if err != nil { return false, err } if event.Type == FabricChannelRouteEventReroute { result.RouteEvents = append(result.RouteEvents, event) result.MigrationEvents++ return true, nil } } _ = session.Send(context.Background(), fabricproto.Frame{ Type: fabricproto.FrameCloseStream, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, }) return false, nil } func (r *FabricChannelRuntime) sendRequestResponseOnSession(ctx context.Context, session FabricTransportSession, routeID string, channelID string, payload []byte, sequence uint64) ([]byte, int64, error) { cfg := r.Config if err := session.Send(ctx, fabricproto.Frame{ Type: fabricproto.FrameOpenStream, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, }); err != nil { r.markRouteFailure(routeID, err) return nil, 0, err } started := time.Now() if err := session.Send(ctx, fabricproto.Frame{ Type: fabricproto.FrameData, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, Sequence: sequence, Payload: payload, }); err != nil { r.markRouteFailure(routeID, err) return nil, 0, err } waitCtx := ctx if cfg.Timeout > 0 { var cancel context.CancelFunc waitCtx, cancel = context.WithTimeout(ctx, cfg.Timeout) defer cancel() } for { select { case <-waitCtx.Done(): return nil, 0, waitCtx.Err() case err, ok := <-session.Errors(): if !ok { return nil, 0, ErrForwardPeerUnavailable } if err != nil { return nil, 0, err } case frame, ok := <-session.Frames(): if !ok { return nil, 0, ErrForwardPeerUnavailable } if frame.Type != fabricproto.FrameData || frame.StreamID != cfg.StreamID || frame.Sequence != sequence { continue } _ = session.Send(context.Background(), fabricproto.Frame{ Type: fabricproto.FrameCloseStream, TrafficClass: cfg.TrafficClass, StreamID: cfg.StreamID, }) return append([]byte(nil), frame.Payload...), time.Since(started).Milliseconds(), nil } } } func FabricTransportTargetForRoute(route FabricRoute) (FabricTransportTarget, error) { if strings.TrimSpace(route.RouteID) == "" { return FabricTransportTarget{}, ErrFabricRouteNotFound } if route.RelayCount > 0 { for _, hop := range route.Hops { if hop.Mode != FabricRouteRelay { continue } if target, ok := fabricTransportTargetForHop(hop); ok { return target, nil } } } for i := len(route.Hops) - 1; i >= 0; i-- { if target, ok := fabricTransportTargetForHop(route.Hops[i]); ok { return target, nil } } return FabricTransportTarget{}, fmt.Errorf("%w: route %s has no transport endpoint", ErrFabricRouteNotFound, route.RouteID) } func fabricTransportTargetForHop(hop FabricRouteHop) (FabricTransportTarget, bool) { endpoint := strings.TrimSpace(hop.Address) if endpoint == "" { return FabricTransportTarget{}, false } transport := string(hop.Mode) if transport == "" { transport = "quic" } return FabricTransportTarget{ EndpointID: hop.EndpointID, PeerID: strings.TrimSpace(hop.NodeID), Endpoint: endpoint, Transport: transport, PeerCertSHA256: strings.TrimSpace(hop.PeerCertSHA256), }, true } func waitForFabricRuntimeAck(ctx context.Context, session FabricTransportSession, streamID uint64, sequence uint64, timeout time.Duration) (bool, int64) { started := time.Now() if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } for { select { case <-ctx.Done(): return false, 0 case err, ok := <-session.Errors(): if !ok || err != nil { return false, 0 } case frame, ok := <-session.Frames(): if !ok { return false, 0 } if frame.Type == fabricproto.FrameAck && frame.StreamID == streamID && frame.Sequence == sequence { return true, time.Since(started).Milliseconds() } } } }