Files
rdp-proxy/agents/rap-node-agent/internal/mesh/synthetic_runtime.go
T
2026-04-28 22:29:50 +03:00

1038 lines
32 KiB
Go

package mesh
import (
"context"
"encoding/json"
"sync"
"time"
)
// SyntheticTransport delivers a synthetic envelope to the next node on a
// route. SendSynthetic receives the ID of the next hop and the envelope to
// deliver. The envelope it returns is passed straight back to the runtime's
// callers, which treat it as the reply/ack for the message.
type SyntheticTransport interface {
	SendSynthetic(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error)
}
// SyntheticLogEntry is a structured event passed to the runtime's Logger
// callback. Except for Event and OccurredAt, fields are omitted from JSON
// when they hold their zero value.
type SyntheticLogEntry struct {
	// Event names what happened, e.g. "fabric_probe_sent" or
	// "fabric_probe_rejected".
	Event string `json:"event"`

	// Routing context for the event.
	RouteID     string `json:"route_id,omitempty"`
	ClusterID   string `json:"cluster_id,omitempty"`
	LocalNodeID string `json:"local_node_id,omitempty"`
	NextNodeID  string `json:"next_node_id,omitempty"`
	Channel     string `json:"channel,omitempty"`
	MessageType string `json:"message_type,omitempty"`

	// Reason carries the error text or other explanation, when relevant.
	Reason   string `json:"reason,omitempty"`
	TTL      int    `json:"ttl,omitempty"`
	HopCount int    `json:"hop_count,omitempty"`

	// NOTE(review): SyntheticRuntime in this file never sets the queue/drop
	// fields below; presumably other emitters in the package fill them.
	QueueDepth      int    `json:"queue_depth,omitempty"`
	QueueCapacity   int    `json:"queue_capacity,omitempty"`
	Dropped         bool   `json:"dropped,omitempty"`
	DroppedSequence uint64 `json:"dropped_sequence,omitempty"`

	// OccurredAt is taken from the runtime's clock (cfg.Now).
	OccurredAt time.Time `json:"occurred_at"`
}
// SyntheticMetrics is a copy of the runtime's counters, as returned by
// SnapshotMetrics.
type SyntheticMetrics struct {
	// Probe traffic.
	// - ProbesSent counts plain probes only; route-health probes are counted
	//   in RouteHealthProbesSent.
	// - ProbesReceived counts every inbound message that passes validation,
	//   including test-service requests.
	// - ProbesForwarded counts messages relayed to a next hop.
	// - ProbeAcksCreated counts acks of any kind built at a destination.
	ProbesSent            uint64
	ProbesReceived        uint64
	ProbesForwarded       uint64
	ProbeAcksCreated      uint64
	RouteHealthProbesSent uint64

	// Test-service traffic initiated by this node.
	TestServiceRequestsSent        uint64
	TestServiceDeliveriesSucceeded uint64
	TestServiceDeliveriesFailed    uint64
	TestServiceFallbacksUsed       uint64

	// Per-route outcomes recorded for route-health and test-service sends.
	RouteDeliveriesSucceeded uint64
	RouteDeliveriesFailed    uint64

	// Route-health fallback: both counters are bumped together when a
	// fallback route succeeds.
	FallbackRoutesUsed uint64
	WarmRoutesPromoted uint64

	// RouteCacheInvalidations totals the observations dropped by
	// InvalidateRouteCache.
	RouteCacheInvalidations uint64

	// Rejected counts calls to reject: failed checks on send or receive, and
	// exhausted fallback lists. LastRejectReason holds the most recent
	// error text.
	Rejected         uint64
	LastRejectReason string
}
// SyntheticRuntimeConfig configures NewSyntheticRuntime. Zero values fall
// back to the defaults that the constructor applies.
type SyntheticRuntimeConfig struct {
	// Enabled gates the send and receive paths. A disabled runtime refuses
	// traffic with ErrMeshRuntimeDisabled.
	Enabled bool

	// Local identifies this node. Both ClusterID and NodeID must be set, or
	// traffic is refused with ErrNodeMismatch.
	Local PeerIdentity

	// Routes is the route table for probes and test-service messages.
	// Entries with an empty RouteID are ignored.
	Routes []SyntheticRoute

	// RouteHealthRoutes, when non-empty, is a separate table for
	// route-health probes. Otherwise route-health probes use Routes.
	RouteHealthRoutes []SyntheticRoute

	// AllowedChannels is the node-wide channel allow-list. When empty it
	// defaults to the fabric-control, route-control and telemetry channels.
	AllowedChannels []string

	// MaxTTL and MaxHops apply to routes that do not set their own limits.
	// Non-positive values default to 8.
	MaxTTL  int
	MaxHops int

	// TestOrganizationID is the only organization accepted in test-service
	// requests. Empty defaults to SyntheticDefaultTestOrganizationID.
	TestOrganizationID string

	// MaxTestPayloadBytes caps the test-service request payload size.
	// Non-positive defaults to SyntheticDefaultMaxTestPayloadBytes.
	MaxTestPayloadBytes int

	// Transport delivers envelopes to the next hop. If it is nil, every send
	// fails with ErrSyntheticPeerUnavailable.
	Transport SyntheticTransport

	// Now supplies timestamps. Nil defaults to time.Now().UTC().
	Now func() time.Time

	// Logger receives structured events. Nil disables logging.
	Logger func(SyntheticLogEntry)
}
// SyntheticRuntime sends, forwards and acknowledges synthetic probe and
// test-service messages over configured routes, and keeps per-route health
// observations and counters. Create it with NewSyntheticRuntime. It contains
// a sync.Mutex and must not be copied.
type SyntheticRuntime struct {
	enabled bool
	local   PeerIdentity

	// The route tables are replaced wholesale (never mutated in place) by
	// UpdateConfig / UpdateRouteHealthConfig while mu is held.
	routes            map[string]SyntheticRoute
	routeHealthRoutes map[string]SyntheticRoute

	allowedChannels     map[string]struct{}
	maxTTL              int
	maxHops             int
	testOrganizationID  string
	maxTestPayloadBytes int

	// transport may be swapped by UpdateConfig.
	transport SyntheticTransport

	now    func() time.Time
	logger func(SyntheticLogEntry)

	// mu guards routes, routeHealthRoutes, transport, metrics, seq and
	// routeHealth (the per-route observations, keyed by route ID).
	mu          sync.Mutex
	metrics     SyntheticMetrics
	seq         uint64
	routeHealth map[string]SyntheticRouteObservation
}
// NewSyntheticRuntime builds a SyntheticRuntime from cfg and fills in these
// defaults:
//   - AllowedChannels: the fabric-control, route-control and telemetry
//     channels when none are given (empty names are always ignored);
//   - MaxTTL and MaxHops: 8 when not positive;
//   - TestOrganizationID: SyntheticDefaultTestOrganizationID when empty;
//   - MaxTestPayloadBytes: SyntheticDefaultMaxTestPayloadBytes when not
//     positive;
//   - Now: time.Now().UTC() when nil.
//
// Routes without a RouteID are dropped. When cfg.RouteHealthRoutes is empty,
// route-health probes share the regular route table.
func NewSyntheticRuntime(cfg SyntheticRuntimeConfig) *SyntheticRuntime {
	indexRoutes := func(list []SyntheticRoute) map[string]SyntheticRoute {
		byID := make(map[string]SyntheticRoute, len(list))
		for _, rt := range list {
			if rt.RouteID == "" {
				continue
			}
			byID[rt.RouteID] = rt
		}
		return byID
	}
	positiveOr := func(value, fallback int) int {
		if value > 0 {
			return value
		}
		return fallback
	}

	channels := cfg.AllowedChannels
	if len(channels) == 0 {
		channels = []string{
			SyntheticChannelFabricControl,
			SyntheticChannelRouteControl,
			SyntheticChannelTelemetry,
		}
	}
	channelSet := make(map[string]struct{}, len(channels))
	for _, ch := range channels {
		if ch == "" {
			continue
		}
		channelSet[ch] = struct{}{}
	}

	regular := indexRoutes(cfg.Routes)
	health := regular
	if len(cfg.RouteHealthRoutes) > 0 {
		health = indexRoutes(cfg.RouteHealthRoutes)
	}

	orgID := cfg.TestOrganizationID
	if orgID == "" {
		orgID = SyntheticDefaultTestOrganizationID
	}

	clock := cfg.Now
	if clock == nil {
		clock = func() time.Time { return time.Now().UTC() }
	}

	return &SyntheticRuntime{
		enabled:             cfg.Enabled,
		local:               cfg.Local,
		routes:              regular,
		routeHealthRoutes:   health,
		allowedChannels:     channelSet,
		maxTTL:              positiveOr(cfg.MaxTTL, 8),
		maxHops:             positiveOr(cfg.MaxHops, 8),
		testOrganizationID:  orgID,
		maxTestPayloadBytes: positiveOr(cfg.MaxTestPayloadBytes, SyntheticDefaultMaxTestPayloadBytes),
		transport:           cfg.Transport,
		now:                 clock,
		logger:              cfg.Logger,
		routeHealth:         map[string]SyntheticRouteObservation{},
	}
}
// UpdateConfig replaces the regular route table and resets the route-health
// table to the same routes. Routes with an empty RouteID are dropped. The
// transport is swapped only when a non-nil one is supplied. A nil receiver
// is a no-op.
func (r *SyntheticRuntime) UpdateConfig(routes []SyntheticRoute, transport SyntheticTransport) {
	if r == nil {
		return
	}
	table := make(map[string]SyntheticRoute, len(routes))
	for i := range routes {
		if id := routes[i].RouteID; id != "" {
			table[id] = routes[i]
		}
	}

	r.mu.Lock()
	r.routes, r.routeHealthRoutes = table, table
	if transport != nil {
		r.transport = transport
	}
	r.mu.Unlock()
}
// UpdateRouteHealthConfig replaces only the route table used for
// route-health probes. Routes with an empty RouteID are dropped. An empty
// slice leaves route-health probes with no routes at all (it does not fall
// back to the regular table). A nil receiver is a no-op.
func (r *SyntheticRuntime) UpdateRouteHealthConfig(routes []SyntheticRoute) {
	if r == nil {
		return
	}
	table := make(map[string]SyntheticRoute, len(routes))
	for i := range routes {
		if id := routes[i].RouteID; id != "" {
			table[id] = routes[i]
		}
	}

	r.mu.Lock()
	r.routeHealthRoutes = table
	r.mu.Unlock()
}
// SendProbe sends a plain synthetic probe identified by probeID along
// routeID on channel. It returns the envelope the transport hands back.
func (r *SyntheticRuntime) SendProbe(ctx context.Context, routeID string, channel string, probeID string) (SyntheticEnvelope, error) {
	reply, err := r.sendProbeMessage(ctx, routeID, channel, probeID, SyntheticMessageProbe)
	return reply, err
}
// SendRouteHealthProbe sends a route-health probe over routeID and records
// the outcome (state, counters, latency since the call started) in the
// per-route health cache. On success the result names routeID as both the
// requested and selected route, with FallbackUsed false.
//
// A nil runtime returns ErrMeshRuntimeDisabled, which is what ensureEnabled
// reports for it.
func (r *SyntheticRuntime) SendRouteHealthProbe(ctx context.Context, routeID string, channel string, probeID string) (SyntheticRouteHealthResult, error) {
	// r.now and the failure bookkeeping below would dereference a nil
	// runtime, so handle that case before touching any fields.
	if r == nil {
		return SyntheticRouteHealthResult{}, ErrMeshRuntimeDisabled
	}
	startedAt := r.now()
	ack, err := r.sendProbeMessage(ctx, routeID, channel, probeID, SyntheticMessageRouteHealth)
	if err != nil {
		r.recordRouteFailure(routeID, startedAt, err)
		return SyntheticRouteHealthResult{}, err
	}
	observation := r.recordRouteSuccess(routeID, startedAt)
	return SyntheticRouteHealthResult{
		RequestedRouteID: routeID,
		SelectedRouteID:  routeID,
		FallbackUsed:     false,
		Ack:              ack,
		Observation:      observation,
	}, nil
}
// SendRouteHealthProbeWithFallback tries a route-health probe on
// preferredRouteID, then on each of fallbackRouteIDs in order, and returns
// the first success. Empty route IDs are skipped. When a fallback route
// succeeds, the result reports preferredRouteID as RequestedRouteID with
// FallbackUsed set, and FallbackRoutesUsed / WarmRoutesPromoted are bumped.
//
// The walk stops as soon as ctx is done and returns ctx.Err(), so routes
// that were never really attempted are not recorded as failed. If no
// candidate succeeds, the last error is recorded via reject and returned.
// When there was no non-empty candidate, ErrNoHealthySyntheticRoute is used
// instead.
func (r *SyntheticRuntime) SendRouteHealthProbeWithFallback(ctx context.Context, preferredRouteID string, fallbackRouteIDs []string, channel string, probeID string) (SyntheticRouteHealthResult, error) {
	if err := r.ensureEnabled(); err != nil {
		r.reject(preferredRouteID, channel, SyntheticMessageRouteHealth, err)
		return SyntheticRouteHealthResult{}, err
	}
	candidates := append([]string{preferredRouteID}, fallbackRouteIDs...)
	var lastErr error
	for i, routeID := range candidates {
		if routeID == "" {
			continue
		}
		// Once the caller has given up, each further attempt would fail on
		// the dead context and be recorded as a route failure. That would
		// mark fallback routes as failed even though they may be healthy.
		if ctxErr := ctx.Err(); ctxErr != nil {
			lastErr = ctxErr
			break
		}
		if i > 0 {
			r.log(SyntheticLogEntry{
				Event:       "fabric_fallback_route_selected",
				RouteID:     routeID,
				ClusterID:   r.local.ClusterID,
				LocalNodeID: r.local.NodeID,
				Channel:     channel,
				MessageType: SyntheticMessageRouteHealth,
				Reason:      "preferred_route_unavailable",
				OccurredAt:  r.now(),
			})
		}
		result, err := r.SendRouteHealthProbe(ctx, routeID, channel, probeID)
		if err == nil {
			if i > 0 {
				result.RequestedRouteID = preferredRouteID
				result.FallbackUsed = true
				r.increment(func(m *SyntheticMetrics) {
					m.FallbackRoutesUsed++
					m.WarmRoutesPromoted++
				})
				r.log(SyntheticLogEntry{
					Event:       "fabric_warm_route_promoted",
					RouteID:     routeID,
					ClusterID:   r.local.ClusterID,
					LocalNodeID: r.local.NodeID,
					Channel:     channel,
					MessageType: SyntheticMessageRouteHealth,
					OccurredAt:  r.now(),
				})
			}
			return result, nil
		}
		lastErr = err
		r.log(SyntheticLogEntry{
			Event:       "fabric_route_delivery_failed",
			RouteID:     routeID,
			ClusterID:   r.local.ClusterID,
			LocalNodeID: r.local.NodeID,
			Channel:     channel,
			MessageType: SyntheticMessageRouteHealth,
			Reason:      err.Error(),
			OccurredAt:  r.now(),
		})
	}
	if lastErr == nil {
		lastErr = ErrNoHealthySyntheticRoute
	}
	r.reject(preferredRouteID, channel, SyntheticMessageRouteHealth, lastErr)
	return SyntheticRouteHealthResult{}, lastErr
}
// SendTestService sends request as a test-service message over routeID and
// decodes the destination's SyntheticTestServiceResponse from the returned
// envelope's payload. The outcome is recorded in the route-health cache and
// the test-service delivery counters. A response that fails to decode counts
// as a failure.
//
// A nil runtime returns ErrMeshRuntimeDisabled, which is what ensureEnabled
// reports for it.
func (r *SyntheticRuntime) SendTestService(ctx context.Context, routeID string, channel string, request SyntheticTestServiceRequest) (SyntheticTestServiceResult, error) {
	// r.now and the failure bookkeeping below would dereference a nil
	// runtime, so handle that case before touching any fields.
	if r == nil {
		return SyntheticTestServiceResult{}, ErrMeshRuntimeDisabled
	}
	startedAt := r.now()
	ack, err := r.sendTestServiceMessage(ctx, routeID, channel, request)
	if err != nil {
		r.recordTestServiceFailure(routeID, startedAt, err)
		return SyntheticTestServiceResult{}, err
	}
	response, err := decodeTestServiceResponse(ack.Payload)
	if err != nil {
		r.recordTestServiceFailure(routeID, startedAt, err)
		return SyntheticTestServiceResult{}, err
	}
	observation := r.recordTestServiceSuccess(routeID, startedAt)
	return SyntheticTestServiceResult{
		RequestedRouteID: routeID,
		SelectedRouteID:  routeID,
		FallbackUsed:     false,
		Ack:              ack,
		Response:         response,
		Observation:      observation,
	}, nil
}
// SendTestServiceWithFallback sends request over preferredRouteID, then over
// each of fallbackRouteIDs in order, and returns the first success. Empty
// route IDs are skipped. When a fallback route succeeds, the result reports
// preferredRouteID as RequestedRouteID with FallbackUsed set, and
// TestServiceFallbacksUsed is bumped.
//
// The walk stops as soon as ctx is done and returns ctx.Err(), so routes
// that were never really attempted are not recorded as failed. If no
// candidate succeeds, the last error is recorded via reject and returned.
// When there was no non-empty candidate, ErrNoHealthySyntheticRoute is used
// instead.
func (r *SyntheticRuntime) SendTestServiceWithFallback(ctx context.Context, preferredRouteID string, fallbackRouteIDs []string, channel string, request SyntheticTestServiceRequest) (SyntheticTestServiceResult, error) {
	if err := r.ensureEnabled(); err != nil {
		r.reject(preferredRouteID, channel, SyntheticMessageTestService, err)
		return SyntheticTestServiceResult{}, err
	}
	candidates := append([]string{preferredRouteID}, fallbackRouteIDs...)
	var lastErr error
	for i, routeID := range candidates {
		if routeID == "" {
			continue
		}
		// Once the caller has given up, each further attempt would fail on
		// the dead context and be recorded as a route failure. That would
		// mark fallback routes as failed even though they may be healthy.
		if ctxErr := ctx.Err(); ctxErr != nil {
			lastErr = ctxErr
			break
		}
		if i > 0 {
			r.log(SyntheticLogEntry{
				Event:       "fabric_test_service_fallback_selected",
				RouteID:     routeID,
				ClusterID:   r.local.ClusterID,
				LocalNodeID: r.local.NodeID,
				Channel:     channel,
				MessageType: SyntheticMessageTestService,
				Reason:      "preferred_route_unavailable",
				OccurredAt:  r.now(),
			})
		}
		result, err := r.SendTestService(ctx, routeID, channel, request)
		if err == nil {
			if i > 0 {
				result.RequestedRouteID = preferredRouteID
				result.FallbackUsed = true
				r.increment(func(m *SyntheticMetrics) {
					m.TestServiceFallbacksUsed++
				})
				r.log(SyntheticLogEntry{
					Event:       "fabric_test_service_fallback_used",
					RouteID:     routeID,
					ClusterID:   r.local.ClusterID,
					LocalNodeID: r.local.NodeID,
					Channel:     channel,
					MessageType: SyntheticMessageTestService,
					OccurredAt:  r.now(),
				})
			}
			return result, nil
		}
		lastErr = err
	}
	if lastErr == nil {
		lastErr = ErrNoHealthySyntheticRoute
	}
	r.reject(preferredRouteID, channel, SyntheticMessageTestService, lastErr)
	return SyntheticTestServiceResult{}, lastErr
}
// SnapshotRouteObservation returns a copy of the cached health observation
// for routeID. The boolean reports whether one exists.
func (r *SyntheticRuntime) SnapshotRouteObservation(routeID string) (SyntheticRouteObservation, bool) {
	r.mu.Lock()
	snapshot, found := r.routeHealth[routeID]
	r.mu.Unlock()
	return snapshot, found
}
// InvalidateRouteCache drops every cached route observation whose version
// stamps disagree with version (see shouldInvalidateObservation). It adds
// the number dropped to RouteCacheInvalidations, logs
// "fabric_route_cache_invalidated" with reason, and returns that number.
func (r *SyntheticRuntime) InvalidateRouteCache(reason string, version SyntheticRouteCacheVersion) int {
	dropped := 0
	r.mu.Lock()
	for id, obs := range r.routeHealth {
		if !shouldInvalidateObservation(obs, version) {
			continue
		}
		// Deleting the current key during range is permitted in Go.
		delete(r.routeHealth, id)
		dropped++
	}
	r.metrics.RouteCacheInvalidations += uint64(dropped)
	r.mu.Unlock()

	r.log(SyntheticLogEntry{
		Event:       "fabric_route_cache_invalidated",
		ClusterID:   r.local.ClusterID,
		LocalNodeID: r.local.NodeID,
		Reason:      reason,
		OccurredAt:  r.now(),
	})
	return dropped
}
// sendProbeMessage resolves routeID from the table for messageType and
// validates it for sending from this node. It then builds the first-hop
// envelope carrying a SyntheticProbePayload and hands it to the transport.
// Validation failures are counted and logged via reject. The returned
// envelope is whatever the transport sends back.
func (r *SyntheticRuntime) sendProbeMessage(ctx context.Context, routeID string, channel string, probeID string, messageType string) (SyntheticEnvelope, error) {
	fail := func(err error) (SyntheticEnvelope, error) {
		r.reject(routeID, channel, messageType, err)
		return SyntheticEnvelope{}, err
	}

	if err := r.ensureEnabled(); err != nil {
		return fail(err)
	}
	route, path, err := r.routeForMessage(routeID, messageType)
	if err != nil {
		return fail(err)
	}
	if err := r.validateRouteForSend(route, path, channel); err != nil {
		return fail(err)
	}
	if len(path) < 2 {
		return fail(ErrInvalidRoutePath)
	}

	firstHop := path[1]
	ttl := routeTTL(route, r.maxTTL)
	r.log(SyntheticLogEntry{
		Event:       "fabric_route_selected",
		RouteID:     route.RouteID,
		ClusterID:   route.ClusterID,
		LocalNodeID: r.local.NodeID,
		NextNodeID:  firstHop,
		Channel:     channel,
		MessageType: messageType,
		TTL:         ttl,
		OccurredAt:  r.now(),
	})

	body, err := json.Marshal(SyntheticProbePayload{ProbeID: probeID, SentAt: r.now()})
	if err != nil {
		return SyntheticEnvelope{}, err
	}

	outbound := SyntheticEnvelope{
		ProtocolVersion: ProtocolVersion,
		RouteID:         route.RouteID,
		ClusterID:       route.ClusterID,
		From:            r.local,
		To:              PeerIdentity{ClusterID: route.ClusterID, NodeID: firstHop},
		Channel:         channel,
		MessageType:     messageType,
		TTL:             ttl,
		HopCount:        1,
		Visited:         []string{r.local.NodeID},
		Sequence:        r.nextSequence(),
		SentAt:          r.now(),
		Payload:         body,
	}
	r.increment(func(m *SyntheticMetrics) {
		if messageType == SyntheticMessageRouteHealth {
			m.RouteHealthProbesSent++
			return
		}
		m.ProbesSent++
	})
	r.log(SyntheticLogEntry{
		Event:       "fabric_probe_sent",
		RouteID:     outbound.RouteID,
		ClusterID:   outbound.ClusterID,
		LocalNodeID: r.local.NodeID,
		NextNodeID:  outbound.To.NodeID,
		Channel:     outbound.Channel,
		MessageType: outbound.MessageType,
		TTL:         outbound.TTL,
		HopCount:    outbound.HopCount,
		OccurredAt:  r.now(),
	})
	return r.send(ctx, outbound.To.NodeID, outbound)
}
// sendTestServiceMessage validates request, then resolves routeID from the
// regular route table and validates it for sending. It builds the first-hop
// test-service envelope, stamping request.SentAt when it is unset, and hands
// it to the transport. Validation failures are counted and logged via
// reject. The returned envelope is whatever the transport sends back.
func (r *SyntheticRuntime) sendTestServiceMessage(ctx context.Context, routeID string, channel string, request SyntheticTestServiceRequest) (SyntheticEnvelope, error) {
	fail := func(err error) (SyntheticEnvelope, error) {
		r.reject(routeID, channel, SyntheticMessageTestService, err)
		return SyntheticEnvelope{}, err
	}

	if err := r.ensureEnabled(); err != nil {
		return fail(err)
	}
	if err := r.validateTestServiceRequest(request); err != nil {
		return fail(err)
	}
	route, path, err := r.route(routeID)
	if err != nil {
		return fail(err)
	}
	if err := r.validateRouteForSend(route, path, channel); err != nil {
		return fail(err)
	}
	if len(path) < 2 {
		return fail(ErrInvalidRoutePath)
	}

	if request.SentAt.IsZero() {
		request.SentAt = r.now()
	}
	body, err := json.Marshal(request)
	if err != nil {
		return SyntheticEnvelope{}, err
	}

	firstHop := path[1]
	outbound := SyntheticEnvelope{
		ProtocolVersion: ProtocolVersion,
		RouteID:         route.RouteID,
		ClusterID:       route.ClusterID,
		From:            r.local,
		To:              PeerIdentity{ClusterID: route.ClusterID, NodeID: firstHop},
		Channel:         channel,
		MessageType:     SyntheticMessageTestService,
		TTL:             routeTTL(route, r.maxTTL),
		HopCount:        1,
		Visited:         []string{r.local.NodeID},
		Sequence:        r.nextSequence(),
		SentAt:          r.now(),
		Payload:         body,
	}
	r.increment(func(m *SyntheticMetrics) { m.TestServiceRequestsSent++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_test_service_sent",
		RouteID:     outbound.RouteID,
		ClusterID:   outbound.ClusterID,
		LocalNodeID: r.local.NodeID,
		NextNodeID:  outbound.To.NodeID,
		Channel:     outbound.Channel,
		MessageType: outbound.MessageType,
		TTL:         outbound.TTL,
		HopCount:    outbound.HopCount,
		OccurredAt:  r.now(),
	})
	return r.send(ctx, outbound.To.NodeID, outbound)
}
// Receive handles an inbound synthetic envelope addressed to this node.
// It validates the envelope (and, for test-service messages, decodes and
// validates the request), then counts and logs the receipt. After that it
// either:
//   - returns an ack built by r.ack when this node is the route's
//     destination, or
//   - forwards a copy to the next node in the route path, with TTL
//     decremented, HopCount incremented and this node appended to Visited,
//     and returns whatever the transport returns.
//
// Every refusal is counted and logged via reject before the error is
// returned.
func (r *SyntheticRuntime) Receive(ctx context.Context, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
	if err := r.ensureEnabled(); err != nil {
		r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
		return SyntheticEnvelope{}, err
	}
	route, path, err := r.validateInbound(envelope)
	if err != nil {
		r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
		return SyntheticEnvelope{}, err
	}
	// Test-service requests are checked at every hop, so a bad request is
	// refused here instead of being forwarded.
	if envelope.MessageType == SyntheticMessageTestService {
		request, err := decodeTestServiceRequest(envelope.Payload)
		if err != nil {
			r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
			return SyntheticEnvelope{}, err
		}
		if err := r.validateTestServiceRequest(request); err != nil {
			r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
			return SyntheticEnvelope{}, err
		}
	}
	r.increment(func(m *SyntheticMetrics) { m.ProbesReceived++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_probe_received",
		RouteID:     envelope.RouteID,
		ClusterID:   envelope.ClusterID,
		LocalNodeID: r.local.NodeID,
		Channel:     envelope.Channel,
		MessageType: envelope.MessageType,
		TTL:         envelope.TTL,
		HopCount:    envelope.HopCount,
		OccurredAt:  r.now(),
	})
	// Build a fresh slice so the caller's Visited backing array is never
	// written to.
	visited := append(append([]string{}, envelope.Visited...), r.local.NodeID)
	if r.local.NodeID == route.DestinationNodeID {
		return r.ack(route, envelope, visited)
	}
	// Forwarding spends one TTL unit, and the next hop refuses TTL <= 0 in
	// validateInbound. A message that arrives with TTL 1 therefore stops here.
	if envelope.TTL <= 1 {
		r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, ErrTTLExhausted)
		return SyntheticEnvelope{}, ErrTTLExhausted
	}
	index := indexOf(path, r.local.NodeID)
	if index < 0 || index+1 >= len(path) {
		r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, ErrInvalidRoutePath)
		return SyntheticEnvelope{}, ErrInvalidRoutePath
	}
	nextNodeID := path[index+1]
	// forward is a value copy of envelope; only the per-hop fields change.
	forward := envelope
	forward.From = r.local
	forward.To = PeerIdentity{ClusterID: route.ClusterID, NodeID: nextNodeID}
	forward.TTL = envelope.TTL - 1
	forward.HopCount = envelope.HopCount + 1
	forward.Visited = visited
	forward.SentAt = r.now()
	r.increment(func(m *SyntheticMetrics) { m.ProbesForwarded++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_probe_forwarded",
		RouteID:     forward.RouteID,
		ClusterID:   forward.ClusterID,
		LocalNodeID: r.local.NodeID,
		NextNodeID:  nextNodeID,
		Channel:     forward.Channel,
		MessageType: forward.MessageType,
		TTL:         forward.TTL,
		HopCount:    forward.HopCount,
		OccurredAt:  r.now(),
	})
	return r.send(ctx, nextNodeID, forward)
}
// SnapshotMetrics returns a copy of the runtime's counters taken under the
// lock.
func (r *SyntheticRuntime) SnapshotMetrics() SyntheticMetrics {
	r.mu.Lock()
	snapshot := r.metrics
	r.mu.Unlock()
	return snapshot
}
// ensureEnabled reports ErrMeshRuntimeDisabled for a nil or disabled
// runtime, and ErrNodeMismatch when the local identity is incomplete.
// Otherwise it returns nil.
func (r *SyntheticRuntime) ensureEnabled() error {
	switch {
	case r == nil, !r.enabled: // case list short-circuits, so nil is never dereferenced
		return ErrMeshRuntimeDisabled
	case r.local.ClusterID == "", r.local.NodeID == "":
		return ErrNodeMismatch
	default:
		return nil
	}
}
// route resolves routeID against the regular route table, which is used for
// probes and test-service messages.
//
// UpdateConfig swaps r.routes while holding mu, so the field itself must
// also be read under mu. The lock is released before calling routeFrom,
// because routeFrom takes mu again for the lookup and sync.Mutex is not
// reentrant.
func (r *SyntheticRuntime) route(routeID string) (SyntheticRoute, []string, error) {
	r.mu.Lock()
	routes := r.routes
	r.mu.Unlock()
	return r.routeFrom(routeID, routes)
}
// routeForMessage resolves routeID against the table appropriate for
// messageType. Route-health probes use the route-health table; everything
// else uses the regular table.
//
// Both table fields are swapped by UpdateConfig / UpdateRouteHealthConfig
// while holding mu, so the chosen field is read under mu. The lock is
// released before calling routeFrom, which takes mu itself.
func (r *SyntheticRuntime) routeForMessage(routeID string, messageType string) (SyntheticRoute, []string, error) {
	r.mu.Lock()
	routes := r.routes
	if messageType == SyntheticMessageRouteHealth {
		routes = r.routeHealthRoutes
	}
	r.mu.Unlock()
	return r.routeFrom(routeID, routes)
}
// routeFrom looks routeID up in routes and checks that the route is usable
// here. It returns the route and its node path, or one of:
// ErrRouteIDRequired (empty ID), ErrRouteNotFound, ErrInvalidRoutePath (path
// shorter than two nodes or not running source→destination),
// ErrClusterMismatch (route not in the local cluster) or ErrRouteExpired.
func (r *SyntheticRuntime) routeFrom(routeID string, routes map[string]SyntheticRoute) (SyntheticRoute, []string, error) {
	if routeID == "" {
		return SyntheticRoute{}, nil, ErrRouteIDRequired
	}
	r.mu.Lock()
	candidate, found := routes[routeID]
	r.mu.Unlock()
	if !found {
		return SyntheticRoute{}, nil, ErrRouteNotFound
	}

	hops := routePath(candidate)
	last := len(hops) - 1
	switch {
	case last < 1, hops[0] != candidate.SourceNodeID, hops[last] != candidate.DestinationNodeID:
		return SyntheticRoute{}, nil, ErrInvalidRoutePath
	case candidate.ClusterID == "", candidate.ClusterID != r.local.ClusterID:
		return SyntheticRoute{}, nil, ErrClusterMismatch
	case !candidate.ExpiresAt.IsZero() && !r.now().Before(candidate.ExpiresAt):
		return SyntheticRoute{}, nil, ErrRouteExpired
	}
	return candidate, hops, nil
}
// validateRouteForSend checks that this node may originate traffic on route.
// This node must be the route's source, the effective TTL must be positive,
// the path must fit within the effective hop limit, and channel must be
// authorised (see validateChannel).
func (r *SyntheticRuntime) validateRouteForSend(route SyntheticRoute, path []string, channel string) error {
	switch {
	case route.SourceNodeID != r.local.NodeID:
		return ErrNodeMismatch
	case routeTTL(route, r.maxTTL) <= 0:
		return ErrTTLExhausted
	case len(path)-1 > routeMaxHops(route, r.maxHops):
		return ErrInvalidRoutePath
	}
	return r.validateChannel(route, channel)
}
// validateInbound checks an envelope received by this node. On success it
// returns the locally configured route the envelope claims to travel, plus
// that route's node path. Checks run in this order:
//   - protocol version and message type are supported
//     (ErrUnsupportedSyntheticMessage);
//   - the envelope's cluster and addressee match this node
//     (ErrClusterMismatch / ErrNodeMismatch);
//   - TTL is still positive (ErrTTLExhausted);
//   - the route resolves from the table for this message type;
//   - HopCount lies in 1..max hops (ErrInvalidRoutePath), this node is on
//     the path (ErrNodeMismatch), and HopCount equals this node's index in
//     the path (ErrInvalidRoutePath);
//   - this node is not already in Visited (ErrLoopDetected), and the last
//     Visited entry is the previous node on the path (ErrInvalidRoutePath);
//   - the channel is authorised (validateChannel).
func (r *SyntheticRuntime) validateInbound(envelope SyntheticEnvelope) (SyntheticRoute, []string, error) {
	if envelope.ProtocolVersion != ProtocolVersion {
		return SyntheticRoute{}, nil, ErrUnsupportedSyntheticMessage
	}
	if envelope.MessageType != SyntheticMessageProbe && envelope.MessageType != SyntheticMessageRouteHealth && envelope.MessageType != SyntheticMessageTestService {
		return SyntheticRoute{}, nil, ErrUnsupportedSyntheticMessage
	}
	if envelope.ClusterID == "" || envelope.ClusterID != r.local.ClusterID {
		return SyntheticRoute{}, nil, ErrClusterMismatch
	}
	if envelope.To.ClusterID != r.local.ClusterID || envelope.To.NodeID != r.local.NodeID {
		return SyntheticRoute{}, nil, ErrNodeMismatch
	}
	if envelope.TTL <= 0 {
		return SyntheticRoute{}, nil, ErrTTLExhausted
	}
	route, path, err := r.routeForMessage(envelope.RouteID, envelope.MessageType)
	if err != nil {
		return SyntheticRoute{}, nil, err
	}
	if envelope.ClusterID != route.ClusterID {
		return SyntheticRoute{}, nil, ErrClusterMismatch
	}
	if envelope.HopCount <= 0 || envelope.HopCount > routeMaxHops(route, r.maxHops) {
		return SyntheticRoute{}, nil, ErrInvalidRoutePath
	}
	index := indexOf(path, r.local.NodeID)
	if index < 0 {
		return SyntheticRoute{}, nil, ErrNodeMismatch
	}
	// The sender sets HopCount 1 for path[1] and each forward adds one, so a
	// well-formed envelope arrives with HopCount equal to our path index.
	if envelope.HopCount != index {
		return SyntheticRoute{}, nil, ErrInvalidRoutePath
	}
	if contains(envelope.Visited, r.local.NodeID) {
		return SyntheticRoute{}, nil, ErrLoopDetected
	}
	// NOTE(review): an empty Visited slice skips this previous-hop check
	// entirely, even though senders in this file always populate it. Confirm
	// that leniency is intended.
	if index > 0 && len(envelope.Visited) > 0 && envelope.Visited[len(envelope.Visited)-1] != path[index-1] {
		return SyntheticRoute{}, nil, ErrInvalidRoutePath
	}
	if err := r.validateChannel(route, envelope.Channel); err != nil {
		return SyntheticRoute{}, nil, err
	}
	return route, path, nil
}
// validateChannel returns ErrUnauthorizedChannel unless channel is non-empty
// and in the node-wide allow-list. If the route has its own allow-list,
// channel must also appear there; an empty per-route list allows any
// node-approved channel.
func (r *SyntheticRuntime) validateChannel(route SyntheticRoute, channel string) error {
	if _, permitted := r.allowedChannels[channel]; channel == "" || !permitted {
		return ErrUnauthorizedChannel
	}
	if len(route.AllowedChannels) == 0 {
		return nil
	}
	for i := range route.AllowedChannels {
		if route.AllowedChannels[i] == channel {
			return nil
		}
	}
	return ErrUnauthorizedChannel
}
// ack builds the reply a destination node returns for an accepted message.
// Test-service messages are delegated to ackTestService. Every other type
// gets a SyntheticProbeAckPayload echoing the probe ID and the visited path.
// The reply is addressed back to the route's source and carries the
// matching ack message type plus the inbound TTL, hop count and sequence.
func (r *SyntheticRuntime) ack(route SyntheticRoute, envelope SyntheticEnvelope, visited []string) (SyntheticEnvelope, error) {
	if envelope.MessageType == SyntheticMessageTestService {
		return r.ackTestService(route, envelope, visited)
	}

	// Decoding is best-effort: a probe payload that fails to decode is still
	// acked, just with an empty ProbeID.
	var probe SyntheticProbePayload
	_ = json.Unmarshal(envelope.Payload, &probe)

	ackPayload := SyntheticProbeAckPayload{
		ProbeID:    probe.ProbeID,
		Path:       visited,
		AcceptedAt: r.now(),
	}
	encoded, err := json.Marshal(ackPayload)
	if err != nil {
		return SyntheticEnvelope{}, err
	}

	reply := SyntheticEnvelope{
		ProtocolVersion: ProtocolVersion,
		RouteID:         route.RouteID,
		ClusterID:       route.ClusterID,
		From:            r.local,
		To:              PeerIdentity{ClusterID: route.ClusterID, NodeID: route.SourceNodeID},
		Channel:         envelope.Channel,
		MessageType:     syntheticAckMessageType(envelope.MessageType),
		TTL:             envelope.TTL,
		HopCount:        envelope.HopCount,
		Visited:         visited,
		Sequence:        envelope.Sequence,
		SentAt:          r.now(),
		Payload:         encoded,
	}
	r.increment(func(m *SyntheticMetrics) { m.ProbeAcksCreated++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_probe_ack_created",
		RouteID:     reply.RouteID,
		ClusterID:   reply.ClusterID,
		LocalNodeID: r.local.NodeID,
		Channel:     reply.Channel,
		MessageType: reply.MessageType,
		TTL:         reply.TTL,
		HopCount:    reply.HopCount,
		OccurredAt:  r.now(),
	})
	return reply, nil
}
// ackTestService decodes and re-validates the test-service request in
// envelope. It answers with a SyntheticTestServiceResponse that echoes the
// request's ID, organization, service type and payload along with the
// visited path. The reply goes back to the route's source as a
// SyntheticMessageTestServiceAck.
func (r *SyntheticRuntime) ackTestService(route SyntheticRoute, envelope SyntheticEnvelope, visited []string) (SyntheticEnvelope, error) {
	req, err := decodeTestServiceRequest(envelope.Payload)
	if err != nil {
		return SyntheticEnvelope{}, err
	}
	if err := r.validateTestServiceRequest(req); err != nil {
		return SyntheticEnvelope{}, err
	}

	encoded, err := json.Marshal(SyntheticTestServiceResponse{
		RequestID:      req.RequestID,
		OrganizationID: req.OrganizationID,
		ServiceType:    req.ServiceType,
		EchoPayload:    req.Payload,
		Path:           visited,
		AcceptedAt:     r.now(),
	})
	if err != nil {
		return SyntheticEnvelope{}, err
	}

	reply := SyntheticEnvelope{
		ProtocolVersion: ProtocolVersion,
		RouteID:         route.RouteID,
		ClusterID:       route.ClusterID,
		From:            r.local,
		To:              PeerIdentity{ClusterID: route.ClusterID, NodeID: route.SourceNodeID},
		Channel:         envelope.Channel,
		MessageType:     SyntheticMessageTestServiceAck,
		TTL:             envelope.TTL,
		HopCount:        envelope.HopCount,
		Visited:         visited,
		Sequence:        envelope.Sequence,
		SentAt:          r.now(),
		Payload:         encoded,
	}
	r.increment(func(m *SyntheticMetrics) { m.ProbeAcksCreated++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_test_service_ack_created",
		RouteID:     reply.RouteID,
		ClusterID:   reply.ClusterID,
		LocalNodeID: r.local.NodeID,
		Channel:     reply.Channel,
		MessageType: reply.MessageType,
		TTL:         reply.TTL,
		HopCount:    reply.HopCount,
		OccurredAt:  r.now(),
	})
	return reply, nil
}
// recordRouteSuccess records a successful route-health delivery for routeID.
func (r *SyntheticRuntime) recordRouteSuccess(routeID string, startedAt time.Time) SyntheticRouteObservation {
	observation := r.recordRouteSuccessForMessage(routeID, startedAt, SyntheticMessageRouteHealth)
	return observation
}
// recordRouteSuccessForMessage marks routeID healthy in the route-health
// cache and does the following:
//   - bumps its SuccessCount and the global RouteDeliveriesSucceeded;
//   - clears the last failure reason;
//   - records the latency since startedAt;
//   - stamps the route/policy/peer-directory versions of the route found
//     in the table used for messageType.
//
// It logs "fabric_route_delivery_succeeded" and returns the updated
// observation.
func (r *SyntheticRuntime) recordRouteSuccessForMessage(routeID string, startedAt time.Time, messageType string) SyntheticRouteObservation {
	// A failed lookup (e.g. the route was removed or has expired since the
	// send) yields a zero route, which clears the version stamps below.
	route, _, _ := r.routeForMessage(routeID, messageType)
	now := r.now()

	r.mu.Lock()
	observation := r.routeHealth[routeID]
	observation.RouteID = routeID
	observation.State = SyntheticRouteStateHealthy
	observation.LastSuccessAt = now
	observation.LastFailureReason = ""
	observation.SuccessCount++
	observation.LastLatencyMs = now.Sub(startedAt).Milliseconds()
	observation.RouteVersion = route.RouteVersion
	observation.PolicyVersion = route.PolicyVersion
	observation.PeerDirectoryVersion = route.PeerDirectoryVersion
	r.routeHealth[routeID] = observation
	r.metrics.RouteDeliveriesSucceeded++
	r.mu.Unlock()

	r.log(SyntheticLogEntry{
		Event:       "fabric_route_delivery_succeeded",
		RouteID:     routeID,
		ClusterID:   r.local.ClusterID,
		LocalNodeID: r.local.NodeID,
		MessageType: messageType,
		OccurredAt:  now,
	})
	return observation
}
// recordRouteFailure records a failed route-health delivery for routeID.
func (r *SyntheticRuntime) recordRouteFailure(routeID string, startedAt time.Time, err error) SyntheticRouteObservation {
	observation := r.recordRouteFailureForMessage(routeID, startedAt, err, SyntheticMessageRouteHealth)
	return observation
}
// recordRouteFailureForMessage marks routeID failed in the route-health
// cache and does the following:
//   - bumps its FailureCount and the global RouteDeliveriesFailed;
//   - stores err's text as the failure reason (empty when err is nil);
//   - records the latency since startedAt;
//   - stamps the route/policy/peer-directory versions of the route found
//     in the table used for messageType.
//
// It logs "fabric_route_marked_failed" and returns the updated observation.
func (r *SyntheticRuntime) recordRouteFailureForMessage(routeID string, startedAt time.Time, err error, messageType string) SyntheticRouteObservation {
	// A failed lookup (unknown or expired route) yields a zero route, which
	// clears the version stamps below.
	route, _, _ := r.routeForMessage(routeID, messageType)
	now := r.now()
	reason := ""
	if err != nil {
		reason = err.Error()
	}

	r.mu.Lock()
	observation := r.routeHealth[routeID]
	observation.RouteID = routeID
	observation.State = SyntheticRouteStateFailed
	observation.LastFailureAt = now
	observation.LastFailureReason = reason
	observation.FailureCount++
	observation.LastLatencyMs = now.Sub(startedAt).Milliseconds()
	observation.RouteVersion = route.RouteVersion
	observation.PolicyVersion = route.PolicyVersion
	observation.PeerDirectoryVersion = route.PeerDirectoryVersion
	r.routeHealth[routeID] = observation
	r.metrics.RouteDeliveriesFailed++
	r.mu.Unlock()

	r.log(SyntheticLogEntry{
		Event:       "fabric_route_marked_failed",
		RouteID:     routeID,
		ClusterID:   r.local.ClusterID,
		LocalNodeID: r.local.NodeID,
		MessageType: messageType,
		Reason:      reason,
		OccurredAt:  now,
	})
	return observation
}
// recordTestServiceSuccess records a successful test-service delivery. It
// updates the route observation (via the regular route table), bumps
// TestServiceDeliveriesSucceeded and logs
// "fabric_test_service_delivery_succeeded".
func (r *SyntheticRuntime) recordTestServiceSuccess(routeID string, startedAt time.Time) SyntheticRouteObservation {
	obs := r.recordRouteSuccessForMessage(routeID, startedAt, SyntheticMessageTestService)
	r.increment(func(m *SyntheticMetrics) { m.TestServiceDeliveriesSucceeded++ })
	r.log(SyntheticLogEntry{
		Event:       "fabric_test_service_delivery_succeeded",
		RouteID:     routeID,
		ClusterID:   r.local.ClusterID,
		LocalNodeID: r.local.NodeID,
		MessageType: SyntheticMessageTestService,
		OccurredAt:  r.now(),
	})
	return obs
}
// recordTestServiceFailure records a failed test-service delivery. It
// updates the route observation (via the regular route table) and bumps
// TestServiceDeliveriesFailed.
func (r *SyntheticRuntime) recordTestServiceFailure(routeID string, startedAt time.Time, err error) SyntheticRouteObservation {
	obs := r.recordRouteFailureForMessage(routeID, startedAt, err, SyntheticMessageTestService)
	r.increment(func(m *SyntheticMetrics) { m.TestServiceDeliveriesFailed++ })
	return obs
}
// send hands envelope to the current transport for delivery to nextNodeID.
// The transport is read under the lock, but the call itself is made without
// holding it. Without a transport it fails with ErrSyntheticPeerUnavailable.
func (r *SyntheticRuntime) send(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
	r.mu.Lock()
	t := r.transport
	r.mu.Unlock()
	if t != nil {
		return t.SendSynthetic(ctx, nextNodeID, envelope)
	}
	return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable
}
// nextSequence returns the next outbound sequence number. Numbering starts
// at 1 and increases by one per call.
func (r *SyntheticRuntime) nextSequence() uint64 {
	r.mu.Lock()
	r.seq++
	issued := r.seq
	r.mu.Unlock()
	return issued
}
// increment applies mutate to the metrics while holding the lock. The
// deferred unlock keeps the mutex released even if mutate panics.
func (r *SyntheticRuntime) increment(mutate func(*SyntheticMetrics)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	mutate(&r.metrics)
}
// reject records a refused message. It bumps Rejected, stores err's text as
// LastRejectReason, and logs a "fabric_probe_rejected" event tagged with
// this node's identity.
//
// A nil runtime has nowhere to record anything, so the call is a no-op.
// This matters because ensureEnabled reports a nil runtime as disabled and
// its callers call reject straight afterwards. Without the guard, that path
// would panic instead of returning ErrMeshRuntimeDisabled.
func (r *SyntheticRuntime) reject(routeID string, channel string, messageType string, err error) {
	if r == nil {
		return
	}
	reason := ""
	if err != nil {
		reason = err.Error()
	}
	r.increment(func(m *SyntheticMetrics) {
		m.Rejected++
		m.LastRejectReason = reason
	})
	r.log(SyntheticLogEntry{
		Event:       "fabric_probe_rejected",
		RouteID:     routeID,
		ClusterID:   r.local.ClusterID,
		LocalNodeID: r.local.NodeID,
		Channel:     channel,
		MessageType: messageType,
		Reason:      reason,
		OccurredAt:  r.now(),
	})
}
// log forwards entry to the configured logger. It does nothing when no
// logger is configured.
func (r *SyntheticRuntime) log(entry SyntheticLogEntry) {
	if sink := r.logger; sink != nil {
		sink(entry)
	}
}
// routePath returns the node path for route. It returns a copy of
// route.Hops when hops are configured, otherwise the direct
// source→destination pair. The result never aliases route.Hops.
func routePath(route SyntheticRoute) []string {
	if len(route.Hops) == 0 {
		return []string{route.SourceNodeID, route.DestinationNodeID}
	}
	hops := make([]string, len(route.Hops))
	copy(hops, route.Hops)
	return hops
}
// routeTTL returns the route's own MaxTTL when it is positive, and fallback
// otherwise.
func routeTTL(route SyntheticRoute, fallback int) int {
	ttl := fallback
	if route.MaxTTL > 0 {
		ttl = route.MaxTTL
	}
	return ttl
}
// routeMaxHops returns the route's own MaxHops when it is positive, and
// fallback otherwise.
func routeMaxHops(route SyntheticRoute, fallback int) int {
	limit := fallback
	if route.MaxHops > 0 {
		limit = route.MaxHops
	}
	return limit
}
// indexOf returns the position of the first element equal to needle, or -1
// if there is none.
func indexOf(values []string, needle string) int {
	for pos := 0; pos < len(values); pos++ {
		if values[pos] == needle {
			return pos
		}
	}
	return -1
}
// contains reports whether any element of values equals needle.
func contains(values []string, needle string) bool {
	for _, v := range values {
		if v == needle {
			return true
		}
	}
	return false
}
// syntheticAckMessageType maps a request message type to its ack type.
// Test-service and route-health messages have dedicated ack types; anything
// else is acked as a plain probe.
func syntheticAckMessageType(messageType string) string {
	switch messageType {
	case SyntheticMessageTestService:
		return SyntheticMessageTestServiceAck
	case SyntheticMessageRouteHealth:
		return SyntheticMessageRouteHealthAck
	default:
		return SyntheticMessageProbeAck
	}
}
// validateTestServiceRequest checks, in order, that the request has an ID,
// targets the configured test organization, asks for the supported service
// type, and carries a payload no larger than maxTestPayloadBytes bytes.
func (r *SyntheticRuntime) validateTestServiceRequest(request SyntheticTestServiceRequest) error {
	switch {
	case request.RequestID == "":
		return ErrSyntheticRequestInvalid
	case request.OrganizationID != r.testOrganizationID:
		return ErrSyntheticOrganizationMismatch
	case request.ServiceType != SyntheticTestServiceType:
		return ErrUnsupportedSyntheticService
	case len(request.Payload) > r.maxTestPayloadBytes: // len counts bytes for both strings and byte slices
		return ErrSyntheticPayloadTooLarge
	}
	return nil
}
// decodeTestServiceRequest unmarshals a test-service request from payload.
// An empty payload is rejected with ErrSyntheticRequestInvalid. JSON errors
// are returned unchanged.
func decodeTestServiceRequest(payload json.RawMessage) (SyntheticTestServiceRequest, error) {
	var decoded SyntheticTestServiceRequest
	if len(payload) == 0 {
		return decoded, ErrSyntheticRequestInvalid
	}
	err := json.Unmarshal(payload, &decoded)
	return decoded, err
}
// decodeTestServiceResponse unmarshals a test-service response from payload.
// An empty payload is rejected with ErrSyntheticRequestInvalid. JSON errors
// are returned unchanged.
func decodeTestServiceResponse(payload json.RawMessage) (SyntheticTestServiceResponse, error) {
	var decoded SyntheticTestServiceResponse
	if len(payload) == 0 {
		return decoded, ErrSyntheticRequestInvalid
	}
	err := json.Unmarshal(payload, &decoded)
	return decoded, err
}
// shouldInvalidateObservation reports whether observation is stale relative
// to version. It is stale when any version component that version specifies
// (a non-empty value) differs from the one stamped on the observation.
// Empty components in version are ignored.
func shouldInvalidateObservation(observation SyntheticRouteObservation, version SyntheticRouteCacheVersion) bool {
	routeStale := version.RouteVersion != "" && observation.RouteVersion != version.RouteVersion
	policyStale := version.PolicyVersion != "" && observation.PolicyVersion != version.PolicyVersion
	peersStale := version.PeerDirectoryVersion != "" && observation.PeerDirectoryVersion != version.PeerDirectoryVersion
	return routeStale || policyStale || peersStale
}