Report QUIC fabric capacity pressure

This commit is contained in:
2026-05-16 11:54:05 +03:00
parent 6dc7a61c9d
commit d67b80a7aa
3 changed files with 54 additions and 20 deletions
@@ -409,23 +409,26 @@ type fabricServiceChannelAccessStats struct {
} }
type vpnFabricSessionDialStats struct { type vpnFabricSessionDialStats struct {
Attempts atomic.Int64 Attempts atomic.Int64
Selected atomic.Int64 Selected atomic.Int64
CandidateFailures atomic.Int64 CandidateFailures atomic.Int64
TransportFailures atomic.Int64 TransportFailures atomic.Int64
SessionOpenFailures atomic.Int64 SessionOpenFailures atomic.Int64
StreamOpenFailures atomic.Int64 StreamOpenFailures atomic.Int64
CapacityLimited atomic.Int64 CapacityLimited atomic.Int64
AllCandidatesFailed atomic.Int64 AllCandidatesFailed atomic.Int64
QUICSelected atomic.Int64 QUICSelected atomic.Int64
WebSocketSelected atomic.Int64 WebSocketSelected atomic.Int64
LegacySelected atomic.Int64 LegacySelected atomic.Int64
PinnedCertSelected atomic.Int64 PinnedCertSelected atomic.Int64
LastTransport atomic.Value LastTransport atomic.Value
LastEndpoint atomic.Value LastEndpoint atomic.Value
LastFailureReason atomic.Value LastCapacityEndpoint atomic.Value
LastSelectedUnixSec atomic.Int64 LastCapacityTransport atomic.Value
LastFailureUnixSec atomic.Int64 LastFailureReason atomic.Value
LastSelectedUnixSec atomic.Int64
LastCapacityUnixSec atomic.Int64
LastFailureUnixSec atomic.Int64
} }
type vpnFabricEndpointObservationStore struct { type vpnFabricEndpointObservationStore struct {
@@ -610,6 +613,20 @@ func (s *vpnFabricSessionDialStats) ObserveCandidateFailure(reason string) {
s.LastFailureUnixSec.Store(time.Now().UTC().Unix()) s.LastFailureUnixSec.Store(time.Now().UTC().Unix())
} }
func (s *vpnFabricSessionDialStats) ObserveCapacityLimited(target mesh.FabricTransportTarget) {
if s == nil {
return
}
s.ObserveCandidateFailure("capacity_limited")
s.LastCapacityEndpoint.Store(strings.TrimSpace(target.Endpoint))
transport := strings.TrimSpace(target.Transport)
if transport == "" {
transport = "legacy_peer_endpoint"
}
s.LastCapacityTransport.Store(transport)
s.LastCapacityUnixSec.Store(time.Now().UTC().Unix())
}
func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() { func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() {
if s == nil { if s == nil {
return return
@@ -664,6 +681,7 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
"legacy_selected": s.LegacySelected.Load(), "legacy_selected": s.LegacySelected.Load(),
"pinned_cert_selected": s.PinnedCertSelected.Load(), "pinned_cert_selected": s.PinnedCertSelected.Load(),
"last_selected_unix_sec": s.LastSelectedUnixSec.Load(), "last_selected_unix_sec": s.LastSelectedUnixSec.Load(),
"last_capacity_unix_sec": s.LastCapacityUnixSec.Load(),
"last_failure_unix_sec": s.LastFailureUnixSec.Load(), "last_failure_unix_sec": s.LastFailureUnixSec.Load(),
} }
if value, ok := s.LastTransport.Load().(string); ok && value != "" { if value, ok := s.LastTransport.Load().(string); ok && value != "" {
@@ -672,6 +690,12 @@ func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any
if value, ok := s.LastEndpoint.Load().(string); ok && value != "" { if value, ok := s.LastEndpoint.Load().(string); ok && value != "" {
report["last_endpoint"] = value report["last_endpoint"] = value
} }
if value, ok := s.LastCapacityEndpoint.Load().(string); ok && value != "" {
report["last_capacity_endpoint"] = value
}
if value, ok := s.LastCapacityTransport.Load().(string); ok && value != "" {
report["last_capacity_transport"] = value
}
if value, ok := s.LastFailureReason.Load().(string); ok && value != "" { if value, ok := s.LastFailureReason.Load().(string); ok && value != "" {
report["last_failure_reason"] = value report["last_failure_reason"] = value
} }
@@ -5037,8 +5061,10 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st
if err != nil { if err != nil {
cancel() cancel()
reason := fabricSessionOpenFailureReason(err) reason := fabricSessionOpenFailureReason(err)
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure(reason) if reason == "capacity_limited" {
if reason != "capacity_limited" { meshState.VPNFabricSessionDialStats.ObserveCapacityLimited(selectedTarget)
} else {
meshState.VPNFabricSessionDialStats.ObserveCandidateFailure(reason)
meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, reason) meshState.VPNFabricEndpointObservations.ObserveFailure(selectedTarget.EndpointID, reason)
} }
log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=%s error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, reason, err) log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=%s error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, reason, err)
@@ -776,7 +776,10 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) {
func TestVPNFabricSessionDialStatsReport(t *testing.T) { func TestVPNFabricSessionDialStatsReport(t *testing.T) {
stats := newVPNFabricSessionDialStats() stats := newVPNFabricSessionDialStats()
stats.Attempts.Add(1) stats.Attempts.Add(1)
stats.ObserveCandidateFailure("capacity_limited") stats.ObserveCapacityLimited(mesh.FabricTransportTarget{
Endpoint: "quic://node-b.example.test:19443",
Transport: "direct_quic",
})
stats.ObserveCandidateFailure("session_open_failed") stats.ObserveCandidateFailure("session_open_failed")
stats.ObserveSelected(mesh.FabricTransportTarget{ stats.ObserveSelected(mesh.FabricTransportTarget{
Endpoint: "quic://node-b.example.test:19443", Endpoint: "quic://node-b.example.test:19443",
@@ -794,6 +797,8 @@ func TestVPNFabricSessionDialStatsReport(t *testing.T) {
report["pinned_cert_selected"] != int64(1) || report["pinned_cert_selected"] != int64(1) ||
report["last_transport"] != "direct_quic" || report["last_transport"] != "direct_quic" ||
report["last_endpoint"] != "quic://node-b.example.test:19443" || report["last_endpoint"] != "quic://node-b.example.test:19443" ||
report["last_capacity_endpoint"] != "quic://node-b.example.test:19443" ||
report["last_capacity_transport"] != "direct_quic" ||
report["last_failure_reason"] != "session_open_failed" { report["last_failure_reason"] != "session_open_failed" {
t.Fatalf("unexpected dial stats report: %+v", report) t.Fatalf("unexpected dial stats report: %+v", report)
} }
@@ -371,6 +371,9 @@ profiles.
QUIC stream-limit rejects are classified as capacity pressure instead of peer QUIC stream-limit rejects are classified as capacity pressure instead of peer
endpoint failure, so local health feedback does not incorrectly demote a healthy endpoint failure, so local health feedback does not incorrectly demote a healthy
but saturated carrier. but saturated carrier.
VPN fabric dial telemetry records the last capacity-limited endpoint and
transport, making stream saturation visible without poisoning endpoint health
observations.
Cached QUIC carrier idle TTL is configurable through Cached QUIC carrier idle TTL is configurable through
`RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS` / `-vpn-fabric-quic-idle-ttl` and `RAP_VPN_FABRIC_QUIC_IDLE_TTL_SECONDS` / `-vpn-fabric-quic-idle-ttl` and
propagated by host-agent install profiles. propagated by host-agent install profiles.