diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main.go b/agents/rap-node-agent/cmd/rap-node-agent/main.go index e8d0cc7..b3d387b 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main.go @@ -362,6 +362,7 @@ type syntheticMeshState struct { VPNFabricIngress *vpnruntime.FabricClientPacketIngress VPNFabricSessionPeers *mesh.FabricSessionPeerManager VPNFabricTransport *mesh.WebSocketFabricTransport + VPNFabricSessionDialStats *vpnFabricSessionDialStats PeerEndpoints map[string]string PeerEndpointCandidates map[string][]mesh.PeerEndpointCandidate VPNGateway *vpnruntime.Gateway @@ -399,6 +400,131 @@ type fabricServiceChannelAccessStats struct { LastViolationReason atomic.Value } +type vpnFabricSessionDialStats struct { + Attempts atomic.Int64 + Selected atomic.Int64 + CandidateFailures atomic.Int64 + TransportFailures atomic.Int64 + SessionOpenFailures atomic.Int64 + StreamOpenFailures atomic.Int64 + AllCandidatesFailed atomic.Int64 + QUICSelected atomic.Int64 + WebSocketSelected atomic.Int64 + LegacySelected atomic.Int64 + PinnedCertSelected atomic.Int64 + LastTransport atomic.Value + LastEndpoint atomic.Value + LastFailureReason atomic.Value + LastSelectedUnixSec atomic.Int64 + LastFailureUnixSec atomic.Int64 +} + +func newVPNFabricSessionDialStats() *vpnFabricSessionDialStats { + return &vpnFabricSessionDialStats{} +} + +func fabricTransportLabelIsQUIC(label string) bool { + switch strings.ToLower(strings.TrimSpace(label)) { + case "quic", "direct_quic", "udp_quic", "quic_udp": + return true + default: + return false + } +} + +func fabricTransportLabelIsWebSocket(label string) bool { + switch strings.ToLower(strings.TrimSpace(label)) { + case "websocket", "ws", "wss", "direct_http", "direct_https", "direct_tcp_tls": + return true + default: + return false + } +} + +func (s *vpnFabricSessionDialStats) ObserveCandidateFailure(reason string) { + if s == nil { + return + } + s.CandidateFailures.Add(1) + switch strings.TrimSpace(reason) { + case "transport_select_failed": + s.TransportFailures.Add(1) + case "session_open_failed": + s.SessionOpenFailures.Add(1) + case "stream_open_failed": + s.StreamOpenFailures.Add(1) + } + s.LastFailureReason.Store(strings.TrimSpace(reason)) + s.LastFailureUnixSec.Store(time.Now().UTC().Unix()) +} + +func (s *vpnFabricSessionDialStats) ObserveAllCandidatesFailed() { + if s == nil { + return + } + s.AllCandidatesFailed.Add(1) + s.LastFailureReason.Store("all_candidates_failed") + s.LastFailureUnixSec.Store(time.Now().UTC().Unix()) +} + +func (s *vpnFabricSessionDialStats) ObserveSelected(target mesh.FabricTransportTarget) { + if s == nil { + return + } + s.Selected.Add(1) + transport := strings.TrimSpace(target.Transport) + if transport == "" { + transport = "legacy_peer_endpoint" + } + s.LastTransport.Store(transport) + s.LastEndpoint.Store(strings.TrimSpace(target.Endpoint)) + s.LastSelectedUnixSec.Store(time.Now().UTC().Unix()) + switch { + case fabricTransportLabelIsQUIC(transport): + s.QUICSelected.Add(1) + case transport == "legacy_peer_endpoint": + s.LegacySelected.Add(1) + case fabricTransportLabelIsWebSocket(transport): + s.WebSocketSelected.Add(1) + } + if strings.TrimSpace(target.PeerCertSHA256) != "" { + s.PinnedCertSelected.Add(1) + } +} + +func (s *vpnFabricSessionDialStats) Report(observedAt time.Time) map[string]any { + if s == nil { + return nil + } + report := map[string]any{ + "schema_version": "rap.vpn_fabric_session_dial_stats.v1", + "observed_at": observedAt.UTC().Format(time.RFC3339Nano), + "attempts": s.Attempts.Load(), + "selected": s.Selected.Load(), + "candidate_failures": s.CandidateFailures.Load(), + "transport_failures": s.TransportFailures.Load(), + "session_open_failures": s.SessionOpenFailures.Load(), + "stream_open_failures": s.StreamOpenFailures.Load(), + "all_candidates_failed": s.AllCandidatesFailed.Load(), + "quic_selected": s.QUICSelected.Load(), + "websocket_selected": s.WebSocketSelected.Load(), + "legacy_selected": s.LegacySelected.Load(), + "pinned_cert_selected": s.PinnedCertSelected.Load(), + "last_selected_unix_sec": s.LastSelectedUnixSec.Load(), + "last_failure_unix_sec": s.LastFailureUnixSec.Load(), + } + if value, ok := s.LastTransport.Load().(string); ok && value != "" { + report["last_transport"] = value + } + if value, ok := s.LastEndpoint.Load().(string); ok && value != "" { + report["last_endpoint"] = value + } + if value, ok := s.LastFailureReason.Load().(string); ok && value != "" { + report["last_failure_reason"] = value + } + return report +} + func newFabricServiceChannelAccessStats() *fabricServiceChannelAccessStats { return &fabricServiceChannelAccessStats{} } @@ -818,6 +944,7 @@ func startSyntheticMeshEndpoint(ctx context.Context, _ context.CancelFunc, cfg c VPNFabricIngress: vpnFabricIngress, VPNFabricSessionPeers: vpnFabricSessionPeers, VPNFabricTransport: mesh.NewWebSocketFabricTransport(vpnFabricSessionPeers), + VPNFabricSessionDialStats: newVPNFabricSessionDialStats(), PeerEndpoints: copyStringMap(peerEndpoints), PeerEndpointCandidates: copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates), VPNGateway: vpnGateway, @@ -1711,6 +1838,9 @@ func applyRefreshedSyntheticMeshConfig(ctx context.Context, cfg config.Config, i if meshState.VPNFabricTransport == nil { meshState.VPNFabricTransport = mesh.NewWebSocketFabricTransport(meshState.VPNFabricSessionPeers) } + if meshState.VPNFabricSessionDialStats == nil { + meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() + } meshState.PeerEndpoints = copyStringMap(loadedConfig.PeerEndpoints) meshState.PeerEndpointCandidates = copyPeerEndpointCandidatesMap(loadedConfig.PeerEndpointCandidates) if productionForwardingEnabled { @@ -2643,6 +2773,9 @@ func heartbeatPayload(cfg config.Config, identity state.Identity, meshState *syn } else if meshState != nil && meshState.VPNFabricSessionPeers != nil { report["peer_sessions"] = meshState.VPNFabricSessionPeers.Snapshot() } + if meshState != nil && meshState.VPNFabricSessionDialStats != nil { + report["dial_stats"] = meshState.VPNFabricSessionDialStats.Report(observedAt) + } payload.Metadata["vpn_fabric_session_transport_report"] = report payload.Capabilities["vpn_fabric_session_transport"] = true payload.Capabilities["vpn_packet_batch_binary_frames"] = true @@ -4633,6 +4766,10 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=peer_endpoint_missing", assignment.VPNConnectionID, nextHop) return nil } + if meshState.VPNFabricSessionDialStats == nil { + meshState.VPNFabricSessionDialStats = newVPNFabricSessionDialStats() + } + meshState.VPNFabricSessionDialStats.Attempts.Add(1) if meshState.VPNFabricSessionPeers == nil { meshState.VPNFabricSessionPeers = mesh.NewFabricSessionPeerManager() } @@ -4651,12 +4788,14 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st carrier, selectedTarget, err := mesh.FabricTransportForTarget(target, meshState.VPNFabricTransport, nil) if err != nil { cancel() + meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("transport_select_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=transport_select_failed error=%v", assignment.VPNConnectionID, nextHop, index, target.Endpoint, target.Transport, err) continue } session, err := carrier.Connect(dialCtx, selectedTarget) if err != nil { cancel() + meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("session_open_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=session_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) continue } @@ -4671,10 +4810,12 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st }); err != nil { cancel() _ = session.Close() + meshState.VPNFabricSessionDialStats.ObserveCandidateFailure("stream_open_failed") log.Printf("vpn fabric session candidate skipped: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s reason=stream_open_failed error=%v", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, err) continue } cancel() + meshState.VPNFabricSessionDialStats.ObserveSelected(selectedTarget) log.Printf("vpn fabric session transport selected: vpn_connection_id=%s next_hop=%s candidate=%d endpoint=%s transport=%s pinned_cert=%t fallback_candidates=%d", assignment.VPNConnectionID, nextHop, index, selectedTarget.Endpoint, selectedTarget.Transport, selectedTarget.PeerCertSHA256 != "", len(targets)-index-1) return &vpnruntime.FabricSessionPacketTransport{ Sender: session, @@ -4687,6 +4828,7 @@ func fabricSessionGatewayTransportForAssignment(ctx context.Context, identity st TrafficClass: vpnruntime.FabricTrafficClassInteractive, } } + meshState.VPNFabricSessionDialStats.ObserveAllCandidatesFailed() log.Printf("vpn fabric session transport skipped: vpn_connection_id=%s next_hop=%s reason=all_candidates_failed candidates=%d", assignment.VPNConnectionID, nextHop, len(targets)) return nil } diff --git a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go index eba5f81..24068ea 100644 --- a/agents/rap-node-agent/cmd/rap-node-agent/main_test.go +++ b/agents/rap-node-agent/cmd/rap-node-agent/main_test.go @@ -755,6 +755,30 @@ func TestHeartbeatPayloadIncludesMeshEndpointReport(t *testing.T) { } } +func TestVPNFabricSessionDialStatsReport(t *testing.T) { + stats := newVPNFabricSessionDialStats() + stats.Attempts.Add(1) + stats.ObserveCandidateFailure("session_open_failed") + stats.ObserveSelected(mesh.FabricTransportTarget{ + Endpoint: "quic://node-b.example.test:19443", + Transport: "direct_quic", + PeerCertSHA256: "abcdef", + }) + + report := stats.Report(time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC)) + if report["attempts"] != int64(1) || + report["selected"] != int64(1) || + report["candidate_failures"] != int64(1) || + report["session_open_failures"] != int64(1) || + report["quic_selected"] != int64(1) || + report["pinned_cert_selected"] != int64(1) || + report["last_transport"] != "direct_quic" || + report["last_endpoint"] != "quic://node-b.example.test:19443" || + report["last_failure_reason"] != "session_open_failed" { + t.Fatalf("unexpected dial stats report: %+v", report) + } +} + func TestVPNFabricSessionTargetPrefersRankedQUICCandidate(t *testing.T) { now := time.Now().UTC() target, ok := vpnFabricSessionTarget(&syntheticMeshState{ diff --git a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md index a66a62f..16f5574 100644 --- a/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md +++ b/docs/architecture/DISTRIBUTED_FABRIC_NODE_PROTOCOL_PLAN.md @@ -321,6 +321,9 @@ falling back to the legacy peer endpoint, so a failed QUIC candidate does not block WebSocket/HTTPS compatibility transport. Successful VPN fabric-session dialing logs the selected candidate, transport, certificate pin usage, and remaining fallback count for phone-side diagnostics. +Heartbeat telemetry now includes VPN fabric-session dial counters for attempts, +candidate failures, selected transport family, certificate pin usage, and the +last selected endpoint/failure reason. Deliverables: