1038 lines
32 KiB
Go
1038 lines
32 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type SyntheticTransport interface {
|
|
SendSynthetic(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error)
|
|
}
|
|
|
|
// SyntheticLogEntry is the structured record handed to the runtime's Logger
// callback. Every field other than Event and OccurredAt is left out of the
// JSON encoding when it holds its zero value.
type SyntheticLogEntry struct {
	// Event names what happened, for example "fabric_probe_sent".
	Event string `json:"event"`

	// Route, node, and message context of the event.
	RouteID     string `json:"route_id,omitempty"`
	ClusterID   string `json:"cluster_id,omitempty"`
	LocalNodeID string `json:"local_node_id,omitempty"`
	NextNodeID  string `json:"next_node_id,omitempty"`
	Channel     string `json:"channel,omitempty"`
	MessageType string `json:"message_type,omitempty"`
	Reason      string `json:"reason,omitempty"`

	// Envelope counters copied from the message being logged.
	TTL      int `json:"ttl,omitempty"`
	HopCount int `json:"hop_count,omitempty"`

	// Queue and drop details. NOTE(review): nothing in this file sets
	// these; presumably another producer fills them in — confirm.
	QueueDepth      int    `json:"queue_depth,omitempty"`
	QueueCapacity   int    `json:"queue_capacity,omitempty"`
	Dropped         bool   `json:"dropped,omitempty"`
	DroppedSequence uint64 `json:"dropped_sequence,omitempty"`

	// OccurredAt is the runtime clock reading taken when the entry was built.
	OccurredAt time.Time `json:"occurred_at"`
}
|
|
|
|
// SyntheticMetrics holds the runtime's counters. SnapshotMetrics returns a
// copy of this struct.
type SyntheticMetrics struct {
	// Probe traffic handled by this node.
	ProbesSent            uint64
	ProbesReceived        uint64
	ProbesForwarded       uint64
	ProbeAcksCreated      uint64
	RouteHealthProbesSent uint64

	// Test-service traffic originated by this node.
	TestServiceRequestsSent        uint64
	TestServiceDeliveriesSucceeded uint64
	TestServiceDeliveriesFailed    uint64
	TestServiceFallbacksUsed       uint64

	// Per-route delivery outcomes, fallback usage, and cache maintenance.
	RouteDeliveriesSucceeded uint64
	RouteDeliveriesFailed    uint64
	FallbackRoutesUsed       uint64
	WarmRoutesPromoted       uint64
	RouteCacheInvalidations  uint64

	// Rejected counts rejections; LastRejectReason keeps the error text of
	// the most recent one.
	Rejected         uint64
	LastRejectReason string
}
|
|
|
|
type SyntheticRuntimeConfig struct {
|
|
Enabled bool
|
|
Local PeerIdentity
|
|
Routes []SyntheticRoute
|
|
RouteHealthRoutes []SyntheticRoute
|
|
AllowedChannels []string
|
|
MaxTTL int
|
|
MaxHops int
|
|
TestOrganizationID string
|
|
MaxTestPayloadBytes int
|
|
Transport SyntheticTransport
|
|
Now func() time.Time
|
|
Logger func(SyntheticLogEntry)
|
|
}
|
|
|
|
type SyntheticRuntime struct {
|
|
enabled bool
|
|
local PeerIdentity
|
|
routes map[string]SyntheticRoute
|
|
routeHealthRoutes map[string]SyntheticRoute
|
|
allowedChannels map[string]struct{}
|
|
maxTTL int
|
|
maxHops int
|
|
testOrganizationID string
|
|
maxTestPayloadBytes int
|
|
transport SyntheticTransport
|
|
now func() time.Time
|
|
logger func(SyntheticLogEntry)
|
|
|
|
mu sync.Mutex
|
|
metrics SyntheticMetrics
|
|
seq uint64
|
|
routeHealth map[string]SyntheticRouteObservation
|
|
}
|
|
|
|
func NewSyntheticRuntime(cfg SyntheticRuntimeConfig) *SyntheticRuntime {
|
|
allowed := cfg.AllowedChannels
|
|
if len(allowed) == 0 {
|
|
allowed = []string{
|
|
SyntheticChannelFabricControl,
|
|
SyntheticChannelRouteControl,
|
|
SyntheticChannelTelemetry,
|
|
}
|
|
}
|
|
allowedSet := make(map[string]struct{}, len(allowed))
|
|
for _, channel := range allowed {
|
|
if channel != "" {
|
|
allowedSet[channel] = struct{}{}
|
|
}
|
|
}
|
|
routes := make(map[string]SyntheticRoute, len(cfg.Routes))
|
|
for _, route := range cfg.Routes {
|
|
if route.RouteID != "" {
|
|
routes[route.RouteID] = route
|
|
}
|
|
}
|
|
routeHealthRoutes := routes
|
|
if len(cfg.RouteHealthRoutes) > 0 {
|
|
routeHealthRoutes = make(map[string]SyntheticRoute, len(cfg.RouteHealthRoutes))
|
|
for _, route := range cfg.RouteHealthRoutes {
|
|
if route.RouteID != "" {
|
|
routeHealthRoutes[route.RouteID] = route
|
|
}
|
|
}
|
|
}
|
|
maxTTL := cfg.MaxTTL
|
|
if maxTTL <= 0 {
|
|
maxTTL = 8
|
|
}
|
|
maxHops := cfg.MaxHops
|
|
if maxHops <= 0 {
|
|
maxHops = 8
|
|
}
|
|
testOrganizationID := cfg.TestOrganizationID
|
|
if testOrganizationID == "" {
|
|
testOrganizationID = SyntheticDefaultTestOrganizationID
|
|
}
|
|
maxTestPayloadBytes := cfg.MaxTestPayloadBytes
|
|
if maxTestPayloadBytes <= 0 {
|
|
maxTestPayloadBytes = SyntheticDefaultMaxTestPayloadBytes
|
|
}
|
|
now := cfg.Now
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &SyntheticRuntime{
|
|
enabled: cfg.Enabled,
|
|
local: cfg.Local,
|
|
routes: routes,
|
|
routeHealthRoutes: routeHealthRoutes,
|
|
allowedChannels: allowedSet,
|
|
maxTTL: maxTTL,
|
|
maxHops: maxHops,
|
|
testOrganizationID: testOrganizationID,
|
|
maxTestPayloadBytes: maxTestPayloadBytes,
|
|
transport: cfg.Transport,
|
|
now: now,
|
|
logger: cfg.Logger,
|
|
routeHealth: map[string]SyntheticRouteObservation{},
|
|
}
|
|
}
|
|
|
|
func (r *SyntheticRuntime) UpdateConfig(routes []SyntheticRoute, transport SyntheticTransport) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
updatedRoutes := make(map[string]SyntheticRoute, len(routes))
|
|
for _, route := range routes {
|
|
if route.RouteID != "" {
|
|
updatedRoutes[route.RouteID] = route
|
|
}
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.routes = updatedRoutes
|
|
r.routeHealthRoutes = updatedRoutes
|
|
if transport != nil {
|
|
r.transport = transport
|
|
}
|
|
}
|
|
|
|
func (r *SyntheticRuntime) UpdateRouteHealthConfig(routes []SyntheticRoute) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
updatedRoutes := make(map[string]SyntheticRoute, len(routes))
|
|
for _, route := range routes {
|
|
if route.RouteID != "" {
|
|
updatedRoutes[route.RouteID] = route
|
|
}
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.routeHealthRoutes = updatedRoutes
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SendProbe(ctx context.Context, routeID string, channel string, probeID string) (SyntheticEnvelope, error) {
|
|
return r.sendProbeMessage(ctx, routeID, channel, probeID, SyntheticMessageProbe)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SendRouteHealthProbe(ctx context.Context, routeID string, channel string, probeID string) (SyntheticRouteHealthResult, error) {
|
|
startedAt := r.now()
|
|
ack, err := r.sendProbeMessage(ctx, routeID, channel, probeID, SyntheticMessageRouteHealth)
|
|
if err != nil {
|
|
r.recordRouteFailure(routeID, startedAt, err)
|
|
return SyntheticRouteHealthResult{}, err
|
|
}
|
|
observation := r.recordRouteSuccess(routeID, startedAt)
|
|
return SyntheticRouteHealthResult{
|
|
RequestedRouteID: routeID,
|
|
SelectedRouteID: routeID,
|
|
FallbackUsed: false,
|
|
Ack: ack,
|
|
Observation: observation,
|
|
}, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SendRouteHealthProbeWithFallback(ctx context.Context, preferredRouteID string, fallbackRouteIDs []string, channel string, probeID string) (SyntheticRouteHealthResult, error) {
|
|
if err := r.ensureEnabled(); err != nil {
|
|
r.reject(preferredRouteID, channel, SyntheticMessageRouteHealth, err)
|
|
return SyntheticRouteHealthResult{}, err
|
|
}
|
|
candidates := append([]string{preferredRouteID}, fallbackRouteIDs...)
|
|
var lastErr error
|
|
for i, routeID := range candidates {
|
|
if routeID == "" {
|
|
continue
|
|
}
|
|
if i > 0 {
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_fallback_route_selected",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageRouteHealth,
|
|
Reason: "preferred_route_unavailable",
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
result, err := r.SendRouteHealthProbe(ctx, routeID, channel, probeID)
|
|
if err == nil {
|
|
if i > 0 {
|
|
result.RequestedRouteID = preferredRouteID
|
|
result.FallbackUsed = true
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.FallbackRoutesUsed++
|
|
m.WarmRoutesPromoted++
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_warm_route_promoted",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageRouteHealth,
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
return result, nil
|
|
}
|
|
lastErr = err
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_route_delivery_failed",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageRouteHealth,
|
|
Reason: err.Error(),
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
if lastErr == nil {
|
|
lastErr = ErrNoHealthySyntheticRoute
|
|
}
|
|
r.reject(preferredRouteID, channel, SyntheticMessageRouteHealth, lastErr)
|
|
return SyntheticRouteHealthResult{}, lastErr
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SendTestService(ctx context.Context, routeID string, channel string, request SyntheticTestServiceRequest) (SyntheticTestServiceResult, error) {
|
|
startedAt := r.now()
|
|
ack, err := r.sendTestServiceMessage(ctx, routeID, channel, request)
|
|
if err != nil {
|
|
r.recordTestServiceFailure(routeID, startedAt, err)
|
|
return SyntheticTestServiceResult{}, err
|
|
}
|
|
response, err := decodeTestServiceResponse(ack.Payload)
|
|
if err != nil {
|
|
r.recordTestServiceFailure(routeID, startedAt, err)
|
|
return SyntheticTestServiceResult{}, err
|
|
}
|
|
observation := r.recordTestServiceSuccess(routeID, startedAt)
|
|
return SyntheticTestServiceResult{
|
|
RequestedRouteID: routeID,
|
|
SelectedRouteID: routeID,
|
|
FallbackUsed: false,
|
|
Ack: ack,
|
|
Response: response,
|
|
Observation: observation,
|
|
}, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SendTestServiceWithFallback(ctx context.Context, preferredRouteID string, fallbackRouteIDs []string, channel string, request SyntheticTestServiceRequest) (SyntheticTestServiceResult, error) {
|
|
if err := r.ensureEnabled(); err != nil {
|
|
r.reject(preferredRouteID, channel, SyntheticMessageTestService, err)
|
|
return SyntheticTestServiceResult{}, err
|
|
}
|
|
candidates := append([]string{preferredRouteID}, fallbackRouteIDs...)
|
|
var lastErr error
|
|
for i, routeID := range candidates {
|
|
if routeID == "" {
|
|
continue
|
|
}
|
|
if i > 0 {
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_test_service_fallback_selected",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageTestService,
|
|
Reason: "preferred_route_unavailable",
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
result, err := r.SendTestService(ctx, routeID, channel, request)
|
|
if err == nil {
|
|
if i > 0 {
|
|
result.RequestedRouteID = preferredRouteID
|
|
result.FallbackUsed = true
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.TestServiceFallbacksUsed++
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_test_service_fallback_used",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageTestService,
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
return result, nil
|
|
}
|
|
lastErr = err
|
|
}
|
|
if lastErr == nil {
|
|
lastErr = ErrNoHealthySyntheticRoute
|
|
}
|
|
r.reject(preferredRouteID, channel, SyntheticMessageTestService, lastErr)
|
|
return SyntheticTestServiceResult{}, lastErr
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SnapshotRouteObservation(routeID string) (SyntheticRouteObservation, bool) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
observation, ok := r.routeHealth[routeID]
|
|
return observation, ok
|
|
}
|
|
|
|
func (r *SyntheticRuntime) InvalidateRouteCache(reason string, version SyntheticRouteCacheVersion) int {
|
|
r.mu.Lock()
|
|
invalidated := 0
|
|
for routeID, observation := range r.routeHealth {
|
|
if shouldInvalidateObservation(observation, version) {
|
|
delete(r.routeHealth, routeID)
|
|
invalidated++
|
|
}
|
|
}
|
|
r.metrics.RouteCacheInvalidations += uint64(invalidated)
|
|
r.mu.Unlock()
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_route_cache_invalidated",
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Reason: reason,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return invalidated
|
|
}
|
|
|
|
func (r *SyntheticRuntime) sendProbeMessage(ctx context.Context, routeID string, channel string, probeID string, messageType string) (SyntheticEnvelope, error) {
|
|
if err := r.ensureEnabled(); err != nil {
|
|
r.reject(routeID, channel, messageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
route, path, err := r.routeForMessage(routeID, messageType)
|
|
if err != nil {
|
|
r.reject(routeID, channel, messageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if err := r.validateRouteForSend(route, path, channel); err != nil {
|
|
r.reject(routeID, channel, messageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if len(path) < 2 {
|
|
r.reject(routeID, channel, messageType, ErrInvalidRoutePath)
|
|
return SyntheticEnvelope{}, ErrInvalidRoutePath
|
|
}
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_route_selected",
|
|
RouteID: route.RouteID,
|
|
ClusterID: route.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
NextNodeID: path[1],
|
|
Channel: channel,
|
|
MessageType: messageType,
|
|
TTL: routeTTL(route, r.maxTTL),
|
|
OccurredAt: r.now(),
|
|
})
|
|
payload, err := json.Marshal(SyntheticProbePayload{
|
|
ProbeID: probeID,
|
|
SentAt: r.now(),
|
|
})
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
envelope := SyntheticEnvelope{
|
|
ProtocolVersion: ProtocolVersion,
|
|
RouteID: route.RouteID,
|
|
ClusterID: route.ClusterID,
|
|
From: r.local,
|
|
To: PeerIdentity{ClusterID: route.ClusterID, NodeID: path[1]},
|
|
Channel: channel,
|
|
MessageType: messageType,
|
|
TTL: routeTTL(route, r.maxTTL),
|
|
HopCount: 1,
|
|
Visited: []string{r.local.NodeID},
|
|
Sequence: r.nextSequence(),
|
|
SentAt: r.now(),
|
|
Payload: payload,
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
if messageType == SyntheticMessageRouteHealth {
|
|
m.RouteHealthProbesSent++
|
|
} else {
|
|
m.ProbesSent++
|
|
}
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_probe_sent",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
NextNodeID: envelope.To.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
TTL: envelope.TTL,
|
|
HopCount: envelope.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return r.send(ctx, envelope.To.NodeID, envelope)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) sendTestServiceMessage(ctx context.Context, routeID string, channel string, request SyntheticTestServiceRequest) (SyntheticEnvelope, error) {
|
|
if err := r.ensureEnabled(); err != nil {
|
|
r.reject(routeID, channel, SyntheticMessageTestService, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if err := r.validateTestServiceRequest(request); err != nil {
|
|
r.reject(routeID, channel, SyntheticMessageTestService, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
route, path, err := r.route(routeID)
|
|
if err != nil {
|
|
r.reject(routeID, channel, SyntheticMessageTestService, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if err := r.validateRouteForSend(route, path, channel); err != nil {
|
|
r.reject(routeID, channel, SyntheticMessageTestService, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if len(path) < 2 {
|
|
r.reject(routeID, channel, SyntheticMessageTestService, ErrInvalidRoutePath)
|
|
return SyntheticEnvelope{}, ErrInvalidRoutePath
|
|
}
|
|
if request.SentAt.IsZero() {
|
|
request.SentAt = r.now()
|
|
}
|
|
payload, err := json.Marshal(request)
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
envelope := SyntheticEnvelope{
|
|
ProtocolVersion: ProtocolVersion,
|
|
RouteID: route.RouteID,
|
|
ClusterID: route.ClusterID,
|
|
From: r.local,
|
|
To: PeerIdentity{ClusterID: route.ClusterID, NodeID: path[1]},
|
|
Channel: channel,
|
|
MessageType: SyntheticMessageTestService,
|
|
TTL: routeTTL(route, r.maxTTL),
|
|
HopCount: 1,
|
|
Visited: []string{r.local.NodeID},
|
|
Sequence: r.nextSequence(),
|
|
SentAt: r.now(),
|
|
Payload: payload,
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.TestServiceRequestsSent++
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_test_service_sent",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
NextNodeID: envelope.To.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
TTL: envelope.TTL,
|
|
HopCount: envelope.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return r.send(ctx, envelope.To.NodeID, envelope)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) Receive(ctx context.Context, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
|
|
if err := r.ensureEnabled(); err != nil {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
route, path, err := r.validateInbound(envelope)
|
|
if err != nil {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if envelope.MessageType == SyntheticMessageTestService {
|
|
request, err := decodeTestServiceRequest(envelope.Payload)
|
|
if err != nil {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if err := r.validateTestServiceRequest(request); err != nil {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, err)
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) { m.ProbesReceived++ })
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_probe_received",
|
|
RouteID: envelope.RouteID,
|
|
ClusterID: envelope.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: envelope.Channel,
|
|
MessageType: envelope.MessageType,
|
|
TTL: envelope.TTL,
|
|
HopCount: envelope.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
visited := append(append([]string{}, envelope.Visited...), r.local.NodeID)
|
|
if r.local.NodeID == route.DestinationNodeID {
|
|
return r.ack(route, envelope, visited)
|
|
}
|
|
if envelope.TTL <= 1 {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, ErrTTLExhausted)
|
|
return SyntheticEnvelope{}, ErrTTLExhausted
|
|
}
|
|
index := indexOf(path, r.local.NodeID)
|
|
if index < 0 || index+1 >= len(path) {
|
|
r.reject(envelope.RouteID, envelope.Channel, envelope.MessageType, ErrInvalidRoutePath)
|
|
return SyntheticEnvelope{}, ErrInvalidRoutePath
|
|
}
|
|
nextNodeID := path[index+1]
|
|
forward := envelope
|
|
forward.From = r.local
|
|
forward.To = PeerIdentity{ClusterID: route.ClusterID, NodeID: nextNodeID}
|
|
forward.TTL = envelope.TTL - 1
|
|
forward.HopCount = envelope.HopCount + 1
|
|
forward.Visited = visited
|
|
forward.SentAt = r.now()
|
|
r.increment(func(m *SyntheticMetrics) { m.ProbesForwarded++ })
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_probe_forwarded",
|
|
RouteID: forward.RouteID,
|
|
ClusterID: forward.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
NextNodeID: nextNodeID,
|
|
Channel: forward.Channel,
|
|
MessageType: forward.MessageType,
|
|
TTL: forward.TTL,
|
|
HopCount: forward.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return r.send(ctx, nextNodeID, forward)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) SnapshotMetrics() SyntheticMetrics {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
return r.metrics
|
|
}
|
|
|
|
func (r *SyntheticRuntime) ensureEnabled() error {
|
|
if r == nil || !r.enabled {
|
|
return ErrMeshRuntimeDisabled
|
|
}
|
|
if r.local.ClusterID == "" || r.local.NodeID == "" {
|
|
return ErrNodeMismatch
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) route(routeID string) (SyntheticRoute, []string, error) {
|
|
return r.routeFrom(routeID, r.routes)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) routeForMessage(routeID string, messageType string) (SyntheticRoute, []string, error) {
|
|
if messageType == SyntheticMessageRouteHealth {
|
|
return r.routeFrom(routeID, r.routeHealthRoutes)
|
|
}
|
|
return r.route(routeID)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) routeFrom(routeID string, routes map[string]SyntheticRoute) (SyntheticRoute, []string, error) {
|
|
if routeID == "" {
|
|
return SyntheticRoute{}, nil, ErrRouteIDRequired
|
|
}
|
|
r.mu.Lock()
|
|
route, ok := routes[routeID]
|
|
r.mu.Unlock()
|
|
if !ok {
|
|
return SyntheticRoute{}, nil, ErrRouteNotFound
|
|
}
|
|
path := routePath(route)
|
|
if len(path) < 2 || path[0] != route.SourceNodeID || path[len(path)-1] != route.DestinationNodeID {
|
|
return SyntheticRoute{}, nil, ErrInvalidRoutePath
|
|
}
|
|
if route.ClusterID == "" || route.ClusterID != r.local.ClusterID {
|
|
return SyntheticRoute{}, nil, ErrClusterMismatch
|
|
}
|
|
if !route.ExpiresAt.IsZero() && !r.now().Before(route.ExpiresAt) {
|
|
return SyntheticRoute{}, nil, ErrRouteExpired
|
|
}
|
|
return route, path, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) validateRouteForSend(route SyntheticRoute, path []string, channel string) error {
|
|
if route.SourceNodeID != r.local.NodeID {
|
|
return ErrNodeMismatch
|
|
}
|
|
if routeTTL(route, r.maxTTL) <= 0 {
|
|
return ErrTTLExhausted
|
|
}
|
|
if routeMaxHops(route, r.maxHops) < len(path)-1 {
|
|
return ErrInvalidRoutePath
|
|
}
|
|
return r.validateChannel(route, channel)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) validateInbound(envelope SyntheticEnvelope) (SyntheticRoute, []string, error) {
|
|
if envelope.ProtocolVersion != ProtocolVersion {
|
|
return SyntheticRoute{}, nil, ErrUnsupportedSyntheticMessage
|
|
}
|
|
if envelope.MessageType != SyntheticMessageProbe && envelope.MessageType != SyntheticMessageRouteHealth && envelope.MessageType != SyntheticMessageTestService {
|
|
return SyntheticRoute{}, nil, ErrUnsupportedSyntheticMessage
|
|
}
|
|
if envelope.ClusterID == "" || envelope.ClusterID != r.local.ClusterID {
|
|
return SyntheticRoute{}, nil, ErrClusterMismatch
|
|
}
|
|
if envelope.To.ClusterID != r.local.ClusterID || envelope.To.NodeID != r.local.NodeID {
|
|
return SyntheticRoute{}, nil, ErrNodeMismatch
|
|
}
|
|
if envelope.TTL <= 0 {
|
|
return SyntheticRoute{}, nil, ErrTTLExhausted
|
|
}
|
|
route, path, err := r.routeForMessage(envelope.RouteID, envelope.MessageType)
|
|
if err != nil {
|
|
return SyntheticRoute{}, nil, err
|
|
}
|
|
if envelope.ClusterID != route.ClusterID {
|
|
return SyntheticRoute{}, nil, ErrClusterMismatch
|
|
}
|
|
if envelope.HopCount <= 0 || envelope.HopCount > routeMaxHops(route, r.maxHops) {
|
|
return SyntheticRoute{}, nil, ErrInvalidRoutePath
|
|
}
|
|
index := indexOf(path, r.local.NodeID)
|
|
if index < 0 {
|
|
return SyntheticRoute{}, nil, ErrNodeMismatch
|
|
}
|
|
if envelope.HopCount != index {
|
|
return SyntheticRoute{}, nil, ErrInvalidRoutePath
|
|
}
|
|
if contains(envelope.Visited, r.local.NodeID) {
|
|
return SyntheticRoute{}, nil, ErrLoopDetected
|
|
}
|
|
if index > 0 && len(envelope.Visited) > 0 && envelope.Visited[len(envelope.Visited)-1] != path[index-1] {
|
|
return SyntheticRoute{}, nil, ErrInvalidRoutePath
|
|
}
|
|
if err := r.validateChannel(route, envelope.Channel); err != nil {
|
|
return SyntheticRoute{}, nil, err
|
|
}
|
|
return route, path, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) validateChannel(route SyntheticRoute, channel string) error {
|
|
if channel == "" {
|
|
return ErrUnauthorizedChannel
|
|
}
|
|
if _, ok := r.allowedChannels[channel]; !ok {
|
|
return ErrUnauthorizedChannel
|
|
}
|
|
if len(route.AllowedChannels) == 0 {
|
|
return nil
|
|
}
|
|
for _, allowed := range route.AllowedChannels {
|
|
if allowed == channel {
|
|
return nil
|
|
}
|
|
}
|
|
return ErrUnauthorizedChannel
|
|
}
|
|
|
|
func (r *SyntheticRuntime) ack(route SyntheticRoute, envelope SyntheticEnvelope, visited []string) (SyntheticEnvelope, error) {
|
|
if envelope.MessageType == SyntheticMessageTestService {
|
|
return r.ackTestService(route, envelope, visited)
|
|
}
|
|
var probe SyntheticProbePayload
|
|
_ = json.Unmarshal(envelope.Payload, &probe)
|
|
payload, err := json.Marshal(SyntheticProbeAckPayload{
|
|
ProbeID: probe.ProbeID,
|
|
Path: visited,
|
|
AcceptedAt: r.now(),
|
|
})
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
ack := SyntheticEnvelope{
|
|
ProtocolVersion: ProtocolVersion,
|
|
RouteID: route.RouteID,
|
|
ClusterID: route.ClusterID,
|
|
From: r.local,
|
|
To: PeerIdentity{ClusterID: route.ClusterID, NodeID: route.SourceNodeID},
|
|
Channel: envelope.Channel,
|
|
MessageType: syntheticAckMessageType(envelope.MessageType),
|
|
TTL: envelope.TTL,
|
|
HopCount: envelope.HopCount,
|
|
Visited: visited,
|
|
Sequence: envelope.Sequence,
|
|
SentAt: r.now(),
|
|
Payload: payload,
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) { m.ProbeAcksCreated++ })
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_probe_ack_created",
|
|
RouteID: ack.RouteID,
|
|
ClusterID: ack.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: ack.Channel,
|
|
MessageType: ack.MessageType,
|
|
TTL: ack.TTL,
|
|
HopCount: ack.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return ack, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) ackTestService(route SyntheticRoute, envelope SyntheticEnvelope, visited []string) (SyntheticEnvelope, error) {
|
|
request, err := decodeTestServiceRequest(envelope.Payload)
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
if err := r.validateTestServiceRequest(request); err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
response := SyntheticTestServiceResponse{
|
|
RequestID: request.RequestID,
|
|
OrganizationID: request.OrganizationID,
|
|
ServiceType: request.ServiceType,
|
|
EchoPayload: request.Payload,
|
|
Path: visited,
|
|
AcceptedAt: r.now(),
|
|
}
|
|
payload, err := json.Marshal(response)
|
|
if err != nil {
|
|
return SyntheticEnvelope{}, err
|
|
}
|
|
ack := SyntheticEnvelope{
|
|
ProtocolVersion: ProtocolVersion,
|
|
RouteID: route.RouteID,
|
|
ClusterID: route.ClusterID,
|
|
From: r.local,
|
|
To: PeerIdentity{ClusterID: route.ClusterID, NodeID: route.SourceNodeID},
|
|
Channel: envelope.Channel,
|
|
MessageType: SyntheticMessageTestServiceAck,
|
|
TTL: envelope.TTL,
|
|
HopCount: envelope.HopCount,
|
|
Visited: visited,
|
|
Sequence: envelope.Sequence,
|
|
SentAt: r.now(),
|
|
Payload: payload,
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) { m.ProbeAcksCreated++ })
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_test_service_ack_created",
|
|
RouteID: ack.RouteID,
|
|
ClusterID: ack.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: ack.Channel,
|
|
MessageType: ack.MessageType,
|
|
TTL: ack.TTL,
|
|
HopCount: ack.HopCount,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return ack, nil
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordRouteSuccess(routeID string, startedAt time.Time) SyntheticRouteObservation {
|
|
return r.recordRouteSuccessForMessage(routeID, startedAt, SyntheticMessageRouteHealth)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordRouteSuccessForMessage(routeID string, startedAt time.Time, messageType string) SyntheticRouteObservation {
|
|
route, _, _ := r.routeForMessage(routeID, messageType)
|
|
now := r.now()
|
|
observation := SyntheticRouteObservation{}
|
|
r.mu.Lock()
|
|
observation = r.routeHealth[routeID]
|
|
observation.RouteID = routeID
|
|
observation.State = SyntheticRouteStateHealthy
|
|
observation.LastSuccessAt = now
|
|
observation.LastFailureReason = ""
|
|
observation.SuccessCount++
|
|
observation.LastLatencyMs = now.Sub(startedAt).Milliseconds()
|
|
observation.RouteVersion = route.RouteVersion
|
|
observation.PolicyVersion = route.PolicyVersion
|
|
observation.PeerDirectoryVersion = route.PeerDirectoryVersion
|
|
r.routeHealth[routeID] = observation
|
|
r.metrics.RouteDeliveriesSucceeded++
|
|
r.mu.Unlock()
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_route_delivery_succeeded",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
MessageType: messageType,
|
|
OccurredAt: now,
|
|
})
|
|
return observation
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordRouteFailure(routeID string, startedAt time.Time, err error) SyntheticRouteObservation {
|
|
return r.recordRouteFailureForMessage(routeID, startedAt, err, SyntheticMessageRouteHealth)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordRouteFailureForMessage(routeID string, startedAt time.Time, err error, messageType string) SyntheticRouteObservation {
|
|
route, _, _ := r.routeForMessage(routeID, messageType)
|
|
now := r.now()
|
|
reason := ""
|
|
if err != nil {
|
|
reason = err.Error()
|
|
}
|
|
observation := SyntheticRouteObservation{}
|
|
r.mu.Lock()
|
|
observation = r.routeHealth[routeID]
|
|
observation.RouteID = routeID
|
|
observation.State = SyntheticRouteStateFailed
|
|
observation.LastFailureAt = now
|
|
observation.LastFailureReason = reason
|
|
observation.FailureCount++
|
|
observation.LastLatencyMs = now.Sub(startedAt).Milliseconds()
|
|
observation.RouteVersion = route.RouteVersion
|
|
observation.PolicyVersion = route.PolicyVersion
|
|
observation.PeerDirectoryVersion = route.PeerDirectoryVersion
|
|
r.routeHealth[routeID] = observation
|
|
r.metrics.RouteDeliveriesFailed++
|
|
r.mu.Unlock()
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_route_marked_failed",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
MessageType: messageType,
|
|
Reason: reason,
|
|
OccurredAt: now,
|
|
})
|
|
return observation
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordTestServiceSuccess(routeID string, startedAt time.Time) SyntheticRouteObservation {
|
|
observation := r.recordRouteSuccessForMessage(routeID, startedAt, SyntheticMessageTestService)
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.TestServiceDeliveriesSucceeded++
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_test_service_delivery_succeeded",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
MessageType: SyntheticMessageTestService,
|
|
OccurredAt: r.now(),
|
|
})
|
|
return observation
|
|
}
|
|
|
|
func (r *SyntheticRuntime) recordTestServiceFailure(routeID string, startedAt time.Time, err error) SyntheticRouteObservation {
|
|
observation := r.recordRouteFailureForMessage(routeID, startedAt, err, SyntheticMessageTestService)
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.TestServiceDeliveriesFailed++
|
|
})
|
|
return observation
|
|
}
|
|
|
|
func (r *SyntheticRuntime) send(ctx context.Context, nextNodeID string, envelope SyntheticEnvelope) (SyntheticEnvelope, error) {
|
|
r.mu.Lock()
|
|
transport := r.transport
|
|
r.mu.Unlock()
|
|
if transport == nil {
|
|
return SyntheticEnvelope{}, ErrSyntheticPeerUnavailable
|
|
}
|
|
return transport.SendSynthetic(ctx, nextNodeID, envelope)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) nextSequence() uint64 {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.seq++
|
|
return r.seq
|
|
}
|
|
|
|
func (r *SyntheticRuntime) increment(fn func(*SyntheticMetrics)) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
fn(&r.metrics)
|
|
}
|
|
|
|
func (r *SyntheticRuntime) reject(routeID string, channel string, messageType string, err error) {
|
|
reason := ""
|
|
if err != nil {
|
|
reason = err.Error()
|
|
}
|
|
r.increment(func(m *SyntheticMetrics) {
|
|
m.Rejected++
|
|
m.LastRejectReason = reason
|
|
})
|
|
r.log(SyntheticLogEntry{
|
|
Event: "fabric_probe_rejected",
|
|
RouteID: routeID,
|
|
ClusterID: r.local.ClusterID,
|
|
LocalNodeID: r.local.NodeID,
|
|
Channel: channel,
|
|
MessageType: messageType,
|
|
Reason: reason,
|
|
OccurredAt: r.now(),
|
|
})
|
|
}
|
|
|
|
func (r *SyntheticRuntime) log(entry SyntheticLogEntry) {
|
|
if r.logger != nil {
|
|
r.logger(entry)
|
|
}
|
|
}
|
|
|
|
func routePath(route SyntheticRoute) []string {
|
|
if len(route.Hops) > 0 {
|
|
return append([]string{}, route.Hops...)
|
|
}
|
|
return []string{route.SourceNodeID, route.DestinationNodeID}
|
|
}
|
|
|
|
func routeTTL(route SyntheticRoute, fallback int) int {
|
|
if route.MaxTTL > 0 {
|
|
return route.MaxTTL
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func routeMaxHops(route SyntheticRoute, fallback int) int {
|
|
if route.MaxHops > 0 {
|
|
return route.MaxHops
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
// indexOf returns the position of the first element of values equal to
// needle, or -1 when there is none.
func indexOf(values []string, needle string) int {
	for i := 0; i < len(values); i++ {
		if values[i] != needle {
			continue
		}
		return i
	}
	return -1
}
|
|
|
|
// contains reports whether needle is an element of values.
func contains(values []string, needle string) bool {
	for _, candidate := range values {
		if candidate == needle {
			return true
		}
	}
	return false
}
|
|
|
|
func syntheticAckMessageType(messageType string) string {
|
|
if messageType == SyntheticMessageTestService {
|
|
return SyntheticMessageTestServiceAck
|
|
}
|
|
if messageType == SyntheticMessageRouteHealth {
|
|
return SyntheticMessageRouteHealthAck
|
|
}
|
|
return SyntheticMessageProbeAck
|
|
}
|
|
|
|
func (r *SyntheticRuntime) validateTestServiceRequest(request SyntheticTestServiceRequest) error {
|
|
if request.RequestID == "" {
|
|
return ErrSyntheticRequestInvalid
|
|
}
|
|
if request.OrganizationID != r.testOrganizationID {
|
|
return ErrSyntheticOrganizationMismatch
|
|
}
|
|
if request.ServiceType != SyntheticTestServiceType {
|
|
return ErrUnsupportedSyntheticService
|
|
}
|
|
if len([]byte(request.Payload)) > r.maxTestPayloadBytes {
|
|
return ErrSyntheticPayloadTooLarge
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func decodeTestServiceRequest(payload json.RawMessage) (SyntheticTestServiceRequest, error) {
|
|
var request SyntheticTestServiceRequest
|
|
if len(payload) == 0 {
|
|
return request, ErrSyntheticRequestInvalid
|
|
}
|
|
if err := json.Unmarshal(payload, &request); err != nil {
|
|
return request, err
|
|
}
|
|
return request, nil
|
|
}
|
|
|
|
func decodeTestServiceResponse(payload json.RawMessage) (SyntheticTestServiceResponse, error) {
|
|
var response SyntheticTestServiceResponse
|
|
if len(payload) == 0 {
|
|
return response, ErrSyntheticRequestInvalid
|
|
}
|
|
if err := json.Unmarshal(payload, &response); err != nil {
|
|
return response, err
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
func shouldInvalidateObservation(observation SyntheticRouteObservation, version SyntheticRouteCacheVersion) bool {
|
|
if version.RouteVersion != "" && observation.RouteVersion != version.RouteVersion {
|
|
return true
|
|
}
|
|
if version.PolicyVersion != "" && observation.PolicyVersion != version.PolicyVersion {
|
|
return true
|
|
}
|
|
if version.PeerDirectoryVersion != "" && observation.PeerDirectoryVersion != version.PeerDirectoryVersion {
|
|
return true
|
|
}
|
|
return false
|
|
}
|