Files
m 20d361a886
build / backend (push) Has been cancelled
build / node-agent (push) Has been cancelled
build / worker (push) Has been cancelled
рабочий вариант, но скорость 10 МБит
2026-05-22 21:46:49 +03:00

2007 lines
72 KiB
Go

package main
import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"log"
"math/big"
"os"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
"github.com/example/remote-access-platform/agents/rap-node-agent/internal/mesh"
"github.com/quic-go/quic-go"
)
// Caps on how much per-stream and per-metric data the load test retains.
const (
	// maxStreamResultSamples presumably limits how many streamResult entries
	// are kept in the report's StreamSamples/ErrorSamples — the code that
	// applies it is outside this chunk; TODO confirm.
	maxStreamResultSamples = 25
	// maxMetricSamples presumably limits how many latency observations each
	// streamResultCollector slice retains (the separate *Count fields would
	// then hold the true totals) — usage is outside this chunk; TODO confirm.
	maxMetricSamples = 10000
)
// loadtestConfig holds the settings for one load-test run. parseFlags fills it
// from command-line flags and normalizes it. It is also embedded in
// loadtestReport under "config", so field order and json tags define that part
// of the JSON report. The time.Duration fields have no custom JSON marshaling,
// so encoding/json writes them as integer nanoseconds.
type loadtestConfig struct {
	// Run mode ("server", "client" or "all") and target topology.
	Mode string `json:"mode"`
	ListenAddr string `json:"listen_addr"`
	Targets []string `json:"targets"`
	TopologyProfile string `json:"topology_profile,omitempty"`
	// Workload shape.
	Soak bool `json:"soak"`
	Nodes int `json:"nodes"`
	Streams int `json:"streams"`
	Concurrency int `json:"concurrency"`
	BytesPerStream int64 `json:"bytes_per_stream"`
	// Every ControlEvery-th stream is control traffic (0 disables) and sends
	// ControlBytes instead of BytesPerStream.
	ControlEvery int `json:"control_every"`
	ControlBytes int64 `json:"control_bytes_per_stream"`
	PayloadSize int `json:"payload_size"`
	ShortSessions bool `json:"short_sessions"`
	// Failover, failure injection and placement.
	PoolFailover bool `json:"pool_failover"`
	FailTarget int `json:"fail_target"`
	FailAfter time.Duration `json:"fail_after"`
	ImpairTarget int `json:"impair_target"`
	ProbeTargets bool `json:"probe_targets"`
	MaxTargetRTTMs int64 `json:"max_target_rtt_ms"`
	MigrateSlow bool `json:"migrate_slow_streams"`
	// MaxAckMs is the slow-ACK migration threshold; the remaining Max*/Min*
	// fields are verdict thresholds where 0 disables the check.
	MaxAckMs int64 `json:"max_ack_ms"`
	MaxAckP95Ms int64 `json:"max_ack_p95_ms"`
	MaxAckP99Ms int64 `json:"max_ack_p99_ms"`
	MaxTargetAckMs int64 `json:"max_target_ack_ms"`
	MaxControlP95 int64 `json:"max_control_ack_p95_ms"`
	MaxSetupP95Ms int64 `json:"max_setup_p95_ms"`
	MaxSetupP99Ms int64 `json:"max_setup_p99_ms"`
	MaxRerouteP95Ms int64 `json:"max_reroute_p95_ms"`
	MaxRerouteP99Ms int64 `json:"max_reroute_p99_ms"`
	MaxGoroutineDelta int `json:"max_goroutine_delta"`
	MaxHeapDeltaMB int64 `json:"max_heap_delta_mb"`
	MaxOpenFDDelta int `json:"max_open_fd_delta"`
	MaxOpenFDs int `json:"max_open_fds"`
	MinThroughputMbps int64 `json:"min_throughput_mbps"`
	MinChannelChurn int64 `json:"min_channel_churn_per_sec"`
	// Timeouts, quarantine TTLs and sampling.
	StreamTimeout time.Duration `json:"stream_timeout"`
	AckTimeout time.Duration `json:"ack_timeout"`
	TargetQuarantine time.Duration `json:"target_quarantine_ttl"`
	FailureQuarantine time.Duration `json:"failure_quarantine_ttl"`
	Duration time.Duration `json:"duration"`
	Timeout time.Duration `json:"timeout"`
	ResourceSample time.Duration `json:"resource_sample_interval"`
	ReportPath string `json:"report_path,omitempty"`
}
// streamResult describes one logical stream. runStreamAttempt fills it for a
// single target attempt. runStream then folds the attempts together, summing
// the byte, frame, ACK, setup and duration counters and keeping the maximum
// ACK latency.
type streamResult struct {
	StreamIndex int `json:"stream_index"`
	// InitialTarget is where placement first put the stream; Target is the
	// target of the successful attempt, or InitialTarget if none succeeded.
	InitialTarget string `json:"initial_target"`
	Target string `json:"target"`
	RouteID string `json:"route_id,omitempty"`
	RouteMode string `json:"route_mode,omitempty"`
	// One entry per attempt, in attempt order.
	TargetAttempts []string `json:"target_attempts,omitempty"`
	RouteAttempts []string `json:"route_attempts,omitempty"`
	FailoverCount int `json:"failover_count"`
	MigrationCount int `json:"migration_count"`
	BytesSent int64 `json:"bytes_sent"`
	FramesSent int64 `json:"frames_sent"`
	AcksReceived int64 `json:"acks_received"`
	AbandonedFrames int64 `json:"abandoned_frames,omitempty"`
	AckIntegrityErrors int `json:"ack_integrity_errors,omitempty"`
	MaxAckMs int64 `json:"max_ack_ms"`
	SetupMs int64 `json:"setup_ms"`
	// RerouteLatencyMs is the largest SetupMs among non-first attempts.
	RerouteLatencyMs int64 `json:"reroute_latency_ms,omitempty"`
	DurationMs int64 `json:"duration_ms"`
	// Error is empty on success; otherwise it holds the last attempt's error.
	Error string `json:"error,omitempty"`
	Degraded bool `json:"degraded,omitempty"`
	ShortSession bool `json:"short_session"`
	TrafficClass string `json:"traffic_class"`
	LogicalStream uint64 `json:"logical_stream"`
}
// loadtestReport is the JSON report produced by runClient (and, through it, by
// runAll). Field order and json tags define the report schema.
//
// RoutePressure, TransportSnapshot and ResourceSummary are non-pointer struct
// fields. encoding/json's omitempty does not omit struct values, so these are
// always emitted.
type loadtestReport struct {
	SchemaVersion string `json:"schema_version"`
	StartedAt time.Time `json:"started_at"`
	FinishedAt time.Time `json:"finished_at"`
	Config loadtestConfig `json:"config"`
	// Stream outcome counts and traffic totals.
	TotalStreams int `json:"total_streams"`
	SuccessfulStreams int `json:"successful_streams"`
	FailedStreams int `json:"failed_streams"`
	BytesSent int64 `json:"bytes_sent"`
	FramesSent int64 `json:"frames_sent"`
	AcksReceived int64 `json:"acks_received"`
	AbandonedFrames int64 `json:"abandoned_frames,omitempty"`
	AckMismatchedStreams int `json:"ack_mismatched_streams,omitempty"`
	AckIntegrityErrors int `json:"ack_integrity_errors,omitempty"`
	FailoverEvents int `json:"failover_events"`
	MigrationEvents int `json:"migration_events"`
	// Logical channel accounting taken from the transport snapshot.
	// ChannelLeaks is the transport's active stream count when the run ends.
	ChannelOpens uint64 `json:"channel_opens"`
	ChannelCloses uint64 `json:"channel_closes"`
	ChannelLeaks int `json:"channel_leaks"`
	ChannelChurnPerSec int64 `json:"channel_churn_per_sec"`
	// ThroughputBps is bits per second over the whole run (BytesSent*8/elapsed).
	ThroughputBps int64 `json:"throughput_bps"`
	// Latency percentiles, in milliseconds.
	SetupLatencyP50Ms int64 `json:"setup_latency_p50_ms"`
	SetupLatencyP95Ms int64 `json:"setup_latency_p95_ms"`
	SetupLatencyP99Ms int64 `json:"setup_latency_p99_ms"`
	ChannelOpenP50Ms int64 `json:"channel_open_p50_ms"`
	ChannelOpenP95Ms int64 `json:"channel_open_p95_ms"`
	ChannelOpenP99Ms int64 `json:"channel_open_p99_ms"`
	RerouteLatencyP95Ms int64 `json:"reroute_latency_p95_ms"`
	RerouteLatencyP99Ms int64 `json:"reroute_latency_p99_ms"`
	StreamDurationP95Ms int64 `json:"stream_duration_p95_ms"`
	AckP95Ms int64 `json:"ack_p95_ms,omitempty"`
	AckP99Ms int64 `json:"ack_p99_ms,omitempty"`
	// Traffic-class split.
	ControlStreams int `json:"control_streams"`
	BulkStreams int `json:"bulk_streams"`
	ControlAckP95Ms int64 `json:"control_ack_p95_ms,omitempty"`
	BulkAckP95Ms int64 `json:"bulk_ack_p95_ms,omitempty"`
	RouteAttemptsTotal int64 `json:"route_attempts_total"`
	RerouteCauses map[string]int `json:"reroute_causes,omitempty"`
	Errors map[string]int `json:"errors,omitempty"`
	// Per-target placement, probing and health data.
	TargetProbes []targetProbeResult `json:"target_probes,omitempty"`
	ExcludedTargets []string `json:"excluded_targets,omitempty"`
	TargetBytes map[string]int64 `json:"target_bytes,omitempty"`
	TargetStreams map[string]int `json:"target_streams,omitempty"`
	TargetStats map[string]targetStats `json:"target_stats,omitempty"`
	DegradedTargets map[string]string `json:"degraded_targets,omitempty"`
	RoutePressure mesh.FabricRoutePressureSnapshot `json:"route_pressure,omitempty"`
	TransportSnapshot mesh.QUICFabricTransportSnapshot `json:"transport_snapshot,omitempty"`
	// Process resource telemetry.
	ResourceSamples []resourceSample `json:"resource_samples,omitempty"`
	ResourceSummary resourceSummary `json:"resource_summary,omitempty"`
	StreamSamples []streamResult `json:"stream_samples,omitempty"`
	ErrorSamples []streamResult `json:"error_samples,omitempty"`
	// Verdict is computed by verdict(report); the reasons explain failures.
	Verdict string `json:"verdict"`
	VerdictReasons []string `json:"verdict_reasons,omitempty"`
}
// resourceSample is one point-in-time snapshot of process and fabric resource
// usage, taken by captureResourceSample.
type resourceSample struct {
	ObservedAt time.Time `json:"observed_at"`
	// ElapsedMs is measured from the run's start time.
	ElapsedMs int64 `json:"elapsed_ms"`
	Goroutines int `json:"goroutines"`
	HeapAllocBytes uint64 `json:"heap_alloc_bytes"`
	HeapInuseBytes uint64 `json:"heap_inuse_bytes"`
	HeapObjects uint64 `json:"heap_objects"`
	// OpenFDs is -1 when /proc/self/fd cannot be read (see countOpenFDs).
	// omitempty only hides 0, so -1 is still emitted.
	OpenFDs int `json:"open_fds,omitempty"`
	NumGC uint32 `json:"num_gc"`
	// Transport and route-pressure load; zero when the respective source is nil.
	ActiveStreams int `json:"active_streams"`
	ActiveRoutes int `json:"active_routes"`
	ActiveRouteLoad int `json:"active_route_load"`
}
// resourceSummary condenses a series of resourceSample values
// (see summarizeResourceSamples). *Start comes from the first sample, *End
// from the last, *Max is the peak across all samples, and *Delta is End minus
// Start.
type resourceSummary struct {
	SampleCount int `json:"sample_count"`
	GoroutinesStart int `json:"goroutines_start"`
	GoroutinesEnd int `json:"goroutines_end"`
	GoroutinesMax int `json:"goroutines_max"`
	GoroutinesDelta int `json:"goroutines_delta"`
	HeapAllocStartBytes uint64 `json:"heap_alloc_start_bytes"`
	HeapAllocEndBytes uint64 `json:"heap_alloc_end_bytes"`
	HeapAllocMaxBytes uint64 `json:"heap_alloc_max_bytes"`
	// Signed because the heap can shrink over the run.
	HeapAllocDeltaBytes int64 `json:"heap_alloc_delta_bytes"`
	HeapObjectsStart uint64 `json:"heap_objects_start"`
	HeapObjectsEnd uint64 `json:"heap_objects_end"`
	HeapObjectsMax uint64 `json:"heap_objects_max"`
	HeapObjectsDelta int64 `json:"heap_objects_delta"`
	OpenFDsStart int `json:"open_fds_start,omitempty"`
	OpenFDsEnd int `json:"open_fds_end,omitempty"`
	OpenFDsMax int `json:"open_fds_max,omitempty"`
	OpenFDsDelta int `json:"open_fds_delta,omitempty"`
	GCCountDelta uint32 `json:"gc_count_delta"`
	ActiveStreamsMax int `json:"active_streams_max"`
	ActiveRouteLoadMax int `json:"active_route_load_max"`
}
// targetProbeResult records the outcome of probing one target before stream
// placement. Presumably it is produced by probeAndFilterTargets, which is
// outside this chunk; TODO confirm.
type targetProbeResult struct {
	Target string `json:"target"`
	RTTMs int64 `json:"rtt_ms"`
	Error string `json:"error,omitempty"`
	// Usable is false when the probe failed or the target was excluded.
	// NOTE(review): inferred from the -max-target-rtt-ms help text; confirm.
	Usable bool `json:"usable"`
}
// targetStats aggregates stream results for a single target, reported in
// loadtestReport.TargetStats keyed by target.
type targetStats struct {
	Streams int `json:"streams"`
	BytesSent int64 `json:"bytes_sent"`
	FramesSent int64 `json:"frames_sent"`
	AcksReceived int64 `json:"acks_received"`
	MaxAckMs int64 `json:"max_ack_ms"`
	SetupLatencyP50Ms int64 `json:"setup_latency_p50_ms"`
	SetupLatencyP95Ms int64 `json:"setup_latency_p95_ms"`
	DurationP50Ms int64 `json:"duration_p50_ms"`
	DurationP95Ms int64 `json:"duration_p95_ms"`
	FailoverEntrypoint int `json:"failover_entrypoint,omitempty"`
	DegradedEvents int `json:"degraded_events,omitempty"`
	// Presumably filled from route pressure by applyRoutePressureToTargetStats
	// (outside this chunk); TODO confirm.
	MaxActiveChannels int `json:"max_active_channels,omitempty"`
	RouteModes map[string]int `json:"route_modes,omitempty"`
}
// streamResultCollector accumulates streamResult values from concurrent
// stream workers; Add is called from many goroutines. Its methods are defined
// outside this chunk. Presumably they hold mu while touching any field; TODO
// confirm.
type streamResultCollector struct {
	mu sync.Mutex
	// Stream outcome counters and error/reroute histograms.
	total int
	successful int
	failed int
	errors map[string]int
	rerouteCauses map[string]int
	// Per-target totals, keyed by target endpoint.
	targetBytes map[string]int64
	targetStreams map[string]int
	ackMismatchedStreams int
	ackIntegrityErrors int
	abandonedFrames int64
	targetFrames map[string]int64
	targetAcks map[string]int64
	targetMaxAck map[string]int64
	targetFailoverEntrypoint map[string]int
	targetDegradedEvents map[string]int
	targetRouteModes map[string]map[string]int
	// Latency observations in ms. Each slice is paired with a *Count field,
	// presumably because the slice is capped (maxMetricSamples) while the
	// count keeps growing; TODO confirm.
	setup []int64
	setupCount int
	rerouteLatency []int64
	rerouteLatencyCount int
	durations []int64
	durationCount int
	controlAck []int64
	controlAckCount int
	bulkAck []int64
	bulkAckCount int
	allAck []int64
	allAckCount int
	targetSetup map[string][]int64
	targetSetupCount map[string]int
	targetDurations map[string][]int64
	targetDurationCount map[string]int
	controlStreams int
	bulkStreams int
	routeAttempts int64
	// Retained example results for the report.
	streamSamples []streamResult
	errorSamples []streamResult
}
// targetHealthTracker records which targets are currently unfit for placement
// and what is known about their latency. runStream marks targets degraded
// through MarkDegraded(target, reason, ttl); its methods are defined outside
// this chunk.
type targetHealthTracker struct {
	mu sync.Mutex
	// degraded maps target to the reason it was marked degraded.
	degraded map[string]string
	// degradedUntil presumably holds each target's quarantine expiry
	// (now + ttl); TODO confirm.
	degradedUntil map[string]time.Time
	// observed and rttMs presumably hold probe observations (see RecordProbes);
	// TODO confirm.
	observed map[string]string
	rttMs map[string]int64
}
// main parses flags, bounds the whole run by -timeout, and dispatches to the
// selected mode. Client and all modes print the resulting report via
// writeReport. Any setup or run error is fatal.
//
// A non-positive -timeout disables the overall deadline. Passing zero or a
// negative duration to context.WithTimeout would instead return an
// already-expired context and fail every run immediately.
func main() {
	cfg := parseFlags()
	var (
		ctx    context.Context
		cancel context.CancelFunc
	)
	if cfg.Timeout > 0 {
		ctx, cancel = context.WithTimeout(context.Background(), cfg.Timeout)
	} else {
		ctx, cancel = context.WithCancel(context.Background())
	}
	defer cancel()
	switch cfg.Mode {
	case "server":
		if err := runServer(ctx, cfg); err != nil {
			log.Fatal(err)
		}
	case "client":
		report, err := runClient(ctx, cfg)
		if err != nil {
			log.Fatal(err)
		}
		writeReport(report)
	case "all":
		report, err := runAll(ctx, cfg)
		if err != nil {
			log.Fatal(err)
		}
		writeReport(report)
	default:
		log.Fatalf("unsupported mode %q", cfg.Mode)
	}
}
// parseFlags registers every load-test flag on the process-wide flag set,
// parses os.Args, and returns the normalized configuration:
//   - Streams and Nodes are at least 1. Concurrency falls back to Streams when
//     it is non-positive or larger than Streams.
//   - Byte counts are at least 1. PayloadSize defaults to 64 KiB and is then
//     capped at fabricproto.DefaultMaxPayload.
//   - Soak mode without a positive -duration runs for one minute.
//   - Negative sampling and quarantine durations become 0. The exception is a
//     negative -failure-quarantine-ttl, which inherits the already-clamped
//     target quarantine.
func parseFlags() loadtestConfig {
	var (
		cfg        loadtestConfig
		rawTargets string
	)
	fs := flag.CommandLine

	// Mode and topology.
	fs.StringVar(&cfg.Mode, "mode", "all", "server, client, or all")
	fs.StringVar(&cfg.ListenAddr, "listen", "127.0.0.1:0", "QUIC fabric listen address for server/all mode")
	fs.StringVar(&rawTargets, "targets", "", "comma-separated quic://host:port targets for client mode")
	fs.StringVar(&cfg.TopologyProfile, "topology-profile", "", "optional scenario label: public, nat-lan-relay, mixed-public-nat-lan-relay, or custom")

	// Workload shape.
	fs.BoolVar(&cfg.Soak, "soak", false, "keep generating logical streams until duration expires")
	fs.IntVar(&cfg.Nodes, "nodes", 1, "number of in-process target nodes for all mode")
	fs.IntVar(&cfg.Streams, "streams", 1000, "logical streams to create")
	fs.IntVar(&cfg.Concurrency, "concurrency", 64, "maximum concurrently active QUIC fabric streams")
	fs.Int64Var(&cfg.BytesPerStream, "bytes-per-stream", 4*1024*1024, "bytes to send on each stream")
	fs.IntVar(&cfg.ControlEvery, "control-every", 0, "mark every Nth stream as control traffic, 0 disables mixed control traffic")
	fs.Int64Var(&cfg.ControlBytes, "control-bytes-per-stream", 4096, "bytes to send on each control stream")
	fs.IntVar(&cfg.PayloadSize, "payload-size", 64*1024, "payload bytes per fabric frame")
	fs.BoolVar(&cfg.ShortSessions, "short-sessions", true, "close each QUIC fabric stream after its logical channel completes")

	// Failover, failure injection and placement.
	fs.BoolVar(&cfg.PoolFailover, "pool-failover", true, "retry a failed stream on the next target in the pool")
	fs.IntVar(&cfg.FailTarget, "fail-target", -1, "target index to close during all-mode load, -1 disables failure injection")
	fs.DurationVar(&cfg.FailAfter, "fail-after", 0, "delay before closing fail-target in all mode")
	fs.IntVar(&cfg.ImpairTarget, "impair-target", -1, "target index expected to be degraded by an external impairment, -1 disables")
	fs.BoolVar(&cfg.ProbeTargets, "probe-targets", false, "probe target RTT before stream placement")
	fs.Int64Var(&cfg.MaxTargetRTTMs, "max-target-rtt-ms", 0, "exclude targets whose probe RTT exceeds this threshold")
	fs.BoolVar(&cfg.MigrateSlow, "migrate-slow-streams", false, "continue a logical stream on the next target when ACK latency exceeds max-ack-ms")
	fs.Int64Var(&cfg.MaxAckMs, "max-ack-ms", 0, "ACK latency threshold for slow stream migration")

	// Verdict thresholds.
	fs.Int64Var(&cfg.MaxAckP95Ms, "max-ack-p95-ms", 0, "fail verdict when overall ACK p95 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxAckP99Ms, "max-ack-p99-ms", 0, "fail verdict when overall ACK p99 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxTargetAckMs, "max-target-ack-ms", 0, "fail verdict when any healthy target max ACK exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxControlP95, "max-control-ack-p95-ms", 100, "fail verdict when control ACK p95 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxSetupP95Ms, "max-setup-p95-ms", 200, "fail verdict when channel setup p95 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxSetupP99Ms, "max-setup-p99-ms", 0, "fail verdict when channel setup p99 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxRerouteP95Ms, "max-reroute-p95-ms", 0, "fail verdict when reroute setup p95 exceeds this threshold, 0 disables")
	fs.Int64Var(&cfg.MaxRerouteP99Ms, "max-reroute-p99-ms", 0, "fail verdict when reroute setup p99 exceeds this threshold, 0 disables")
	fs.IntVar(&cfg.MaxGoroutineDelta, "max-goroutine-delta", 0, "fail verdict when resource summary goroutine delta exceeds this value, 0 disables")
	fs.Int64Var(&cfg.MaxHeapDeltaMB, "max-heap-delta-mb", 0, "fail verdict when heap alloc delta exceeds this MiB value, 0 disables")
	fs.IntVar(&cfg.MaxOpenFDDelta, "max-open-fd-delta", 0, "fail verdict when open file descriptor delta exceeds this value, 0 disables")
	fs.IntVar(&cfg.MaxOpenFDs, "max-open-fds", 0, "fail verdict when max open file descriptors exceeds this value, 0 disables")
	fs.Int64Var(&cfg.MinThroughputMbps, "min-throughput-mbps", 0, "fail verdict when throughput falls below this Mbps value, 0 disables")
	fs.Int64Var(&cfg.MinChannelChurn, "min-channel-churn-per-sec", 0, "fail verdict when logical channel open rate falls below this value, 0 disables")

	// Timeouts, quarantine and sampling.
	fs.DurationVar(&cfg.StreamTimeout, "stream-timeout", 30*time.Second, "timeout for a single logical channel attempt, 0 disables")
	fs.DurationVar(&cfg.AckTimeout, "ack-timeout", 2*time.Second, "timeout for one data-frame ACK before rerouting/failing, 0 disables")
	fs.DurationVar(&cfg.TargetQuarantine, "target-quarantine-ttl", 30*time.Second, "how long a failed or slow target stays out of placement before it can be retried")
	fs.DurationVar(&cfg.FailureQuarantine, "failure-quarantine-ttl", 5*time.Minute, "how long a known hard-failed target stays out of placement before it can be retried")
	fs.DurationVar(&cfg.Duration, "duration", 0, "optional max client send duration")
	fs.DurationVar(&cfg.Timeout, "timeout", 10*time.Minute, "overall timeout")
	fs.DurationVar(&cfg.ResourceSample, "resource-sample-interval", time.Second, "resource sampling interval for soak and stress reports")
	fs.StringVar(&cfg.ReportPath, "report-path", "", "optional path to write the full JSON report")

	// CommandLine uses ExitOnError, so Parse exits the process on bad input
	// instead of returning an error.
	_ = fs.Parse(os.Args[1:])

	cfg.Targets = splitCSV(rawTargets)

	// Streams must be normalized before Concurrency is clamped against it.
	if cfg.Streams < 1 {
		cfg.Streams = 1
	}
	if cfg.Nodes < 1 {
		cfg.Nodes = 1
	}
	if cfg.Concurrency < 1 || cfg.Concurrency > cfg.Streams {
		cfg.Concurrency = cfg.Streams
	}

	if cfg.BytesPerStream < 1 {
		cfg.BytesPerStream = 1
	}
	if cfg.ControlBytes < 1 {
		cfg.ControlBytes = 1
	}
	// Default first, then cap: the default itself is subject to the cap.
	if cfg.PayloadSize < 1 {
		cfg.PayloadSize = 64 * 1024
	}
	if cfg.PayloadSize > fabricproto.DefaultMaxPayload {
		cfg.PayloadSize = fabricproto.DefaultMaxPayload
	}

	if cfg.Soak && cfg.Duration < 1 {
		cfg.Duration = time.Minute
	}
	if cfg.ResourceSample < 0 {
		cfg.ResourceSample = 0
	}
	// TargetQuarantine is clamped before FailureQuarantine may inherit it.
	if cfg.TargetQuarantine < 0 {
		cfg.TargetQuarantine = 0
	}
	if cfg.FailureQuarantine < 0 {
		cfg.FailureQuarantine = cfg.TargetQuarantine
	}
	return cfg
}
// runServer starts a standalone QUIC fabric server using a freshly generated
// self-signed certificate. It logs the server address and certificate
// fingerprint, then blocks until ctx is done.
//
// It always returns a non-nil error: either the startup failure or ctx.Err().
func runServer(ctx context.Context, cfg loadtestConfig) error {
	tlsConfig, fingerprint, err := selfSignedTLSConfig()
	if err != nil {
		return err
	}
	serverCfg := mesh.QUICFabricServerConfig{
		ListenAddr: cfg.ListenAddr,
		TLSConfig:  tlsConfig,
		QUICConfig: loadtestQUICConfig(cfg),
	}
	srv, err := mesh.StartQUICFabricServer(ctx, serverCfg)
	if err != nil {
		return err
	}
	defer srv.Close()

	addr := srv.Addr().String()
	log.Printf("fabric_loadtest_server addr=quic://%s tls_cert_sha256=%s", addr, fingerprint)

	<-ctx.Done()
	return ctx.Err()
}
// runAll starts cfg.Nodes in-process QUIC fabric servers sharing one
// self-signed TLS config and appends their quic:// addresses to cfg.Targets.
// It optionally closes servers[cfg.FailTarget] after cfg.FailAfter to inject a
// failure, then runs the client load against the resulting target list. If
// any server fails to start, the already-started ones are closed and the error
// is returned. All servers are closed before runAll returns.
//
// The failure-injection goroutine is bounded by this call. It is told to stop
// and then waited for before the servers are closed. Without this, it could
// outlive runAll waiting on its timer and then Close a server that the
// deferred cleanup had already closed, or race with that cleanup.
//
// NOTE(review): FailTarget indexes the in-process servers here, but runStream
// compares it with indexes into cfg.Targets. The two agree only when no
// -targets were supplied and probing did not filter the list; confirm.
func runAll(ctx context.Context, cfg loadtestConfig) (loadtestReport, error) {
	tlsConfig, _, err := selfSignedTLSConfig()
	if err != nil {
		return loadtestReport{}, err
	}
	servers := make([]*mesh.QUICFabricServer, 0, cfg.Nodes)
	closeServers := func() {
		for _, server := range servers {
			_ = server.Close()
		}
	}
	for i := 0; i < cfg.Nodes; i++ {
		server, err := mesh.StartQUICFabricServer(ctx, mesh.QUICFabricServerConfig{
			ListenAddr: cfg.ListenAddr,
			TLSConfig:  tlsConfig,
			QUICConfig: loadtestQUICConfig(cfg),
		})
		if err != nil {
			closeServers()
			return loadtestReport{}, err
		}
		servers = append(servers, server)
		cfg.Targets = append(cfg.Targets, "quic://"+server.Addr().String())
	}
	defer closeServers()

	if cfg.FailTarget >= 0 && cfg.FailTarget < len(servers) {
		stopInjector := make(chan struct{})
		injectorDone := make(chan struct{})
		go func() {
			defer close(injectorDone)
			if cfg.FailAfter > 0 {
				timer := time.NewTimer(cfg.FailAfter)
				defer timer.Stop()
				select {
				case <-timer.C:
				case <-ctx.Done():
					return
				case <-stopInjector:
					return
				}
			}
			_ = servers[cfg.FailTarget].Close()
		}()
		// Deferred after closeServers, so it runs first: the injector has
		// exited before the servers are closed by the cleanup above.
		defer func() {
			close(stopInjector)
			<-injectorDone
		}()
	}
	return runClient(ctx, cfg)
}
// runClient runs the client side of the load test against cfg.Targets and
// returns the summarized report.
//
// Steps:
//  1. Reject an empty or policy-violating target list.
//  2. Optionally probe the targets, dropping unusable ones and seeding the
//     health tracker with the probe results.
//  3. Run all logical streams while sampling process resources.
//  4. Fill in transport, throughput, route-pressure, target-health and
//     verdict fields.
//
// In non-soak mode a positive cfg.Duration bounds the stream phase. Soak mode
// enforces its own window in runClientSoakStreams.
func runClient(ctx context.Context, cfg loadtestConfig) (loadtestReport, error) {
	if len(cfg.Targets) == 0 {
		return loadtestReport{}, fmt.Errorf("at least one target is required")
	}
	policyProblems := targetEndpointPolicyVerdictReasons(loadtestReport{Config: cfg})
	if len(policyProblems) > 0 {
		return loadtestReport{}, fmt.Errorf("invalid fabric targets: %s", strings.Join(policyProblems, "; "))
	}

	started := time.Now().UTC()
	transport := mesh.NewQUICFabricTransport(loadtestQUICConfig(cfg))
	transport.MaxStreamsPerConn = cfg.Concurrency
	defer transport.Close()

	health := newTargetHealthTracker()
	var (
		probes   []targetProbeResult
		excluded []string
	)
	if cfg.ProbeTargets || cfg.MaxTargetRTTMs > 0 {
		probes, excluded, cfg.Targets = probeAndFilterTargets(ctx, transport, cfg)
		if len(cfg.Targets) == 0 {
			return loadtestReport{}, fmt.Errorf("all fabric targets failed probe")
		}
		health.RecordProbes(probes)
	}

	// Deterministic filler: bytes cycle through 0..250.
	payload := make([]byte, cfg.PayloadSize)
	for pos := range payload {
		payload[pos] = byte(pos % 251)
	}

	runCtx := ctx
	if cfg.Duration > 0 && !cfg.Soak {
		var stopRun context.CancelFunc
		runCtx, stopRun = context.WithTimeout(ctx, cfg.Duration)
		defer stopRun()
	}

	var totals struct {
		bytes, frames, acks, failovers, migrations atomic.Int64
	}
	pressure := mesh.NewFabricRoutePressureTracker()
	sampler := startResourceSampler(runCtx, started, transport, pressure, cfg.ResourceSample)
	results := runClientStreams(runCtx, transport, pressure, health, cfg, payload,
		&totals.bytes, &totals.frames, &totals.acks, &totals.failovers, &totals.migrations)
	resourceSamples := sampler.Stop()
	finished := time.Now().UTC()

	report := summarizeResults(cfg, started, finished, results)
	report.TargetProbes = probes
	report.ExcludedTargets = excluded
	report.BytesSent = totals.bytes.Load()
	report.FramesSent = totals.frames.Load()
	report.AcksReceived = totals.acks.Load()
	report.FailoverEvents = int(totals.failovers.Load())
	report.MigrationEvents = int(totals.migrations.Load())

	snapshot := transport.Snapshot()
	report.TransportSnapshot = snapshot
	report.ChannelOpens = snapshot.Stats.StreamOpens
	report.ChannelCloses = snapshot.Stats.StreamCloses
	// Streams still active after all workers returned are counted as leaks.
	report.ChannelLeaks = snapshot.ActiveStreams

	elapsedSec := finished.Sub(started).Seconds()
	if elapsedSec > 0 {
		report.ThroughputBps = int64(float64(report.BytesSent*8) / elapsedSec)
		report.ChannelChurnPerSec = int64(float64(report.ChannelOpens) / elapsedSec)
	}

	report.RoutePressure = pressure.SnapshotPressure()
	report.DegradedTargets = health.Snapshot()
	report.ResourceSamples = resourceSamples
	report.ResourceSummary = summarizeResourceSamples(resourceSamples)
	applyRoutePressureToTargetStats(&report)
	report.Verdict, report.VerdictReasons = verdict(report)
	return report, nil
}
// runClientStreams runs cfg.Streams logical streams with at most
// cfg.Concurrency in flight. It adds each stream's counters to the shared
// atomic totals and returns the collected results. Soak mode is delegated to
// runClientSoakStreams.
//
// The semaphore slot is taken before each goroutine is spawned, so at most
// `limit` stream goroutines exist at any time. Spawning all cfg.Streams
// goroutines up front and letting them block on the semaphore is unbounded in
// cfg.Streams. It also inflates the goroutine and heap figures that the
// resource sampler reports for this very run.
//
// Streams that start after ctx is done are still run and recorded, and are
// expected to fail fast, matching the previous accounting of unfinished streams.
func runClientStreams(ctx context.Context, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker, health *targetHealthTracker, cfg loadtestConfig, payload []byte, totalBytes *atomic.Int64, totalFrames *atomic.Int64, totalAcks *atomic.Int64, totalFailovers *atomic.Int64, totalMigrations *atomic.Int64) *streamResultCollector {
	if cfg.Soak {
		return runClientSoakStreams(ctx, transport, pressure, health, cfg, payload, totalBytes, totalFrames, totalAcks, totalFailovers, totalMigrations)
	}
	results := newStreamResultCollector()
	// parseFlags guarantees Concurrency >= 1. Guard anyway: a zero-capacity
	// semaphore would deadlock the loop below.
	limit := cfg.Concurrency
	if limit <= 0 {
		limit = cfg.Streams
		if limit <= 0 {
			limit = 1
		}
	}
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit)
	for i := 0; i < cfg.Streams; i++ {
		i := i // per-iteration copy for pre-Go 1.22 loop semantics
		sem <- struct{}{}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			result := runStream(ctx, transport, pressure, health, cfg, i, payload)
			results.Add(result)
			addStreamTotals(result, totalBytes, totalFrames, totalAcks, totalFailovers, totalMigrations)
		}()
	}
	wg.Wait()
	return results
}
// resourceSampler is the handle for the background sampling goroutine started
// by startResourceSampler.
type resourceSampler struct {
	// done is closed by Stop to ask the sampler goroutine to finish.
	done chan struct{}
	// stopped (buffered, capacity 1) carries the final sample slice back to Stop.
	stopped chan []resourceSample
}
// startResourceSampler launches a goroutine that records a resourceSample
// immediately, then once per interval when interval > 0, and a final time
// when Stop is called. Stop returns everything collected.
//
// The context argument is ignored; the sampler runs until Stop.
func startResourceSampler(_ context.Context, started time.Time, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker, interval time.Duration) *resourceSampler {
	s := &resourceSampler{
		done:    make(chan struct{}),
		stopped: make(chan []resourceSample, 1),
	}
	capture := func() resourceSample {
		return captureResourceSample(started, transport, pressure)
	}
	go func() {
		collected := []resourceSample{capture()}
		// A nil channel never delivers, so when periodic sampling is disabled
		// only the stop signal can wake the loop.
		var tick <-chan time.Time
		if interval > 0 {
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			tick = ticker.C
		}
		for {
			select {
			case <-s.done:
				s.stopped <- append(collected, capture())
				return
			case <-tick:
				collected = append(collected, capture())
			}
		}
	}()
	return s
}
// Stop signals the sampler goroutine to take its final sample and returns
// every sample collected. A nil sampler yields nil. Stop must be called at
// most once, because it closes the done channel.
func (s *resourceSampler) Stop() []resourceSample {
	if s != nil {
		close(s.done)
		return <-s.stopped
	}
	return nil
}
// captureResourceSample takes one snapshot of runtime memory statistics, the
// goroutine count, the open file descriptor count, and, when the sources are
// non-nil, the transport's active streams and route-pressure load. ElapsedMs
// is measured from started.
func captureResourceSample(started time.Time, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker) resourceSample {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	at := time.Now().UTC()

	var out resourceSample
	out.ObservedAt = at
	out.ElapsedMs = at.Sub(started).Milliseconds()
	out.Goroutines = runtime.NumGoroutine()
	out.HeapAllocBytes = ms.HeapAlloc
	out.HeapInuseBytes = ms.HeapInuse
	out.HeapObjects = ms.HeapObjects
	out.OpenFDs = countOpenFDs()
	out.NumGC = ms.NumGC

	if transport != nil {
		out.ActiveStreams = transport.Snapshot().ActiveStreams
	}
	if pressure != nil {
		load := pressure.SnapshotPressure()
		out.ActiveRoutes = len(load.Active)
		out.ActiveRouteLoad = load.ActiveTotal
	}
	return out
}
// countOpenFDs reports how many file descriptors this process has open by
// listing /proc/self/fd, which is Linux procfs. It returns -1 when that
// directory cannot be read, e.g. on platforms without procfs.
//
// Reading the directory requires opening it, and that descriptor appears in
// its own listing. One entry is therefore subtracted, so the count reflects
// only the process's own descriptors. This matters for the absolute
// -max-open-fds threshold.
func countOpenFDs() int {
	entries, err := os.ReadDir("/proc/self/fd")
	if err != nil {
		return -1
	}
	if len(entries) == 0 {
		return 0
	}
	return len(entries) - 1
}
// summarizeResourceSamples condenses a chronological series of samples into
// start/end/peak/delta figures:
//   - start values come from the first sample and end values from the last;
//   - deltas are end minus start;
//   - peaks are the maximum over every sample.
//
// An empty series yields the zero summary.
func summarizeResourceSamples(samples []resourceSample) resourceSummary {
	count := len(samples)
	if count == 0 {
		return resourceSummary{}
	}
	head, tail := samples[0], samples[count-1]

	var out resourceSummary
	out.SampleCount = count

	out.GoroutinesStart = head.Goroutines
	out.GoroutinesEnd = tail.Goroutines
	out.GoroutinesDelta = tail.Goroutines - head.Goroutines

	out.HeapAllocStartBytes = head.HeapAllocBytes
	out.HeapAllocEndBytes = tail.HeapAllocBytes
	// Signed delta: the heap may shrink between the first and last sample.
	out.HeapAllocDeltaBytes = int64(tail.HeapAllocBytes) - int64(head.HeapAllocBytes)

	out.HeapObjectsStart = head.HeapObjects
	out.HeapObjectsEnd = tail.HeapObjects
	out.HeapObjectsDelta = int64(tail.HeapObjects) - int64(head.HeapObjects)

	out.OpenFDsStart = head.OpenFDs
	out.OpenFDsEnd = tail.OpenFDs
	out.OpenFDsDelta = tail.OpenFDs - head.OpenFDs

	// uint32 subtraction is modular, so a wrapped GC counter still yields the delta.
	out.GCCountDelta = tail.NumGC - head.NumGC

	// Peaks are seeded from the first sample and raised by every later one.
	out.GoroutinesMax = head.Goroutines
	out.HeapAllocMaxBytes = head.HeapAllocBytes
	out.HeapObjectsMax = head.HeapObjects
	out.OpenFDsMax = head.OpenFDs
	out.ActiveStreamsMax = head.ActiveStreams
	out.ActiveRouteLoadMax = head.ActiveRouteLoad
	for i := 1; i < count; i++ {
		s := &samples[i]
		if s.Goroutines > out.GoroutinesMax {
			out.GoroutinesMax = s.Goroutines
		}
		if s.HeapAllocBytes > out.HeapAllocMaxBytes {
			out.HeapAllocMaxBytes = s.HeapAllocBytes
		}
		if s.HeapObjects > out.HeapObjectsMax {
			out.HeapObjectsMax = s.HeapObjects
		}
		if s.OpenFDs > out.OpenFDsMax {
			out.OpenFDsMax = s.OpenFDs
		}
		if s.ActiveStreams > out.ActiveStreamsMax {
			out.ActiveStreamsMax = s.ActiveStreams
		}
		if s.ActiveRouteLoad > out.ActiveRouteLoadMax {
			out.ActiveRouteLoadMax = s.ActiveRouteLoad
		}
	}
	return out
}
// runClientSoakStreams keeps cfg.Concurrency workers busy opening logical
// streams back to back until ctx is done or, when cfg.Duration > 0, the soak
// window closes. Stream indexes are handed out from a shared counter. A stream
// already in flight when the window closes is allowed to finish.
//
// A result is dropped rather than recorded, and its worker exits, only when all
// of these hold: the run context has ended, the stream failed with a deadline
// error, and it sent no bytes. Such a stream never really started, so it does
// not count as a failure.
func runClientSoakStreams(ctx context.Context, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker, health *targetHealthTracker, cfg loadtestConfig, payload []byte, totalBytes *atomic.Int64, totalFrames *atomic.Int64, totalAcks *atomic.Int64, totalFailovers *atomic.Int64, totalMigrations *atomic.Int64) *streamResultCollector {
	results := newStreamResultCollector()
	var (
		wg      sync.WaitGroup
		counter atomic.Int64
		stopAt  time.Time
	)
	if cfg.Duration > 0 {
		stopAt = time.Now().Add(cfg.Duration)
	}

	expired := func() bool {
		if ctx.Err() != nil {
			return true
		}
		return !stopAt.IsZero() && !time.Now().Before(stopAt)
	}
	abandonedAtShutdown := func(r streamResult) bool {
		return ctx.Err() != nil && r.Error != "" && strings.Contains(r.Error, "context deadline exceeded") && r.BytesSent == 0
	}

	worker := func() {
		defer wg.Done()
		for !expired() {
			index := int(counter.Add(1) - 1)
			result := runStream(ctx, transport, pressure, health, cfg, index, payload)
			if abandonedAtShutdown(result) {
				return
			}
			addStreamTotals(result, totalBytes, totalFrames, totalAcks, totalFailovers, totalMigrations)
			results.Add(result)
		}
	}

	for n := 0; n < cfg.Concurrency; n++ {
		wg.Add(1)
		go worker()
	}
	wg.Wait()
	return results
}
// addStreamTotals atomically adds one stream's byte, frame, ACK, failover and
// migration counts to the run-wide counters.
func addStreamTotals(result streamResult, totalBytes, totalFrames, totalAcks, totalFailovers, totalMigrations *atomic.Int64) {
	updates := [...]struct {
		counter *atomic.Int64
		delta   int64
	}{
		{totalBytes, result.BytesSent},
		{totalFrames, result.FramesSent},
		{totalAcks, result.AcksReceived},
		{totalFailovers, int64(result.FailoverCount)},
		{totalMigrations, int64(result.MigrationCount)},
	}
	for _, u := range updates {
		u.counter.Add(u.delta)
	}
}
// runStream drives one logical stream (identified by index) to completion. It
// tries up to len(cfg.Targets) targets when cfg.PoolFailover is set, otherwise
// exactly one. The first target comes from spread placement plus health-aware
// preference. Each later attempt picks a usable target other than the previous
// one.
//
// Per-attempt outcomes:
//   - Success returns immediately with FailoverCount = attempt index.
//   - "slow_ack_migration" with MigrateSlow continues the unsent remainder on
//     the next target.
//   - A degraded control-class attempt with MigrateSlow restarts the whole
//     payload on the next target, discarding that attempt's counters.
//   - Any other error moves on to the next target, remembering the error.
//
// Degraded targets and targets whose error passes shouldQuarantineTarget are
// reported to the health tracker so later placements avoid them.
//
// NOTE(review): the StreamTimeout deadline is derived once, so it covers all
// attempts together, not each attempt as the -stream-timeout help text says.
func runStream(ctx context.Context, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker, health *targetHealthTracker, cfg loadtestConfig, index int, payload []byte) streamResult {
	if cfg.StreamTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cfg.StreamTimeout)
		defer cancel()
	}
	initialTargetIndex, spreadOffset := loadtestSpreadStart(index, len(cfg.Targets))
	initialTargetIndex = loadtestPreferredTargetIndex(cfg.Targets, initialTargetIndex, spreadOffset, health, -1)
	target := cfg.Targets[initialTargetIndex]
	logicalStreamID := loadtestLogicalStreamID(index)
	trafficClass := loadtestTrafficClass(cfg, index)
	// Control streams send the (smaller) control byte budget.
	bytesPerStream := cfg.BytesPerStream
	if trafficClass == fabricproto.TrafficClassControl {
		bytesPerStream = cfg.ControlBytes
	}
	result := streamResult{
		StreamIndex: index,
		InitialTarget: target,
		Target: target,
		ShortSession: cfg.ShortSessions,
		TrafficClass: loadtestTrafficClassName(trafficClass),
		LogicalStream: logicalStreamID,
	}
	targetCount := 1
	if cfg.PoolFailover {
		targetCount = len(cfg.Targets)
	}
	var lastErr string
	// remaining is what later attempts still have to send; each attempt is
	// asked to send only this much.
	remaining := bytesPerStream
	lastTargetIndex := -1
	for attempt := 0; attempt < targetCount; attempt++ {
		targetIndex := initialTargetIndex
		if attempt > 0 {
			// Retry on a usable target other than the one just tried.
			targetIndex = loadtestSpreadUsableTargetIndex(cfg.Targets, spreadOffset+attempt-1, health, lastTargetIndex)
		}
		target = cfg.Targets[targetIndex]
		lastTargetIndex = targetIndex
		routeID := loadtestRouteID(targetIndex, target)
		routeMode := loadtestRouteMode(cfg, targetIndex)
		attemptResult := runStreamAttempt(ctx, transport, pressure, cfg, index, targetIndex, target, routeID, logicalStreamID, trafficClass, payload, remaining)
		attemptResult.RouteMode = routeMode
		result.TargetAttempts = append(result.TargetAttempts, target)
		result.RouteAttempts = append(result.RouteAttempts, routeID)
		// Fold this attempt's counters into the stream totals.
		result.BytesSent += attemptResult.BytesSent
		result.FramesSent += attemptResult.FramesSent
		result.AcksReceived += attemptResult.AcksReceived
		result.AbandonedFrames += attemptResult.AbandonedFrames
		result.AckIntegrityErrors += attemptResult.AckIntegrityErrors
		result.SetupMs += attemptResult.SetupMs
		result.DurationMs += attemptResult.DurationMs
		// Reroute latency is the slowest setup among non-first attempts.
		if attempt > 0 && attemptResult.SetupMs > result.RerouteLatencyMs {
			result.RerouteLatencyMs = attemptResult.SetupMs
		}
		if attemptResult.MaxAckMs > result.MaxAckMs {
			result.MaxAckMs = attemptResult.MaxAckMs
		}
		if attemptResult.Degraded {
			result.Degraded = true
			if health != nil {
				health.MarkDegraded(target, "slow_ack", cfg.TargetQuarantine)
			}
		}
		if attemptResult.Error != "" && health != nil && shouldQuarantineTarget(attemptResult.Error) {
			// The deliberately failed target may get the longer failure TTL.
			ttl := cfg.TargetQuarantine
			if targetIndex == cfg.FailTarget && cfg.FailureQuarantine > ttl {
				ttl = cfg.FailureQuarantine
			}
			health.MarkDegraded(target, attemptResult.Error, ttl)
		}
		remaining -= attemptResult.BytesSent
		// Slow-ACK migration: carry the unsent remainder to the next target.
		if attemptResult.Error == "slow_ack_migration" && cfg.MigrateSlow && remaining > 0 && attempt+1 < targetCount {
			result.MigrationCount++
			result.Error = ""
			continue
		}
		// Degraded control stream: restart from scratch on the next target and
		// drop this attempt's counters from the totals.
		// NOTE(review): SetupMs/DurationMs are not rolled back, and resetting
		// MaxAckMs also discards earlier attempts' maxima; confirm intended.
		if attemptResult.Degraded && cfg.MigrateSlow && attempt+1 < targetCount && trafficClass == fabricproto.TrafficClassControl {
			remaining = bytesPerStream
			result.BytesSent -= attemptResult.BytesSent
			result.FramesSent -= attemptResult.FramesSent
			result.AcksReceived -= attemptResult.AcksReceived
			result.AbandonedFrames -= attemptResult.AbandonedFrames
			result.AckIntegrityErrors -= attemptResult.AckIntegrityErrors
			result.MaxAckMs = 0
			result.MigrationCount++
			result.Error = ""
			continue
		}
		if attemptResult.Error == "" {
			result.Target = target
			result.RouteID = routeID
			result.RouteMode = routeMode
			// Counts every extra attempt, migrations included.
			result.FailoverCount = attempt
			result.Error = ""
			return result
		}
		lastErr = attemptResult.Error
		result.Error = lastErr
	}
	// Every attempt failed; result.Error holds the last attempt's error.
	result.FailoverCount = len(result.TargetAttempts) - 1
	return result
}
// loadtestSpreadStart maps a stream index onto a round-robin placement. It
// returns the target slot (streamIndex mod targetCount) and the spread round
// (streamIndex div targetCount). A non-positive targetCount or a negative
// stream index yields (0, 0).
func loadtestSpreadStart(streamIndex int, targetCount int) (int, int) {
	switch {
	case targetCount <= 0, streamIndex < 0:
		return 0, 0
	}
	slot := streamIndex % targetCount
	round := streamIndex / targetCount
	return slot, round
}
// loadtestLogicalStreamID returns the fabric logical stream ID for a stream
// index: the index offset by 10,000. Negative indexes map to the base ID.
func loadtestLogicalStreamID(streamIndex int) uint64 {
	const base uint64 = 10_000
	if streamIndex > 0 {
		return base + uint64(streamIndex)
	}
	return base
}
// loadtestPreferredTargetIndex picks the initial target for a stream, trying
// in order:
//  1. a latency-aware choice keyed by the stream's placement ordinal;
//  2. the preferred slot, if it is in range, not excluded and not degraded;
//  3. a spread-based usable target.
//
// It returns 0 for an empty target list.
func loadtestPreferredTargetIndex(targets []string, preferred int, spread int, health *targetHealthTracker, exclude int) int {
	n := len(targets)
	if n == 0 {
		return 0
	}
	// Rebuild the stream's placement ordinal; this inverts loadtestSpreadStart
	// for non-negative stream indexes.
	ordinal := spread*n + preferred
	if idx, ok := loadtestLatencyAwareCandidateIndex(targets, ordinal, health, exclude); ok {
		return idx
	}
	if preferred >= 0 && preferred < n && preferred != exclude {
		if health == nil || !health.IsDegraded(targets[preferred]) {
			return preferred
		}
	}
	return loadtestSpreadUsableTargetIndex(targets, spread, health, exclude)
}
// loadtestSpreadUsableTargetIndex chooses a target for the given spread value:
//   - a latency-aware choice when one is available;
//   - otherwise the spread value rotated through the usable (non-excluded,
//     non-degraded) targets, taking the absolute value of a negative spread;
//   - with no usable targets at all, nextUsableTargetIndex decides.
//
// It returns 0 for an empty target list.
func loadtestSpreadUsableTargetIndex(targets []string, spread int, health *targetHealthTracker, exclude int) int {
	if len(targets) == 0 {
		return 0
	}
	if idx, ok := loadtestLatencyAwareCandidateIndex(targets, spread, health, exclude); ok {
		return idx
	}
	candidates := loadtestUsableTargetIndexes(targets, health, exclude)
	switch {
	case len(candidates) == 0:
		return nextUsableTargetIndex(targets, spread, health, exclude)
	case spread < 0:
		spread = -spread
	}
	return candidates[spread%len(candidates)]
}
// loadtestLatencyAwareCandidateIndex asks the health tracker for an RTT-aware
// pick among the usable (non-excluded, non-degraded) targets. It returns
// (index, true) on success and (0, false) when there is no tracker, no usable
// target, or the tracker declines to choose.
//
// The other placement helpers accept a nil *targetHealthTracker as "no health
// data". Without this guard, a nil tracker with usable targets would reach a
// method call on a nil receiver. A nil tracker has no latency data, so it
// falls back to non-latency placement.
func loadtestLatencyAwareCandidateIndex(targets []string, spread int, health *targetHealthTracker, exclude int) (int, bool) {
	if health == nil {
		return 0, false
	}
	usable := loadtestUsableTargetIndexes(targets, health, exclude)
	if len(usable) == 0 {
		return 0, false
	}
	if index, ok := health.latencyAwareTargetIndex(targets, usable, spread); ok {
		return index, true
	}
	return 0, false
}
// loadtestUsableTargetIndexes returns, in ascending order, the indexes of
// targets that are not exclude and not currently degraded according to health.
// A nil health treats every target as healthy. The result is nil when nothing
// qualifies.
func loadtestUsableTargetIndexes(targets []string, health *targetHealthTracker, exclude int) []int {
	var usable []int
	for i := range targets {
		skip := i == exclude || (health != nil && health.IsDegraded(targets[i]))
		if !skip {
			usable = append(usable, i)
		}
	}
	return usable
}
// runStreamAttempt drives one attempt of a load-test stream against a single
// target. It:
//   - acquires route pressure for routeID (released on return);
//   - dials target over the QUIC fabric transport;
//   - opens logicalStreamID with the given traffic class;
//   - sends bytesToSend bytes in FrameData frames of at most len(payload) bytes.
//
// After each frame it waits for the matching ack before sending the next one, so
// at most one data frame is outstanding at a time.
//
// Only len(payload) is used. Each frame's content is regenerated by
// fillLoadtestPayload, and the ack payload is compared with
// fabricproto.DataAckPayload of that content.
//
// The returned result always carries the attempt's identity fields. On failure,
// Error is set and the counters reflect progress made before the failure.
// Notable Error values:
//   - "slow_ack_migration": cfg.MigrateSlow is set and an ack was slower than
//     cfg.MaxAckMs;
//   - "ack payload checksum mismatch";
//   - "ack timeout or session closed";
//   - "empty payload buffer";
//   - the context's error string, when ctx ended the attempt.
func runStreamAttempt(ctx context.Context, transport *mesh.QUICFabricTransport, pressure *mesh.FabricRoutePressureTracker, cfg loadtestConfig, streamIndex int, targetIndex int, target string, routeID string, logicalStreamID uint64, trafficClass fabricproto.TrafficClass, payload []byte, bytesToSend int64) streamResult {
	// Upper bound for the best-effort FrameCloseStream sent when the attempt ends.
	const closeStreamTimeout = 5 * time.Second

	result := streamResult{
		StreamIndex:   streamIndex,
		InitialTarget: target,
		Target:        target,
		RouteID:       routeID,
		RouteMode:     loadtestRouteMode(cfg, targetIndex),
		ShortSession:  cfg.ShortSessions,
		TrafficClass:  loadtestTrafficClassName(trafficClass),
		LogicalStream: logicalStreamID,
	}
	// With a zero-length frame buffer every chunk would be empty, so `sent` would
	// never advance and the send loop would spin until ctx ends.
	if len(payload) == 0 && bytesToSend > 0 {
		result.Error = "empty payload buffer"
		return result
	}
	started := time.Now()
	releaseRoute := pressure.Acquire(routeID)
	defer releaseRoute()
	session, err := transport.Connect(ctx, mesh.FabricTransportTarget{
		PeerID:   fmt.Sprintf("target-%d", targetIndex),
		Endpoint: target,
		TLSConfig: &tls.Config{
			InsecureSkipVerify: true,
			NextProtos:         []string{"rap-fabric-data-session-v1"},
		},
		Timeout:       15 * time.Second,
		InboundBuffer: 128,
		ErrorBuffer:   8,
	})
	if err != nil {
		result.Error = err.Error()
		return result
	}
	defer session.Close()
	result.SetupMs = time.Since(started).Milliseconds()
	if err := session.Send(ctx, fabricproto.Frame{Type: fabricproto.FrameOpenStream, TrafficClass: trafficClass, StreamID: logicalStreamID}); err != nil {
		result.Error = err.Error()
		return result
	}
	defer func() {
		// Best effort. ctx may already be done, so use a fresh context. Bound it so
		// that, provided Send honours its context, a stalled session cannot hold
		// up session.Close and the route release (both deferred earlier, so they
		// run after this).
		closeCtx, cancel := context.WithTimeout(context.Background(), closeStreamTimeout)
		defer cancel()
		_ = session.Send(closeCtx, fabricproto.Frame{Type: fabricproto.FrameCloseStream, TrafficClass: trafficClass, StreamID: logicalStreamID})
	}()
	var sent int64
	var seq uint64
	framePayload := make([]byte, len(payload))
	for sent < bytesToSend {
		select {
		case <-ctx.Done():
			result.Error = ctx.Err().Error()
			result.DurationMs = time.Since(started).Milliseconds()
			return result
		default:
		}
		chunkSize := len(framePayload)
		if remaining := bytesToSend - sent; remaining < int64(chunkSize) {
			chunkSize = int(remaining)
		}
		seq++
		chunk := framePayload[:chunkSize]
		fillLoadtestPayload(chunk, streamIndex, logicalStreamID, seq, sent)
		expectedAckPayload := fabricproto.DataAckPayload(chunk)
		if err := session.Send(ctx, fabricproto.Frame{
			Type:         fabricproto.FrameData,
			TrafficClass: trafficClass,
			StreamID:     logicalStreamID,
			Sequence:     seq,
			Payload:      chunk,
		}); err != nil {
			result.Error = err.Error()
			break
		}
		result.FramesSent++
		if ok, ackMs, integrityOK := waitForAck(ctx, session, cfg.AckTimeout, logicalStreamID, seq, expectedAckPayload); ok {
			result.AcksReceived++
			sent += int64(len(chunk))
			result.BytesSent = sent
			if !integrityOK {
				result.AckIntegrityErrors++
				result.Error = "ack payload checksum mismatch"
				break
			}
			if ackMs > result.MaxAckMs {
				result.MaxAckMs = ackMs
			}
			if cfg.MigrateSlow && cfg.MaxAckMs > 0 && ackMs > cfg.MaxAckMs {
				result.Degraded = true
				if sent < bytesToSend {
					result.Error = "slow_ack_migration"
					break
				}
			}
		} else {
			if result.FramesSent > result.AcksReceived {
				result.AbandonedFrames += result.FramesSent - result.AcksReceived
			}
			// Report run cancellation as such, matching the loop-top ctx check.
			// Otherwise "ack timeout..." would make shouldQuarantineTarget blame a
			// healthy target for the cancellation.
			if ctxErr := ctx.Err(); ctxErr != nil {
				result.Error = ctxErr.Error()
			} else {
				result.Error = "ack timeout or session closed"
			}
			break
		}
	}
	result.DurationMs = time.Since(started).Milliseconds()
	return result
}
// loadtestTrafficClass assigns the control class to every cfg.ControlEvery-th
// stream (indexes divisible by it) and the bulk class to all others. A
// non-positive ControlEvery disables control streams entirely.
func loadtestTrafficClass(cfg loadtestConfig, streamIndex int) fabricproto.TrafficClass {
	isControl := cfg.ControlEvery > 0 && streamIndex%cfg.ControlEvery == 0
	if !isControl {
		return fabricproto.TrafficClassBulk
	}
	return fabricproto.TrafficClassControl
}
// loadtestTrafficClassName returns the report label for a traffic class:
// "control", "interactive", "reliable" or "bulk". Unknown classes are rendered
// as "traffic_class_<n>".
func loadtestTrafficClassName(trafficClass fabricproto.TrafficClass) string {
	var name string
	switch trafficClass {
	case fabricproto.TrafficClassBulk:
		name = "bulk"
	case fabricproto.TrafficClassReliable:
		name = "reliable"
	case fabricproto.TrafficClassInteractive:
		name = "interactive"
	case fabricproto.TrafficClassControl:
		name = "control"
	default:
		name = fmt.Sprintf("traffic_class_%d", trafficClass)
	}
	return name
}
// newTargetHealthTracker returns a tracker with all of its maps allocated:
// current degradation reasons, degradation expiry times, every reason ever
// observed, and probe RTTs in milliseconds.
func newTargetHealthTracker() *targetHealthTracker {
	tracker := new(targetHealthTracker)
	tracker.degraded = make(map[string]string)
	tracker.degradedUntil = make(map[string]time.Time)
	tracker.observed = make(map[string]string)
	tracker.rttMs = make(map[string]int64)
	return tracker
}
// RecordProbes stores the RTT of every usable probe that has a positive RTT and
// a non-empty (trimmed) target. A later call overwrites the RTT recorded for the
// same target. A nil tracker or an empty slice is a no-op.
func (t *targetHealthTracker) RecordProbes(probes []targetProbeResult) {
	if t == nil || len(probes) == 0 {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.rttMs == nil {
		t.rttMs = make(map[string]int64, len(probes))
	}
	for i := range probes {
		probe := &probes[i]
		if !probe.Usable || probe.RTTMs <= 0 {
			continue
		}
		key := strings.TrimSpace(probe.Target)
		if key == "" {
			continue
		}
		t.rttMs[key] = probe.RTTMs
	}
}
// MarkDegraded records target as degraded for the (trimmed) reason and also
// remembers the reason in the observed history. With ttl > 0 the degradation
// expires ttl from now, and IsDegraded prunes it after that. With ttl <= 0 any
// previous expiry is dropped, so the mark stays until overwritten. A nil
// tracker or a blank target is ignored.
func (t *targetHealthTracker) MarkDegraded(target string, reason string, ttl time.Duration) {
	if t == nil {
		return
	}
	target = strings.TrimSpace(target)
	if target == "" {
		return
	}
	reason = strings.TrimSpace(reason)

	t.mu.Lock()
	defer t.mu.Unlock()
	if t.degraded == nil {
		t.degraded = map[string]string{}
	}
	if t.degradedUntil == nil {
		t.degradedUntil = map[string]time.Time{}
	}
	if t.observed == nil {
		t.observed = map[string]string{}
	}
	t.degraded[target] = reason
	t.observed[target] = reason
	if ttl <= 0 {
		delete(t.degradedUntil, target)
		return
	}
	t.degradedUntil[target] = time.Now().Add(ttl)
}
// IsDegraded reports whether the (trimmed) target is currently marked degraded.
// As a side effect, an expired degradation is removed from the current set; the
// observed history is left untouched. A nil tracker reports false.
func (t *targetHealthTracker) IsDegraded(target string) bool {
	if t == nil {
		return false
	}
	key := strings.TrimSpace(target)
	t.mu.Lock()
	defer t.mu.Unlock()
	until, hasExpiry := t.degradedUntil[key]
	expired := hasExpiry && !until.IsZero() && time.Now().After(until)
	if expired {
		delete(t.degraded, key)
		delete(t.degradedUntil, key)
		return false
	}
	_, degraded := t.degraded[key]
	return degraded
}
// Snapshot returns a fresh copy of the target → reason map. It prefers the
// observed history, which includes expired degradations. If nothing was ever
// observed it copies the current degraded set instead. The returned map is
// never nil for a non-nil tracker; a nil tracker returns nil.
func (t *targetHealthTracker) Snapshot() map[string]string {
	if t == nil {
		return nil
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	src := t.degraded
	if len(t.observed) > 0 {
		src = t.observed
	}
	snapshot := make(map[string]string, len(src))
	for target := range src {
		snapshot[target] = src[target]
	}
	return snapshot
}
// latencyAwareTargetIndex chooses one of candidates (indexes into targets),
// weighted towards lower probe RTTs. It reports false when latency-aware
// placement does not apply:
//   - the tracker is nil;
//   - there are no candidates;
//   - no RTTs have been recorded;
//   - the known candidate RTTs differ by less than a factor of 4.
//
// Each candidate's weight is ceil(slowestRTT/rtt) clamped to [1, 32]; candidates
// without a recorded RTT weigh 1. |spread| modulo the total weight selects a
// slot in the cumulative weights, taken in candidate order, so consecutive
// spread values walk the weighted slots deterministically.
func (t *targetHealthTracker) latencyAwareTargetIndex(targets []string, candidates []int, spread int) (int, bool) {
	if t == nil || len(candidates) == 0 {
		return 0, false
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.rttMs) == 0 {
		return 0, false
	}

	// Gather each candidate's known RTT and the fastest/slowest among them.
	knownRTT := make(map[int]int64, len(candidates))
	var fastest, slowest int64
	for _, candidate := range candidates {
		rtt := t.rttMs[strings.TrimSpace(targets[candidate])]
		if rtt <= 0 {
			continue
		}
		knownRTT[candidate] = rtt
		if fastest == 0 || rtt < fastest {
			fastest = rtt
		}
		if rtt > slowest {
			slowest = rtt
		}
	}
	if fastest <= 0 || slowest < fastest*4 {
		return 0, false
	}

	// Positional weights, parallel to candidates.
	weights := make([]int, len(candidates))
	total := 0
	for pos, candidate := range candidates {
		weight := 1
		if rtt, ok := knownRTT[candidate]; ok {
			weight = int((slowest + rtt - 1) / rtt)
			switch {
			case weight < 1:
				weight = 1
			case weight > 32:
				weight = 32
			}
		}
		weights[pos] = weight
		total += weight
	}
	if total <= 0 {
		return 0, false
	}

	if spread < 0 {
		spread = -spread
	}
	slot := spread % total
	for pos, candidate := range candidates {
		if slot < weights[pos] {
			return candidate, true
		}
		slot -= weights[pos]
	}
	return candidates[len(candidates)-1], true
}
// shouldQuarantineTarget decides whether a stream failure reason implicates the
// target itself. Matching is case-insensitive on substrings such as timeouts,
// refused or reset connections, missing routes, closed sessions and application
// errors. An empty reason, or exactly the context-deadline error (a run-level
// deadline rather than a target fault), never quarantines.
func shouldQuarantineTarget(reason string) bool {
	normalized := strings.ToLower(strings.TrimSpace(reason))
	if normalized == "" || normalized == context.DeadlineExceeded.Error() {
		return false
	}
	markers := []string{
		"timeout",
		"deadline",
		"connection refused",
		"connection reset",
		"no route",
		"session closed",
		"application error",
	}
	for _, marker := range markers {
		if strings.Contains(normalized, marker) {
			return true
		}
	}
	return false
}
// nextUsableTargetIndex scans targets circularly starting at start (wrapped into
// range, negatives included). It returns the first index that is not exclude
// and not degraded. If every target is excluded or degraded, it returns start
// wrapped into range, even if that index is excluded. Empty targets yields 0.
func nextUsableTargetIndex(targets []string, start int, health *targetHealthTracker, exclude int) int {
	n := len(targets)
	if n == 0 {
		return 0
	}
	wrap := func(i int) int {
		i %= n
		if i < 0 {
			i += n
		}
		return i
	}
	for step := 0; step < n; step++ {
		candidate := wrap(start + step)
		if candidate == exclude {
			continue
		}
		if health == nil || !health.IsDegraded(targets[candidate]) {
			return candidate
		}
	}
	return wrap(start)
}
// loadtestRouteID builds the route-pressure key for a target. The format is
// "target-<index>-<first 4 bytes of SHA-256(trimmed target) in hex>", or plain
// "target-<index>" when the trimmed target is empty.
func loadtestRouteID(targetIndex int, target string) string {
	trimmed := strings.TrimSpace(target)
	if trimmed != "" {
		digest := sha256.Sum256([]byte(trimmed))
		return fmt.Sprintf("target-%d-%s", targetIndex, hex.EncodeToString(digest[:4]))
	}
	return fmt.Sprintf("target-%d", targetIndex)
}
// loadtestRouteMode labels the route mode a target emulates for the configured
// topology profile. The "nat-lan-relay" and "mixed-public-nat-lan-relay"
// profiles cycle LAN, ICE, reverse and relay by targetIndex%4. Negative indexes
// leave a negative remainder and fall to relay. Every other profile, including
// "public", uses direct routes.
func loadtestRouteMode(cfg loadtestConfig, targetIndex int) string {
	profile := strings.ToLower(strings.TrimSpace(cfg.TopologyProfile))
	if profile != "nat-lan-relay" && profile != "mixed-public-nat-lan-relay" {
		return string(mesh.FabricRouteDirect)
	}
	switch targetIndex % 4 {
	case 0:
		return string(mesh.FabricRouteLAN)
	case 1:
		return string(mesh.FabricRouteICE)
	case 2:
		return string(mesh.FabricRouteReverse)
	}
	return string(mesh.FabricRouteRelay)
}
// waitForAck consumes inbound frames until an ack for (streamID, sequence)
// arrives, discarding any other frames. It returns
// (acked, elapsedMs, payloadMatches). On a matching ack, payloadMatches reports
// whether the ack payload equals expectedPayload. On timeout it returns false
// with the elapsed time. A closed frame channel, a session error, or a done ctx
// returns (false, 0, false). A non-positive timeout leaves the timeout channel
// nil, so it never fires.
func waitForAck(ctx context.Context, session mesh.FabricTransportSession, timeout time.Duration, streamID uint64, sequence uint64, expectedPayload []byte) (bool, int64, bool) {
	begin := time.Now()
	var expired <-chan time.Time
	if timeout > 0 {
		deadline := time.NewTimer(timeout)
		defer deadline.Stop()
		expired = deadline.C
	}
	for {
		select {
		case frame, open := <-session.Frames():
			if !open {
				return false, 0, false
			}
			isMatch := frame.Type == fabricproto.FrameAck && frame.StreamID == streamID && frame.Sequence == sequence
			if !isMatch {
				continue
			}
			elapsedMs := time.Since(begin).Milliseconds()
			return true, elapsedMs, bytes.Equal(frame.Payload, expectedPayload)
		case <-session.Errors():
			return false, 0, false
		case <-expired:
			return false, time.Since(begin).Milliseconds(), false
		case <-ctx.Done():
			return false, 0, false
		}
	}
}
// fillLoadtestPayload overwrites dst with deterministic pseudo-random bytes.
// A seed is derived from the stream index, logical stream ID, frame sequence and
// byte offset. Each byte is then produced by mixing (seed + position*constant)
// through a xor-shift/multiply finalizer and keeping the low 8 bits. The same
// inputs always produce the same bytes, so the content can be regenerated on
// demand.
func fillLoadtestPayload(dst []byte, streamIndex int, logicalStreamID uint64, sequence uint64, offset int64) {
	const (
		streamMul   = 0x9e3779b185ebca87
		logicalMul  = 0xc2b2ae3d27d4eb4f
		sequenceMul = 0x165667b19e3779f9
		positionMul = 0x27d4eb2f165667c5
		finalMul1   = 0xff51afd7ed558ccd
		finalMul2   = 0xc4ceb9fe1a85ec53
	)
	mix := func(v uint64) uint64 {
		v ^= v >> 33
		v *= finalMul1
		v ^= v >> 33
		v *= finalMul2
		v ^= v >> 33
		return v
	}
	streamPart := uint64(streamIndex+1) * streamMul
	logicalPart := logicalStreamID * logicalMul
	sequencePart := sequence * sequenceMul
	seed := streamPart ^ logicalPart ^ sequencePart ^ uint64(offset)
	for pos := range dst {
		dst[pos] = byte(mix(seed + uint64(pos)*positionMul))
	}
}
// probeAndFilterTargets probes every configured target in order. A target is
// usable when its probe succeeded and, if cfg.MaxTargetRTTMs > 0, its RTT does
// not exceed that limit. Usable probes are flagged Usable.
//
// It returns all probe results, the excluded targets and the usable targets, in
// that order. The excluded slice is always non-nil (empty rather than nil).
func probeAndFilterTargets(ctx context.Context, transport *mesh.QUICFabricTransport, cfg loadtestConfig) ([]targetProbeResult, []string, []string) {
	var (
		probes   = make([]targetProbeResult, 0, len(cfg.Targets))
		usable   = make([]string, 0, len(cfg.Targets))
		excluded = []string{}
	)
	for i, target := range cfg.Targets {
		probe := probeTarget(ctx, transport, i, target)
		withinRTT := cfg.MaxTargetRTTMs <= 0 || probe.RTTMs <= cfg.MaxTargetRTTMs
		if probe.Error != "" || !withinRTT {
			excluded = append(excluded, target)
			probes = append(probes, probe)
			continue
		}
		probe.Usable = true
		usable = append(usable, target)
		probes = append(probes, probe)
	}
	return probes, excluded, usable
}
// probeTarget opens a short-lived fabric session to target, sends one ping
// (sequence 1) and waits for the matching pong, all within 5 seconds. RTTMs is
// measured from before Connect, so it includes connection setup as well as the
// ping round trip. Any failure returns a result with Error set and no RTT.
func probeTarget(ctx context.Context, transport *mesh.QUICFabricTransport, index int, target string) targetProbeResult {
	const probeTimeout = 5 * time.Second
	probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
	defer cancel()
	fail := func(message string) targetProbeResult {
		return targetProbeResult{Target: target, Error: message}
	}

	started := time.Now()
	session, err := transport.Connect(probeCtx, mesh.FabricTransportTarget{
		PeerID:   fmt.Sprintf("target-%d", index),
		Endpoint: target,
		TLSConfig: &tls.Config{
			InsecureSkipVerify: true,
			NextProtos:         []string{"rap-fabric-data-session-v1"},
		},
		Timeout:       probeTimeout,
		InboundBuffer: 4,
		ErrorBuffer:   4,
	})
	if err != nil {
		return fail(err.Error())
	}
	defer session.Close()
	ping := fabricproto.Frame{Type: fabricproto.FramePing, Sequence: 1, Payload: []byte("probe")}
	if err := session.Send(probeCtx, ping); err != nil {
		return fail(err.Error())
	}
	for {
		select {
		case frame, open := <-session.Frames():
			if !open {
				return fail("probe session closed")
			}
			if frame.Type != fabricproto.FramePong || frame.Sequence != 1 {
				continue
			}
			return targetProbeResult{Target: target, RTTMs: time.Since(started).Milliseconds()}
		case sessionErr := <-session.Errors():
			if sessionErr != nil {
				return fail(sessionErr.Error())
			}
			return fail("probe session error")
		case <-probeCtx.Done():
			return fail(probeCtx.Err().Error())
		}
	}
}
// newStreamResultCollector returns a collector with every per-error, per-cause
// and per-target aggregation map allocated, so Add can write to them directly.
func newStreamResultCollector() *streamResultCollector {
	c := new(streamResultCollector)
	c.errors = make(map[string]int)
	c.rerouteCauses = make(map[string]int)
	c.targetBytes = make(map[string]int64)
	c.targetStreams = make(map[string]int)
	c.targetFrames = make(map[string]int64)
	c.targetAcks = make(map[string]int64)
	c.targetMaxAck = make(map[string]int64)
	c.targetFailoverEntrypoint = make(map[string]int)
	c.targetDegradedEvents = make(map[string]int)
	c.targetRouteModes = make(map[string]map[string]int)
	c.targetSetup = make(map[string][]int64)
	c.targetSetupCount = make(map[string]int)
	c.targetDurations = make(map[string][]int64)
	c.targetDurationCount = make(map[string]int)
	return c
}
// Add folds one finished stream result into the collector. All updates happen
// while holding c.mu. A nil collector is a no-op.
//
// Successful results (Error == "") feed the latency samples: setup, duration,
// per-target setup/duration, per-class max ack, and overall max ack when it is
// positive. They also feed the per-target stream count. Failed results are
// counted per error string. Byte/frame/ack totals, route modes, reroute causes,
// failover entrypoints and degraded events are recorded for every result. The
// first maxStreamResultSamples results, and separately the first
// maxStreamResultSamples failing results, are kept verbatim as samples.
func (c *streamResultCollector) Add(result streamResult) {
	if c == nil {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.total++
	if len(c.streamSamples) < maxStreamResultSamples {
		c.streamSamples = append(c.streamSamples, result)
	}
	if result.Error != "" && len(c.errorSamples) < maxStreamResultSamples {
		c.errorSamples = append(c.errorSamples, result)
	}
	if result.Error == "" {
		c.successful++
		// Each metric passes its running count as the ordinal *before* the count
		// is incremented; recordMetric uses it to pick the slot to overwrite once
		// the sample buffer is full.
		c.setup = recordMetric(c.setup, result.SetupMs, c.setupCount)
		c.setupCount++
		// Reroute latency is only sampled for streams that failed over or migrated.
		if result.FailoverCount > 0 || result.MigrationCount > 0 {
			c.rerouteLatency = recordMetric(c.rerouteLatency, result.RerouteLatencyMs, c.rerouteLatencyCount)
			c.rerouteLatencyCount++
		}
		c.durations = recordMetric(c.durations, result.DurationMs, c.durationCount)
		c.durationCount++
		c.targetSetup[result.Target] = recordMetric(c.targetSetup[result.Target], result.SetupMs, c.targetSetupCount[result.Target])
		c.targetSetupCount[result.Target]++
		c.targetDurations[result.Target] = recordMetric(c.targetDurations[result.Target], result.DurationMs, c.targetDurationCount[result.Target])
		c.targetDurationCount[result.Target]++
		// Per-class ack samples include streams whose MaxAckMs is 0, while the
		// overall ack sample below skips them.
		// NOTE(review): confirm this asymmetry is intended.
		switch result.TrafficClass {
		case "control":
			c.controlStreams++
			c.controlAck = recordMetric(c.controlAck, result.MaxAckMs, c.controlAckCount)
			c.controlAckCount++
		case "bulk":
			c.bulkStreams++
			c.bulkAck = recordMetric(c.bulkAck, result.MaxAckMs, c.bulkAckCount)
			c.bulkAckCount++
		}
		if result.MaxAckMs > 0 {
			c.allAck = recordMetric(c.allAck, result.MaxAckMs, c.allAckCount)
			c.allAckCount++
		}
		// Unacked frames beyond those explicitly abandoned mean the stream's ack
		// bookkeeping does not add up.
		if result.FramesSent-result.AcksReceived > result.AbandonedFrames {
			c.ackMismatchedStreams++
		}
		c.targetStreams[result.Target]++
	} else {
		c.failed++
		c.errors[result.Error]++
	}
	if result.AckIntegrityErrors > 0 {
		c.ackIntegrityErrors += result.AckIntegrityErrors
	}
	c.abandonedFrames += result.AbandonedFrames
	c.routeAttempts += int64(len(result.RouteAttempts))
	if result.FailoverCount > 0 {
		c.rerouteCauses["pool_failover"] += result.FailoverCount
	}
	if result.MigrationCount > 0 {
		c.rerouteCauses["slow_ack_migration"] += result.MigrationCount
	}
	c.targetBytes[result.Target] += result.BytesSent
	if result.RouteMode != "" {
		if c.targetRouteModes[result.Target] == nil {
			c.targetRouteModes[result.Target] = map[string]int{}
		}
		c.targetRouteModes[result.Target][result.RouteMode]++
	}
	c.targetFrames[result.Target] += result.FramesSent
	c.targetAcks[result.Target] += result.AcksReceived
	if result.MaxAckMs > c.targetMaxAck[result.Target] {
		c.targetMaxAck[result.Target] = result.MaxAckMs
	}
	// Failovers are attributed to the target the stream started on, not the
	// one it finished on.
	if result.FailoverCount > 0 {
		c.targetFailoverEntrypoint[result.InitialTarget]++
	}
	if result.Degraded {
		c.targetDegradedEvents[result.Target]++
	}
}
// recordMetric appends value to values until maxMetricSamples samples are held.
// After that it overwrites slot ordinal%maxMetricSamples in place, a ring
// buffer, so the retained samples are bounded and favour the most recent
// values. It returns the (possibly grown) slice.
func recordMetric(values []int64, value int64, ordinal int) []int64 {
	if len(values) >= maxMetricSamples {
		values[ordinal%maxMetricSamples] = value
		return values
	}
	return append(values, value)
}
// summarizeResults builds the report for a run from the collector's aggregates:
// stream counts, error and reroute-cause histograms, per-target statistics,
// samples and latency percentiles. It holds results.mu while reading. A nil
// collector yields a report with only metadata, config and empty maps.
//
// This function does not set BytesSent, throughput, route-pressure, channel,
// probe or resource fields.
// NOTE(review): presumably the caller fills those in afterwards.
func summarizeResults(cfg loadtestConfig, started time.Time, finished time.Time, results *streamResultCollector) loadtestReport {
	report := loadtestReport{
		SchemaVersion: "rap.fabric_loadtest.v1",
		StartedAt:     started,
		FinishedAt:    finished,
		Config:        cfg,
		Errors:        map[string]int{},
		RerouteCauses: map[string]int{},
		TargetBytes:   map[string]int64{},
		TargetStreams: map[string]int{},
		TargetStats:   map[string]targetStats{},
	}
	if results == nil {
		return report
	}
	results.mu.Lock()
	defer results.mu.Unlock()
	report.TotalStreams = results.total
	report.SuccessfulStreams = results.successful
	report.FailedStreams = results.failed
	report.Errors = cloneStringIntMap(results.errors)
	report.RerouteCauses = cloneStringIntMap(results.rerouteCauses)
	report.TargetBytes = cloneStringInt64Map(results.targetBytes)
	report.TargetStreams = cloneStringIntMap(results.targetStreams)
	report.RouteAttemptsTotal = results.routeAttempts
	report.ControlStreams = results.controlStreams
	report.BulkStreams = results.bulkStreams
	report.AckMismatchedStreams = results.ackMismatchedStreams
	report.AckIntegrityErrors = results.ackIntegrityErrors
	report.AbandonedFrames = results.abandonedFrames
	report.StreamSamples = append(report.StreamSamples, results.streamSamples...)
	report.ErrorSamples = append(report.ErrorSamples, results.errorSamples...)
	// Defensive only: cloneStringIntMap already returns a non-nil map.
	if len(report.Errors) == 0 {
		report.Errors = map[string]int{}
	}
	// Per-target stats exist only for targets with at least one successful
	// stream, because targetStreams is incremented only on success.
	for target, streams := range report.TargetStreams {
		report.TargetStats[target] = targetStats{
			Streams:            streams,
			BytesSent:          report.TargetBytes[target],
			FramesSent:         results.targetFrames[target],
			AcksReceived:       results.targetAcks[target],
			MaxAckMs:           results.targetMaxAck[target],
			SetupLatencyP50Ms:  percentile(results.targetSetup[target], 50),
			SetupLatencyP95Ms:  percentile(results.targetSetup[target], 95),
			DurationP50Ms:      percentile(results.targetDurations[target], 50),
			DurationP95Ms:      percentile(results.targetDurations[target], 95),
			FailoverEntrypoint: results.targetFailoverEntrypoint[target],
			DegradedEvents:     results.targetDegradedEvents[target],
			RouteModes:         cloneStringIntMap(results.targetRouteModes[target]),
		}
	}
	report.SetupLatencyP50Ms = percentile(results.setup, 50)
	report.SetupLatencyP95Ms = percentile(results.setup, 95)
	report.SetupLatencyP99Ms = percentile(results.setup, 99)
	// Channel-open latency is reported from the same samples as setup latency.
	report.ChannelOpenP50Ms = report.SetupLatencyP50Ms
	report.ChannelOpenP95Ms = report.SetupLatencyP95Ms
	report.ChannelOpenP99Ms = report.SetupLatencyP99Ms
	report.RerouteLatencyP95Ms = percentile(results.rerouteLatency, 95)
	report.RerouteLatencyP99Ms = percentile(results.rerouteLatency, 99)
	report.StreamDurationP95Ms = percentile(results.durations, 95)
	report.AckP95Ms = percentile(results.allAck, 95)
	report.AckP99Ms = percentile(results.allAck, 99)
	report.ControlAckP95Ms = percentile(results.controlAck, 95)
	report.BulkAckP95Ms = percentile(results.bulkAck, 95)
	// Use nil rather than an empty map when no reroutes happened.
	if len(report.RerouteCauses) == 0 {
		report.RerouteCauses = nil
	}
	return report
}
// cloneStringIntMap returns an independent shallow copy of in. The result is
// always non-nil, even for a nil input.
func cloneStringIntMap(in map[string]int) map[string]int {
	out := make(map[string]int, len(in))
	for key := range in {
		out[key] = in[key]
	}
	return out
}
// cloneStringInt64Map returns an independent shallow copy of in. The result is
// always non-nil, even for a nil input.
func cloneStringInt64Map(in map[string]int64) map[string]int64 {
	out := make(map[string]int64, len(in))
	for key := range in {
		out[key] = in[key]
	}
	return out
}
// applyRoutePressureToTargetStats copies each target's peak concurrent route
// usage into TargetStats[target].MaxActiveChannels. Route IDs are derived from
// the configured target list, and the largest value wins for duplicate targets.
// Nothing changes when the report is nil, has no target stats, has no route
// pressure peaks, or no configured route recorded a positive peak. Otherwise
// every target's stats are updated, with 0 for targets without a peak.
func applyRoutePressureToTargetStats(report *loadtestReport) {
	if report == nil {
		return
	}
	if len(report.TargetStats) == 0 || len(report.RoutePressure.MaxActive) == 0 {
		return
	}
	peakByTarget := make(map[string]int, len(report.Config.Targets))
	for i, target := range report.Config.Targets {
		peak := report.RoutePressure.MaxActive[loadtestRouteID(i, target)]
		if peak > peakByTarget[target] {
			peakByTarget[target] = peak
		}
	}
	if len(peakByTarget) == 0 {
		return
	}
	for target := range report.TargetStats {
		stats := report.TargetStats[target]
		stats.MaxActiveChannels = peakByTarget[target]
		report.TargetStats[target] = stats
	}
}
// verdict evaluates a finished load-test report against hard invariants and the
// thresholds in report.Config. It returns "pass" with nil reasons, or "fail"
// with one human-readable reason per violated check, in the order the checks
// run below. Most configured thresholds are skipped when set to 0 or less.
func verdict(report loadtestReport) (string, []string) {
	var reasons []string
	if report.FailedStreams > 0 {
		reasons = append(reasons, fmt.Sprintf("failed_streams=%d", report.FailedStreams))
	}
	// Latency ceilings for setup and reroute percentiles.
	if report.Config.MaxSetupP95Ms > 0 && report.SetupLatencyP95Ms > report.Config.MaxSetupP95Ms {
		reasons = append(reasons, fmt.Sprintf("setup_p95_ms=%d > %d", report.SetupLatencyP95Ms, report.Config.MaxSetupP95Ms))
	}
	if report.Config.MaxSetupP99Ms > 0 && report.SetupLatencyP99Ms > report.Config.MaxSetupP99Ms {
		reasons = append(reasons, fmt.Sprintf("setup_p99_ms=%d > %d", report.SetupLatencyP99Ms, report.Config.MaxSetupP99Ms))
	}
	if report.Config.MaxRerouteP95Ms > 0 && report.RerouteLatencyP95Ms > report.Config.MaxRerouteP95Ms {
		reasons = append(reasons, fmt.Sprintf("reroute_p95_ms=%d > %d", report.RerouteLatencyP95Ms, report.Config.MaxRerouteP95Ms))
	}
	if report.Config.MaxRerouteP99Ms > 0 && report.RerouteLatencyP99Ms > report.Config.MaxRerouteP99Ms {
		reasons = append(reasons, fmt.Sprintf("reroute_p99_ms=%d > %d", report.RerouteLatencyP99Ms, report.Config.MaxRerouteP99Ms))
	}
	if report.BytesSent <= 0 {
		reasons = append(reasons, "no_bytes_sent")
	}
	// Throughput floor: Mbps is converted to bits per second using decimal (SI)
	// multipliers.
	if report.Config.MinThroughputMbps > 0 {
		minThroughputBps := report.Config.MinThroughputMbps * 1000 * 1000
		if report.ThroughputBps < minThroughputBps {
			reasons = append(reasons, fmt.Sprintf("throughput_bps=%d < %d", report.ThroughputBps, minThroughputBps))
		}
	}
	if report.Config.MinChannelChurn > 0 && report.ChannelChurnPerSec < report.Config.MinChannelChurn {
		reasons = append(reasons, fmt.Sprintf("channel_churn_per_sec=%d < %d", report.ChannelChurnPerSec, report.Config.MinChannelChurn))
	}
	if report.AckMismatchedStreams > 0 {
		reasons = append(reasons, fmt.Sprintf("ack_mismatched_streams=%d", report.AckMismatchedStreams))
	}
	if report.AckIntegrityErrors > 0 {
		reasons = append(reasons, fmt.Sprintf("ack_integrity_errors=%d", report.AckIntegrityErrors))
	}
	// Accounting invariants: after the run every route acquire must have been
	// released and every opened channel closed.
	if report.RoutePressure.ActiveTotal != 0 || len(report.RoutePressure.Active) != 0 {
		reasons = append(reasons, fmt.Sprintf("route_pressure_active_leak=%d", report.RoutePressure.ActiveTotal))
	}
	if report.RoutePressure.AcquiredTotal != report.RoutePressure.ReleasedTotal {
		reasons = append(reasons, fmt.Sprintf("route_pressure_acquire_release_mismatch=%d/%d", report.RoutePressure.AcquiredTotal, report.RoutePressure.ReleasedTotal))
	}
	if report.ChannelLeaks != 0 {
		reasons = append(reasons, fmt.Sprintf("channel_leaks=%d", report.ChannelLeaks))
	}
	if report.ChannelOpens != report.ChannelCloses {
		reasons = append(reasons, fmt.Sprintf("channel_open_close_mismatch=%d/%d", report.ChannelOpens, report.ChannelCloses))
	}
	// Stream attempts acquire route pressure before connecting, while target
	// probes connect without acquiring. More channel opens than that sum
	// suggests connections opened outside either path.
	routeAcquiresAndProbes := report.RoutePressure.AcquiredTotal + uint64(len(report.TargetProbes))
	if report.RoutePressure.AcquiredTotal > 0 && report.ChannelOpens > routeAcquiresAndProbes {
		reasons = append(reasons, fmt.Sprintf("channel_opens_exceed_route_acquires_and_probes=%d/%d", report.ChannelOpens, routeAcquiresAndProbes))
	}
	if report.SuccessfulStreams > 0 && report.RoutePressure.AcquiredTotal < uint64(report.SuccessfulStreams) {
		reasons = append(reasons, fmt.Sprintf("route_pressure_missing_acquires=%d < successful_streams=%d", report.RoutePressure.AcquiredTotal, report.SuccessfulStreams))
	}
	if report.Config.Concurrency > 0 && report.RoutePressure.MaxActiveTotal > report.Config.Concurrency {
		reasons = append(reasons, fmt.Sprintf("route_pressure_max_active_total=%d > concurrency=%d", report.RoutePressure.MaxActiveTotal, report.Config.Concurrency))
	}
	// Ack latency ceilings (overall and control-class).
	if report.Config.MaxAckP95Ms > 0 && report.AckP95Ms > report.Config.MaxAckP95Ms {
		reasons = append(reasons, fmt.Sprintf("ack_p95_ms=%d > %d", report.AckP95Ms, report.Config.MaxAckP95Ms))
	}
	if report.Config.MaxAckP99Ms > 0 && report.AckP99Ms > report.Config.MaxAckP99Ms {
		reasons = append(reasons, fmt.Sprintf("ack_p99_ms=%d > %d", report.AckP99Ms, report.Config.MaxAckP99Ms))
	}
	if report.ControlStreams > 0 && report.Config.MaxControlP95 > 0 && report.ControlAckP95Ms > report.Config.MaxControlP95 {
		reasons = append(reasons, fmt.Sprintf("control_ack_p95_ms=%d > %d", report.ControlAckP95Ms, report.Config.MaxControlP95))
	}
	// Resource growth ceilings; the heap limit is configured in MiB.
	if report.Config.MaxGoroutineDelta > 0 && report.ResourceSummary.GoroutinesDelta > report.Config.MaxGoroutineDelta {
		reasons = append(reasons, fmt.Sprintf("goroutine_delta=%d > %d", report.ResourceSummary.GoroutinesDelta, report.Config.MaxGoroutineDelta))
	}
	if report.Config.MaxHeapDeltaMB > 0 {
		maxHeapDeltaBytes := report.Config.MaxHeapDeltaMB * 1024 * 1024
		if report.ResourceSummary.HeapAllocDeltaBytes > maxHeapDeltaBytes {
			reasons = append(reasons, fmt.Sprintf("heap_alloc_delta_bytes=%d > %d", report.ResourceSummary.HeapAllocDeltaBytes, maxHeapDeltaBytes))
		}
	}
	// NOTE(review): a negative OpenFDsStart/OpenFDsMax appears to mean FD
	// counting was unavailable, which skips these checks; confirm.
	if report.Config.MaxOpenFDDelta > 0 && report.ResourceSummary.OpenFDsStart >= 0 && report.ResourceSummary.OpenFDsDelta > report.Config.MaxOpenFDDelta {
		reasons = append(reasons, fmt.Sprintf("open_fd_delta=%d > %d", report.ResourceSummary.OpenFDsDelta, report.Config.MaxOpenFDDelta))
	}
	if report.Config.MaxOpenFDs > 0 && report.ResourceSummary.OpenFDsMax >= 0 && report.ResourceSummary.OpenFDsMax > report.Config.MaxOpenFDs {
		reasons = append(reasons, fmt.Sprintf("open_fds_max=%d > %d", report.ResourceSummary.OpenFDsMax, report.Config.MaxOpenFDs))
	}
	// With a deliberately impaired target and slow-ack migration enabled, the
	// run is expected to detect degradation and migrate away from it.
	if report.Config.ImpairTarget >= 0 && report.Config.MigrateSlow && report.Config.MaxAckMs > 0 && len(report.DegradedTargets) == 0 {
		reasons = append(reasons, "expected_degraded_target_not_observed")
	}
	if len(report.DegradedTargets) > 0 && report.MigrationEvents == 0 && report.Config.MigrateSlow {
		reasons = append(reasons, "degraded_targets_without_migration")
	}
	// Distribution, endpoint-policy and route-mode checks.
	reasons = append(reasons, targetDistributionVerdictReasons(report)...)
	reasons = append(reasons, targetByteDistributionVerdictReasons(report)...)
	reasons = append(reasons, targetAckVerdictReasons(report)...)
	reasons = append(reasons, routePressureDistributionVerdictReasons(report)...)
	reasons = append(reasons, targetEndpointPolicyVerdictReasons(report)...)
	reasons = append(reasons, disallowedRouteModeVerdictReasons(report)...)
	reasons = append(reasons, routeModeCoverageVerdictReasons(report)...)
	if len(reasons) > 0 {
		return "fail", reasons
	}
	return "pass", nil
}
// targetDistributionVerdictReasons checks that successful streams were spread
// across all load-balanced targets. It is skipped when there is at most one
// target, fewer successful streams than targets, or an impaired target. An
// unused target is always reported as a collapse. The spread check that follows
// is skipped when probe RTTs are heterogeneous (latency-aware placement is then
// expected to skew). It allows max-min skew up to
// max(Concurrency, SuccessfulStreams/4, 1).
func targetDistributionVerdictReasons(report loadtestReport) []string {
	targets := loadBalancedVerdictTargets(report)
	if len(targets) <= 1 || report.SuccessfulStreams < len(targets) || report.Config.ImpairTarget >= 0 {
		return nil
	}
	lowest, highest, used := report.SuccessfulStreams, 0, 0
	for _, target := range targets {
		count := report.TargetStreams[target]
		if count > 0 {
			used++
		}
		if count < lowest {
			lowest = count
		}
		if count > highest {
			highest = count
		}
	}
	if used < len(targets) {
		return []string{fmt.Sprintf("target_distribution_collapsed=%d/%d_targets_used", used, len(targets))}
	}
	if loadtestProbeRTTHeterogeneous(report.TargetProbes) {
		return nil
	}
	tolerance := report.Config.Concurrency
	if quarter := report.SuccessfulStreams / 4; quarter > tolerance {
		tolerance = quarter
	}
	if tolerance < 1 {
		tolerance = 1
	}
	if highest-lowest <= tolerance {
		return nil
	}
	return []string{fmt.Sprintf("target_distribution_skew=max_%d_min_%d_allowed_%d", highest, lowest, tolerance)}
}
// targetByteDistributionVerdictReasons checks that bytes were spread across all
// load-balanced targets. It is skipped when there is at most one target, fewer
// successful streams than targets, no bytes sent, an impaired target, any
// target without bytes, or heterogeneous probe RTTs. Allowed max-min skew is
// max(average/4, Concurrency*BytesPerStream, 1).
func targetByteDistributionVerdictReasons(report loadtestReport) []string {
	targets := loadBalancedVerdictTargets(report)
	if len(targets) <= 1 || report.SuccessfulStreams < len(targets) || report.BytesSent <= 0 || report.Config.ImpairTarget >= 0 {
		return nil
	}
	lowest, highest := report.BytesSent, int64(0)
	for _, target := range targets {
		sentBytes := report.TargetBytes[target]
		if sentBytes <= 0 {
			// An unused target disables this check entirely.
			return nil
		}
		if sentBytes < lowest {
			lowest = sentBytes
		}
		if sentBytes > highest {
			highest = sentBytes
		}
	}
	if loadtestProbeRTTHeterogeneous(report.TargetProbes) {
		return nil
	}
	tolerance := report.BytesSent / int64(len(targets)) / 4
	if budget := int64(report.Config.Concurrency) * report.Config.BytesPerStream; budget > tolerance {
		tolerance = budget
	}
	if tolerance < 1 {
		tolerance = 1
	}
	if highest-lowest <= tolerance {
		return nil
	}
	return []string{fmt.Sprintf("target_byte_distribution_skew=max_%d_min_%d_allowed_%d", highest, lowest, tolerance)}
}
// targetAckVerdictReasons reports every load-balanced target with successful
// streams whose worst ack latency exceeds Config.MaxTargetAckMs. It is skipped
// when the limit is disabled, there are no target stats, or a target is
// deliberately impaired.
func targetAckVerdictReasons(report loadtestReport) []string {
	limit := report.Config.MaxTargetAckMs
	if limit <= 0 || len(report.TargetStats) == 0 || report.Config.ImpairTarget >= 0 {
		return nil
	}
	var reasons []string
	for _, target := range loadBalancedVerdictTargets(report) {
		stats, found := report.TargetStats[target]
		if !found || stats.Streams == 0 || stats.MaxAckMs <= limit {
			continue
		}
		reasons = append(reasons, fmt.Sprintf("target_ack_ms=%s:%d>%d", target, stats.MaxAckMs, limit))
	}
	return reasons
}
// routePressureDistributionVerdictReasons checks that peak concurrent route
// usage was spread across all load-balanced targets. It is skipped for a single
// target, concurrency <= 1, no recorded pressure, or an impaired target. A route
// that was never active is reported as a collapse. Otherwise, unless probe RTTs
// are heterogeneous, the max-min peak skew may not exceed
// max(Concurrency/2, 1).
func routePressureDistributionVerdictReasons(report loadtestReport) []string {
	targets := loadBalancedVerdictTargets(report)
	concurrency := report.Config.Concurrency
	if len(targets) <= 1 || concurrency <= 1 || report.RoutePressure.MaxActiveTotal <= 0 || report.Config.ImpairTarget >= 0 {
		return nil
	}
	lowest, highest, used := concurrency, 0, 0
	for _, target := range targets {
		routeID := loadtestRouteID(loadtestTargetIndex(report.Config.Targets, target), target)
		peak := report.RoutePressure.MaxActive[routeID]
		if peak > 0 {
			used++
		}
		if peak < lowest {
			lowest = peak
		}
		if peak > highest {
			highest = peak
		}
	}
	if used < len(targets) {
		return []string{fmt.Sprintf("route_pressure_distribution_collapsed=%d/%d_targets_used", used, len(targets))}
	}
	if loadtestProbeRTTHeterogeneous(report.TargetProbes) {
		return nil
	}
	tolerance := concurrency / 2
	if tolerance < 1 {
		tolerance = 1
	}
	if highest-lowest <= tolerance {
		return nil
	}
	return []string{fmt.Sprintf("route_pressure_distribution_skew=max_%d_min_%d_allowed_%d", highest, lowest, tolerance)}
}
// loadtestProbeRTTHeterogeneous reports whether the slowest usable probe RTT is
// at least 4x the fastest. This is the same threshold latencyAwareTargetIndex
// uses to enable latency-aware placement. Unusable probes and non-positive RTTs
// are ignored.
func loadtestProbeRTTHeterogeneous(probes []targetProbeResult) bool {
	var fastest, slowest int64
	for i := range probes {
		rtt := probes[i].RTTMs
		if !probes[i].Usable || rtt <= 0 {
			continue
		}
		if fastest == 0 || rtt < fastest {
			fastest = rtt
		}
		if rtt > slowest {
			slowest = rtt
		}
	}
	if fastest <= 0 {
		return false
	}
	return slowest >= fastest*4
}
// loadBalancedVerdictTargets lists the trimmed configured targets that should
// receive balanced load. Blank entries, the deliberately failed target
// (Config.FailTarget index) and targets excluded by probing are left out.
func loadBalancedVerdictTargets(report loadtestReport) []string {
	skip := make(map[string]struct{}, len(report.ExcludedTargets))
	for _, name := range report.ExcludedTargets {
		skip[strings.TrimSpace(name)] = struct{}{}
	}
	balanced := make([]string, 0, len(report.Config.Targets))
	for i, raw := range report.Config.Targets {
		target := strings.TrimSpace(raw)
		if target == "" || i == report.Config.FailTarget {
			continue
		}
		if _, excluded := skip[target]; excluded {
			continue
		}
		balanced = append(balanced, target)
	}
	return balanced
}
// loadtestTargetIndex returns the index of the first entry of targets whose
// trimmed value equals target exactly, or -1 when there is none. target itself
// is not trimmed.
func loadtestTargetIndex(targets []string, target string) int {
	for i := 0; i < len(targets); i++ {
		if strings.TrimSpace(targets[i]) == target {
			return i
		}
	}
	return -1
}
// targetEndpointPolicyVerdictReasons enforces that every configured target is
// a quic:// endpoint (scheme matched case-insensitively after trimming). Blank
// targets are listed as "<empty>". Offenders are reported sorted in a single
// "non_quic_targets=" reason. The result is nil when all targets comply or
// none are configured.
func targetEndpointPolicyVerdictReasons(report loadtestReport) []string {
	var invalid []string
	for _, raw := range report.Config.Targets {
		trimmed := strings.TrimSpace(raw)
		switch {
		case trimmed == "":
			invalid = append(invalid, "<empty>")
		case !strings.HasPrefix(strings.ToLower(trimmed), "quic://"):
			invalid = append(invalid, trimmed)
		}
	}
	if len(invalid) == 0 {
		return nil
	}
	sort.Strings(invalid)
	return []string{"non_quic_targets=" + strings.Join(invalid, ",")}
}
// disallowedRouteModeVerdictReasons reports route modes observed in target
// stats that are not one of the supported fabric routes (direct, LAN, ICE,
// reverse, relay). Modes are compared lowercased and trimmed, and only positive
// counts matter. Offenders are summed across targets and reported sorted as
// "mode:count" entries in one reason.
func disallowedRouteModeVerdictReasons(report loadtestReport) []string {
	if len(report.TargetStats) == 0 {
		return nil
	}
	isSupported := func(mode string) bool {
		switch mode {
		case string(mesh.FabricRouteDirect),
			string(mesh.FabricRouteLAN),
			string(mesh.FabricRouteICE),
			string(mesh.FabricRouteReverse),
			string(mesh.FabricRouteRelay):
			return true
		}
		return false
	}
	unsupported := map[string]int{}
	for _, stats := range report.TargetStats {
		for rawMode, count := range stats.RouteModes {
			if count <= 0 {
				continue
			}
			mode := strings.ToLower(strings.TrimSpace(rawMode))
			if !isSupported(mode) {
				unsupported[mode] += count
			}
		}
	}
	if len(unsupported) == 0 {
		return nil
	}
	entries := make([]string, 0, len(unsupported))
	for mode, count := range unsupported {
		entries = append(entries, fmt.Sprintf("%s:%d", mode, count))
	}
	sort.Strings(entries)
	return []string{fmt.Sprintf("compat_route_modes_observed=%s", strings.Join(entries, ","))}
}
// routeModeCoverageVerdictReasons checks that a clean run against a mixed
// topology exercised every non-direct fabric route mode.
//
// The check applies only when all of the following hold:
//   - the topology profile (trimmed, case-insensitive) is
//     "mixed-public-nat-lan-relay" or "nat-lan-relay";
//   - at least four targets are configured;
//   - SuccessfulStreams is at least the number of configured targets;
//   - no target was failed or impaired (FailTarget/ImpairTarget < 0);
//   - no target was excluded.
//
// Route-mode names from the per-target statistics are trimmed and lower-cased
// before counting, and only positive counts contribute. This is the same
// normalization disallowedRouteModeVerdictReasons uses, so a key such as "LAN"
// or " relay" still counts toward coverage instead of being reported as
// missing.
//
// It returns nil when the check does not apply or every required mode (LAN,
// ICE, Reverse, Relay) was observed. Otherwise it returns a single
// "route_mode_coverage_missing=..." reason listing the missing modes in
// declaration order.
func routeModeCoverageVerdictReasons(report loadtestReport) []string {
	profile := strings.ToLower(strings.TrimSpace(report.Config.TopologyProfile))
	if profile != "mixed-public-nat-lan-relay" && profile != "nat-lan-relay" {
		return nil
	}
	if len(report.Config.Targets) < 4 || report.SuccessfulStreams < len(report.Config.Targets) {
		return nil
	}
	if report.Config.FailTarget >= 0 || report.Config.ImpairTarget >= 0 || len(report.ExcludedTargets) > 0 {
		return nil
	}
	normalize := func(mode string) string {
		return strings.ToLower(strings.TrimSpace(mode))
	}
	observed := map[string]int{}
	for _, stats := range report.TargetStats {
		for mode, count := range stats.RouteModes {
			if count > 0 {
				observed[normalize(mode)] += count
			}
		}
	}
	required := []string{
		string(mesh.FabricRouteLAN),
		string(mesh.FabricRouteICE),
		string(mesh.FabricRouteReverse),
		string(mesh.FabricRouteRelay),
	}
	var missing []string
	for _, mode := range required {
		// Normalize the constant too, so the lookup key matches however
		// observed modes were spelled.
		if observed[normalize(mode)] <= 0 {
			missing = append(missing, mode)
		}
	}
	if len(missing) > 0 {
		return []string{fmt.Sprintf("route_mode_coverage_missing=%s", strings.Join(missing, ","))}
	}
	return nil
}
// percentile returns the p-th percentile of values using the nearest-rank
// method. For non-negative p, that is the element at 1-based rank
// ceil(len(values)*p/100) in ascending order.
//
// The rank is clamped to [1, len(values)], so p <= 0 yields the minimum and
// p >= 100 yields the maximum. An empty input yields 0. The caller's slice is
// left untouched; a sorted copy is used instead.
func percentile(values []int64, p int) int64 {
	count := len(values)
	if count == 0 {
		return 0
	}
	rank := (count*p + 99) / 100
	switch {
	case rank < 1:
		rank = 1
	case rank > count:
		rank = count
	}
	sorted := make([]int64, count)
	copy(sorted, values)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })
	return sorted[rank-1]
}
// writeReport emits the report as indented JSON. When Config.ReportPath is
// non-blank after trimming, the report is first written to that file, which
// os.Create creates or truncates. The report is then always written to
// standard output. Any create, encode, or close failure terminates the process
// via log.Fatal.
func writeReport(report loadtestReport) {
	emit := func(dst *os.File) error {
		enc := json.NewEncoder(dst)
		enc.SetIndent("", " ")
		return enc.Encode(report)
	}

	if path := strings.TrimSpace(report.Config.ReportPath); path != "" {
		out, err := os.Create(path)
		if err != nil {
			log.Fatal(err)
		}
		if encErr := emit(out); encErr != nil {
			// Best-effort close: the encode error is what gets reported.
			_ = out.Close()
			log.Fatal(encErr)
		}
		if closeErr := out.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}

	if err := emit(os.Stdout); err != nil {
		log.Fatal(err)
	}
}
// splitCSV splits a comma-separated list and trims whitespace around each
// item. Items that are empty after trimming are dropped. The result is never
// nil: an input with no non-blank items yields an empty, non-nil slice.
func splitCSV(value string) []string {
	out := make([]string, 0, strings.Count(value, ",")+1)
	rest := value
	for {
		head, tail, found := strings.Cut(rest, ",")
		if item := strings.TrimSpace(head); item != "" {
			out = append(out, item)
		}
		if !found {
			return out
		}
		rest = tail
	}
}
// loadtestQUICConfig builds the QUIC transport settings used by the load test.
// Datagrams are enabled. The incoming bidirectional and unidirectional stream
// limits are both set to cfg.Concurrency, but never below 1000. The idle
// timeout is two minutes, with a 15-second keep-alive period.
func loadtestQUICConfig(cfg loadtestConfig) *quic.Config {
	const minIncomingStreams = 1000
	streamLimit := int64(minIncomingStreams)
	if concurrency := int64(cfg.Concurrency); concurrency > streamLimit {
		streamLimit = concurrency
	}
	conf := &quic.Config{EnableDatagrams: true}
	conf.MaxIncomingStreams = streamLimit
	conf.MaxIncomingUniStreams = streamLimit
	conf.MaxIdleTimeout = 2 * time.Minute
	conf.KeepAlivePeriod = 15 * time.Second
	return conf
}
// selfSignedTLSConfig generates a throwaway RSA-2048 key and a self-signed
// server certificate. The certificate has CN "rap-fabric-loadtest" and DNS SAN
// "localhost", and is valid from one minute ago for 24 hours. Both key-usage
// flags (key encipherment, digital signature) and the serverAuth extended key
// usage are set.
//
// It returns a TLS config carrying that certificate and advertising the
// "rap-fabric-data-session-v1" ALPN protocol. It also returns the lowercase
// hex SHA-256 fingerprint of the DER-encoded certificate. Any error from key
// generation, serial generation, or certificate creation is returned with
// nil/empty results.
func selfSignedTLSConfig() (*tls.Config, string, error) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, "", err
	}
	// RFC 5280 requires a positive serial number that is unique per issuer.
	// A random 128-bit value (16 octets, within the 20-octet limit) avoids the
	// predictable, collision-prone UnixNano timestamp.
	serialLimit := new(big.Int).Lsh(big.NewInt(1), 128)
	serial, err := rand.Int(rand.Reader, serialLimit)
	if err != nil {
		return nil, "", err
	}
	if serial.Sign() == 0 {
		// rand.Int draws from [0, limit); zero is not a valid serial.
		serial.SetInt64(1)
	}
	now := time.Now()
	template := x509.Certificate{
		SerialNumber: serial,
		Subject:      pkix.Name{CommonName: "rap-fabric-loadtest"},
		NotBefore:    now.Add(-time.Minute),
		NotAfter:     now.Add(24 * time.Hour),
		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		DNSNames:     []string{"localhost"},
	}
	// Passing the template as both subject and parent makes it self-signed.
	certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
	if err != nil {
		return nil, "", err
	}
	sum := sha256.Sum256(certDER)
	return &tls.Config{
		Certificates: []tls.Certificate{{
			Certificate: [][]byte{certDER},
			PrivateKey:  key,
		}},
		NextProtos: []string{"rap-fabric-data-session-v1"},
	}, hex.EncodeToString(sum[:]), nil
}