Summarize VPN fabric stream sharding health
This commit is contained in:
@@ -290,7 +290,10 @@ func smokeFabricVPNPacketOverSession(ctx context.Context, fabricSession *mesh.Fa
|
||||
snapshot := transport.Snapshot()
|
||||
framesByClass, _ := snapshot["send_frames_by_class"].(map[string]uint64)
|
||||
sharded := framesByClass[vpnruntime.FabricTrafficClassInteractive] == 1 &&
|
||||
framesByClass[vpnruntime.FabricTrafficClassBulk] == 1
|
||||
framesByClass[vpnruntime.FabricTrafficClassBulk] == 1 &&
|
||||
snapshot["sharding_active"] == true &&
|
||||
snapshot["send_class_count"] == 2 &&
|
||||
snapshot["send_stream_count"] == 2
|
||||
return true, sharded, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,10 +281,16 @@ func (t *FabricSessionPacketTransport) Snapshot() map[string]any {
|
||||
sendFramesByStream[fmt.Sprintf("%d", streamID)] = count
|
||||
}
|
||||
t.statsMu.Unlock()
|
||||
streamIDsByClass := copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass)
|
||||
return map[string]any{
|
||||
"schema_version": "rap.vpn_fabric_session_packet_transport.v1",
|
||||
"stream_id": t.StreamID,
|
||||
"stream_ids_by_class": copyStreamIDsByTrafficClass(t.StreamIDsByTrafficClass),
|
||||
"stream_ids_by_class": streamIDsByClass,
|
||||
"stream_class_count": len(streamIDsByClass),
|
||||
"stream_shard_count": countStreamIDs(streamIDsByClass) + len(t.StreamIDs),
|
||||
"send_class_count": countNonZeroStringUint64Values(sendFramesByClass),
|
||||
"send_stream_count": countNonZeroStringUint64Values(sendFramesByStream),
|
||||
"sharding_active": len(streamIDsByClass) > 1 || countStreamIDs(streamIDsByClass)+len(t.StreamIDs) > 1,
|
||||
"send_frames_by_class": sendFramesByClass,
|
||||
"send_packets_by_class": sendPacketsByClass,
|
||||
"send_frames_by_stream_id": sendFramesByStream,
|
||||
@@ -322,3 +328,21 @@ func copyStreamIDsByTrafficClass(values map[string][]uint64) map[string][]uint64
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func countStreamIDs(values map[string][]uint64) int {
|
||||
total := 0
|
||||
for _, ids := range values {
|
||||
total += len(ids)
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
func countNonZeroStringUint64Values(values map[string]uint64) int {
|
||||
total := 0
|
||||
for _, value := range values {
|
||||
if value > 0 {
|
||||
total++
|
||||
}
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
@@ -265,6 +265,13 @@ func TestFabricSessionPacketTransportShardsStreamsByTrafficClass(t *testing.T) {
|
||||
if snapshot["schema_version"] != "rap.vpn_fabric_session_packet_transport.v1" {
|
||||
t.Fatalf("snapshot schema missing: %+v", snapshot)
|
||||
}
|
||||
if snapshot["stream_class_count"] != 2 ||
|
||||
snapshot["stream_shard_count"] != 4 ||
|
||||
snapshot["send_class_count"] != 2 ||
|
||||
snapshot["send_stream_count"] != 2 ||
|
||||
snapshot["sharding_active"] != true {
|
||||
t.Fatalf("unexpected shard summary: %+v", snapshot)
|
||||
}
|
||||
framesByClass := snapshot["send_frames_by_class"].(map[string]uint64)
|
||||
if framesByClass[FabricTrafficClassBulk] != 1 || framesByClass[FabricTrafficClassInteractive] != 1 {
|
||||
t.Fatalf("send frames by class = %+v", framesByClass)
|
||||
|
||||
@@ -388,6 +388,9 @@ VPN fabric-session/QUIC tuning flags as install profiles, keeping manual and
|
||||
profile-based rollout paths aligned.
|
||||
Gateway runtime snapshots include the fabric-session packet transport stream
|
||||
layout and send counters by traffic class/stream id for load-test diagnosis.
|
||||
Those snapshots also summarize configured stream class/shard counts and active
|
||||
send class/stream counts, making sharding health visible without expanding
|
||||
per-stream maps.
|
||||
Endpoint ranking treats `capacity_limited` observations as a soft pressure
|
||||
penalty instead of a hard recent failure, enabling load spreading without
|
||||
marking the carrier unhealthy.
|
||||
|
||||
Reference in New Issue
Block a user