package mesh

import (
	"context"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"encoding/hex"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/example/remote-access-platform/agents/rap-node-agent/internal/fabricproto"
	"github.com/quic-go/quic-go"
)

const (
	// fabricQUICNextProto is the ALPN value placed in tls.Config.NextProtos
	// for fabric QUIC connections.
	fabricQUICNextProto = "rap-fabric-data-session-v1"

	// defaultQUICFabricConnIdleTTL is used when QUICFabricTransport.IdleTTL
	// is zero or negative.
	defaultQUICFabricConnIdleTTL = 5 * time.Minute

	// defaultQUICFabricMaxStreamsPerConn is used when
	// QUICFabricTransport.MaxStreamsPerConn is zero or negative.
	defaultQUICFabricMaxStreamsPerConn = 64
)

// ErrQUICFabricStreamLimitReached is returned by Connect when the cached
// connection for the target already carries the maximum number of reserved
// streams.
const ErrQUICFabricStreamLimitReached = quicFabricError("quic fabric stream limit reached")

// quicFabricError is a string-backed error type, which lets sentinel errors be
// declared as constants.
type quicFabricError string

// Error implements the error interface.
func (e quicFabricError) Error() string { return string(e) }

// QUICFabricTransport opens fabric sessions over QUIC. Connections to targets
// that have both a peer ID and an endpoint are cached and shared, with every
// session getting its own stream. Targets without a cache key, and calls made
// on a nil *QUICFabricTransport, get a dedicated connection that is closed
// together with the session.
type QUICFabricTransport struct {
	// Config is passed to quic.DialAddr for every dial made by a non-nil
	// transport.
	Config *quic.Config
	// IdleTTL is how long a cached connection without reserved streams may
	// stay unused before it is closed. Non-positive values select
	// defaultQUICFabricConnIdleTTL.
	IdleTTL time.Duration
	// MaxStreamsPerConn bounds the streams reserved on one cached connection.
	// Non-positive values select defaultQUICFabricMaxStreamsPerConn.
	MaxStreamsPerConn int

	mu    sync.Mutex // guards conns and stats
	conns map[string]*quicFabricConnEntry
	stats QUICFabricTransportStats
}

// QUICFabricTransportStats holds the cumulative counters maintained by a
// QUICFabricTransport.
type QUICFabricTransportStats struct {
	Opens              uint64 `json:"opens"`
	Reuses             uint64 `json:"reuses"`
	OpenFailures       uint64 `json:"open_failures"`
	ClosedEvicted      uint64 `json:"closed_evicted"`
	CloseAllCalls      uint64 `json:"close_all_calls"`
	IdleEvicted        uint64 `json:"idle_evicted"`
	StreamOpens        uint64 `json:"stream_opens"`
	StreamCloses       uint64 `json:"stream_closes"`
	StreamLimitRejects uint64 `json:"stream_limit_rejects"`
}

// QUICFabricTransportSnapshot is the point-in-time view of the connection
// cache returned by Snapshot.
type QUICFabricTransportSnapshot struct {
	SchemaVersion           string                   `json:"schema_version"`
	ActiveCount             int                      `json:"active_count"`
	ActiveStreams           int                      `json:"active_streams"`
	MaxStreamsPerConn       int                      `json:"max_streams_per_conn"`
	SaturatedConnections    int                      `json:"saturated_connections"`
	CapacityPressurePercent int                      `json:"capacity_pressure_percent"`
	Connections             []QUICFabricConnSnapshot `json:"connections,omitempty"`
	Stats                   QUICFabricTransportStats `json:"stats"`
}

// QUICFabricConnSnapshot describes one cached connection inside a
// QUICFabricTransportSnapshot.
type QUICFabricConnSnapshot struct {
	PeerID                  string `json:"peer_id,omitempty"`
	Endpoint                string `json:"endpoint,omitempty"`
	CertSHA256              string `json:"cert_sha256,omitempty"`
	ActiveStreams           int    `json:"active_streams"`
	MaxStreams              int    `json:"max_streams"`
	CapacityPressurePercent int    `json:"capacity_pressure_percent"`
	Saturated               bool   `json:"saturated"`
	LastUsedUnixSec         int64  `json:"last_used_unix_sec,omitempty"`
}

// quicFabricSession is a fabric session bound to a single QUIC stream.
type quicFabricSession struct {
	conn       *quic.Conn
	stream     *quic.Stream
	inbound    chan fabricproto.Frame // frames decoded by readLoop
	errors     chan error             // best-effort error reports from readLoop
	done       chan struct{}          // closed exactly once by Close
	closeOnce  sync.Once
	writeMu    sync.Mutex // serializes writers in Send
	maxPayload int
	timeout    time.Duration
	closeConn  bool                 // true when the session owns conn (not cached)
	transport  *QUICFabricTransport // nil when created through a nil transport
	connKey    string               // cache key; empty for uncached connections
}

// quicFabricConnEntry is the cache record for one shared connection.
type quicFabricConnEntry struct {
	conn          *quic.Conn
	lastUsed      time.Time
	activeStreams int // streams currently reserved through reserveStream
}

// NewQUICFabricTransport returns a transport with the default idle TTL and
// per-connection stream limit. config is handed to quic.DialAddr and may be nil.
func NewQUICFabricTransport(config *quic.Config) *QUICFabricTransport {
	return &QUICFabricTransport{
		Config:            config,
		IdleTTL:           defaultQUICFabricConnIdleTTL,
		MaxStreamsPerConn: defaultQUICFabricMaxStreamsPerConn,
		conns:             map[string]*quicFabricConnEntry{},
	}
}

// quicTLSConfigForTarget builds the client TLS configuration for target.
//
// Without a pinned fingerprint the configuration only sets the fabric ALPN
// value and relies on standard certificate verification. With a fingerprint,
// standard verification is switched off (InsecureSkipVerify) and replaced by a
// callback that compares the SHA-256 of the first presented certificate with
// the normalized pin.
func quicTLSConfigForTarget(target FabricTransportTarget) *tls.Config {
	expectedFingerprint := normalizeCertSHA256(target.PeerCertSHA256)
	config := &tls.Config{NextProtos: []string{fabricQUICNextProto}}
	if expectedFingerprint == "" {
		return config
	}

	// The pin replaces chain and hostname verification, so the callback below
	// is the only check applied to the peer certificate.
	config.InsecureSkipVerify = true
	config.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
		if len(rawCerts) == 0 {
			return fmt.Errorf("quic peer certificate missing")
		}
		sum := sha256.Sum256(rawCerts[0])
		if hex.EncodeToString(sum[:]) != expectedFingerprint {
			return fmt.Errorf("quic peer certificate fingerprint mismatch")
		}
		return nil
	}
	return config
}

// normalizeCertSHA256 canonicalizes a certificate fingerprint: surrounding
// whitespace is trimmed, the text is lower-cased, and every "sha256:" marker
// and ":" separator is removed.
func normalizeCertSHA256(value string) string {
	value = strings.ToLower(strings.TrimSpace(value))
	value = strings.ReplaceAll(value, "sha256:", "")
	value = strings.ReplaceAll(value, ":", "")
	return value
}

// Connect opens a new stream to target and returns a session wrapping it.
//
// The endpoint is trimmed and stripped of a leading "quic://" before it is
// validated, so blank and scheme-only endpoints are rejected instead of being
// dialed. When target.TLSConfig is nil, a configuration is derived from the
// target; otherwise the supplied configuration is cloned and given the fabric
// ALPN value if it has none.
//
// ErrQUICFabricStreamLimitReached is returned when the shared connection is
// already at its stream limit. Dial and stream-open errors are returned as-is.
func (t *QUICFabricTransport) Connect(ctx context.Context, target FabricTransportTarget) (FabricTransportSession, error) {
	// Normalize first: validating the raw value would accept "  " or "quic://".
	target.Endpoint = strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://")
	if target.Endpoint == "" {
		return nil, fmt.Errorf("quic fabric endpoint is required")
	}

	tlsConfig := target.TLSConfig
	if tlsConfig == nil {
		tlsConfig = quicTLSConfigForTarget(target)
	} else {
		// Clone so the caller's configuration is never mutated.
		tlsConfig = tlsConfig.Clone()
		if len(tlsConfig.NextProtos) == 0 {
			tlsConfig.NextProtos = []string{fabricQUICNextProto}
		}
	}

	conn, connKey, closeConn, err := t.connectConn(ctx, target, tlsConfig)
	if err != nil {
		return nil, err
	}
	if err := t.reserveStream(connKey, conn); err != nil {
		return nil, err
	}

	stream, err := conn.OpenStreamSync(ctx)
	if err != nil {
		// Undo the reservation and stop handing this connection out. A
		// dedicated connection is always closed; a cached one is closed only
		// when no other session holds a stream on it, so it is not left
		// orphaned outside the cache.
		orphaned := t.dropFailedConn(connKey, conn)
		if closeConn || orphaned {
			_ = conn.CloseWithError(1, "open stream failed")
		}
		return nil, err
	}

	maxPayload := target.MaxPayload
	if maxPayload <= 0 {
		maxPayload = fabricproto.DefaultMaxPayload
	}
	inboundBuffer := target.InboundBuffer
	if inboundBuffer <= 0 {
		inboundBuffer = 64
	}
	errorBuffer := target.ErrorBuffer
	if errorBuffer <= 0 {
		errorBuffer = 8
	}

	session := &quicFabricSession{
		conn:       conn,
		stream:     stream,
		inbound:    make(chan fabricproto.Frame, inboundBuffer),
		errors:     make(chan error, errorBuffer),
		done:       make(chan struct{}),
		maxPayload: maxPayload,
		timeout:    target.Timeout,
		closeConn:  closeConn,
		transport:  t,
		connKey:    connKey,
	}
	// The reader outlives the Connect call, so it is not tied to ctx; it ends
	// on the first read error or once the session is closed.
	go session.readLoop(context.Background())
	return session, nil
}

// connectConn returns a connection for target together with its cache key and
// a flag telling the caller whether it owns the connection and must close it.
//
// A nil transport, or a target without a cache key, dials a dedicated
// connection (owned by the caller). Otherwise a live cached connection is
// reused, or a new one is dialed and cached. The dial happens without holding
// the lock; if another goroutine cached a live connection in the meantime,
// the freshly dialed one is closed and the cached one is returned.
func (t *QUICFabricTransport) connectConn(ctx context.Context, target FabricTransportTarget, tlsConfig *tls.Config) (*quic.Conn, string, bool, error) {
	if t == nil {
		conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, nil)
		return conn, "", true, err
	}
	key := quicFabricConnKey(target)
	if key == "" {
		conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
		return conn, "", true, err
	}

	t.mu.Lock()
	t.pruneIdleLocked(time.Now())
	cached := t.liveConnLocked(key)
	t.mu.Unlock()
	if cached != nil {
		return cached, key, false, nil
	}

	conn, err := quic.DialAddr(ctx, target.Endpoint, tlsConfig, t.Config)
	if err != nil {
		t.mu.Lock()
		t.stats.OpenFailures++
		t.mu.Unlock()
		return nil, "", false, err
	}

	t.mu.Lock()
	t.pruneIdleLocked(time.Now())
	if existing := t.liveConnLocked(key); existing != nil {
		t.mu.Unlock()
		// Lost the race against a concurrent dial: keep the cached connection.
		_ = conn.CloseWithError(0, "duplicate connection")
		return existing, key, false, nil
	}
	if t.conns == nil {
		// Supports transports built as a zero value instead of through
		// NewQUICFabricTransport.
		t.conns = map[string]*quicFabricConnEntry{}
	}
	t.conns[key] = &quicFabricConnEntry{conn: conn, lastUsed: time.Now()}
	t.stats.Opens++
	t.mu.Unlock()
	return conn, key, false, nil
}

// liveConnLocked returns the cached connection for key if its context is not
// done, refreshing lastUsed and counting a reuse. An entry whose connection
// context is done is evicted and nil is returned. The caller must hold t.mu.
func (t *QUICFabricTransport) liveConnLocked(key string) *quic.Conn {
	entry := t.conns[key]
	if entry == nil || entry.conn == nil {
		return nil
	}
	select {
	case <-entry.conn.Context().Done():
		delete(t.conns, key)
		t.stats.ClosedEvicted++
		return nil
	default:
	}
	entry.lastUsed = time.Now()
	t.stats.Reuses++
	return entry.conn
}

// reserveStream accounts for one more stream on the cached connection stored
// under key. It is a no-op for a nil transport or an empty key. It fails when
// conn is no longer the connection cached under key, or with
// ErrQUICFabricStreamLimitReached when the entry is at its stream limit.
func (t *QUICFabricTransport) reserveStream(key string, conn *quic.Conn) error {
	if t == nil || key == "" {
		return nil
	}
	t.mu.Lock()
	defer t.mu.Unlock()

	entry := t.conns[key]
	if entry == nil || entry.conn != conn {
		return fmt.Errorf("quic fabric connection is not cached")
	}
	limit := t.MaxStreamsPerConn
	if limit <= 0 {
		limit = defaultQUICFabricMaxStreamsPerConn
	}
	if entry.activeStreams >= limit {
		t.stats.StreamLimitRejects++
		return ErrQUICFabricStreamLimitReached
	}
	entry.activeStreams++
	entry.lastUsed = time.Now()
	t.stats.StreamOpens++
	return nil
}

// releaseStream returns one reserved stream to whichever entry is cached
// under key, without checking which connection the entry holds. Prefer
// releaseStreamOn when the connection is known.
func (t *QUICFabricTransport) releaseStream(key string) {
	t.releaseStreamOn(key, nil)
}

// releaseStreamOn returns one reserved stream to the entry cached under key.
// When conn is non-nil the release is applied only if the entry still holds
// that connection: a session that outlived its evicted connection must not
// decrement the counter of the connection that replaced it, otherwise the
// replacement looks idle and may be pruned while streams are open on it.
func (t *QUICFabricTransport) releaseStreamOn(key string, conn *quic.Conn) {
	if t == nil || key == "" {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()

	entry := t.conns[key]
	if entry == nil {
		return
	}
	if conn != nil && entry.conn != conn {
		return
	}
	if entry.activeStreams > 0 {
		entry.activeStreams--
	}
	entry.lastUsed = time.Now()
	t.stats.StreamCloses++
}

// evictConn removes conn from the cache if it is still the connection stored
// for target. The connection itself is not closed.
func (t *QUICFabricTransport) evictConn(target FabricTransportTarget, conn *quic.Conn) {
	if t == nil || conn == nil {
		return
	}
	key := quicFabricConnKey(target)
	if key == "" {
		return
	}
	t.mu.Lock()
	if entry := t.conns[key]; entry != nil && entry.conn == conn {
		delete(t.conns, key)
		t.stats.ClosedEvicted++
	}
	t.mu.Unlock()
}

// dropFailedConn undoes the stream reservation made for a stream that could
// not be opened and evicts conn from the cache, all under one lock. It reports
// whether conn was evicted with no other reserved streams left on it, in which
// case the caller should close the connection because nothing else will.
func (t *QUICFabricTransport) dropFailedConn(key string, conn *quic.Conn) bool {
	if t == nil || key == "" || conn == nil {
		return false
	}
	t.mu.Lock()
	defer t.mu.Unlock()

	entry := t.conns[key]
	if entry == nil || entry.conn != conn {
		return false
	}
	if entry.activeStreams > 0 {
		entry.activeStreams--
	}
	t.stats.StreamCloses++
	delete(t.conns, key)
	t.stats.ClosedEvicted++
	return entry.activeStreams == 0
}

// pruneIdleLocked drops malformed entries and closes and evicts connections
// that have been unused for longer than the idle TTL and carry no reserved
// streams. The caller must hold t.mu.
func (t *QUICFabricTransport) pruneIdleLocked(now time.Time) {
	if t == nil || len(t.conns) == 0 {
		return
	}
	ttl := t.IdleTTL
	if ttl <= 0 {
		ttl = defaultQUICFabricConnIdleTTL
	}
	for key, entry := range t.conns {
		if entry == nil || entry.conn == nil {
			delete(t.conns, key)
			continue
		}
		if entry.lastUsed.IsZero() || now.Sub(entry.lastUsed) <= ttl {
			continue
		}
		if entry.activeStreams > 0 {
			// Still carrying streams: never prune underneath a session.
			continue
		}
		_ = entry.conn.CloseWithError(0, "idle")
		delete(t.conns, key)
		t.stats.IdleEvicted++
	}
}

// quicFabricConnKey builds the cache key for target from its peer ID, its
// endpoint (without a "quic://" prefix) and its normalized certificate
// fingerprint, separated by NUL bytes. It returns "" when the peer ID or the
// endpoint is blank, which marks the target as not cacheable.
func quicFabricConnKey(target FabricTransportTarget) string {
	peerID := strings.TrimSpace(target.PeerID)
	endpoint := strings.TrimPrefix(strings.TrimSpace(target.Endpoint), "quic://")
	if peerID == "" || endpoint == "" {
		return ""
	}
	return peerID + "\x00" + endpoint + "\x00" + normalizeCertSHA256(target.PeerCertSHA256)
}

// parseQUICFabricConnKey splits a key produced by quicFabricConnKey back into
// its components. Missing components are returned as empty strings.
func parseQUICFabricConnKey(key string) (peerID string, endpoint string, certSHA256 string) {
	parts := strings.SplitN(key, "\x00", 3)
	if len(parts) > 0 {
		peerID = parts[0]
	}
	if len(parts) > 1 {
		endpoint = parts[1]
	}
	if len(parts) > 2 {
		certSHA256 = parts[2]
	}
	return peerID, endpoint, certSHA256
}

// Close empties the cache and closes every connection that was in it. The
// transport stays usable afterwards. Close always returns nil.
func (t *QUICFabricTransport) Close() error {
	if t == nil {
		return nil
	}
	t.mu.Lock()
	t.stats.CloseAllCalls++
	conns := t.conns
	t.conns = map[string]*quicFabricConnEntry{}
	t.mu.Unlock()

	// Close outside the lock; the detached map is no longer shared.
	for _, entry := range conns {
		if entry != nil && entry.conn != nil {
			_ = entry.conn.CloseWithError(0, "closed")
		}
	}
	return nil
}

// Snapshot prunes idle and closed connections and returns a summary of the
// remaining cache together with the cumulative counters. Connections are
// listed in map iteration order, which is not stable between calls.
func (t *QUICFabricTransport) Snapshot() QUICFabricTransportSnapshot {
	if t == nil {
		return QUICFabricTransportSnapshot{SchemaVersion: "rap.quic_fabric_transport.v1"}
	}
	t.mu.Lock()
	defer t.mu.Unlock()

	t.pruneIdleLocked(time.Now())
	limit := t.MaxStreamsPerConn
	if limit <= 0 {
		limit = defaultQUICFabricMaxStreamsPerConn
	}
	snapshot := QUICFabricTransportSnapshot{
		SchemaVersion:     "rap.quic_fabric_transport.v1",
		MaxStreamsPerConn: limit,
		Stats:             t.stats,
	}

	for key, entry := range t.conns {
		if entry == nil || entry.conn == nil {
			delete(t.conns, key)
			continue
		}
		select {
		case <-entry.conn.Context().Done():
			delete(t.conns, key)
			// Stats were copied above, so mirror the eviction in the copy.
			t.stats.ClosedEvicted++
			snapshot.Stats.ClosedEvicted++
			continue
		default:
		}

		snapshot.ActiveCount++
		snapshot.ActiveStreams += entry.activeStreams
		peerID, endpoint, certSHA256 := parseQUICFabricConnKey(key)
		saturated := entry.activeStreams >= limit
		connSnapshot := QUICFabricConnSnapshot{
			PeerID:        peerID,
			Endpoint:      endpoint,
			CertSHA256:    certSHA256,
			ActiveStreams: entry.activeStreams,
			MaxStreams:    limit,
			// limit is always positive here, so the division is safe.
			CapacityPressurePercent: (entry.activeStreams * 100) / limit,
			Saturated:               saturated,
		}
		if !entry.lastUsed.IsZero() {
			connSnapshot.LastUsedUnixSec = entry.lastUsed.UTC().Unix()
		}
		snapshot.Connections = append(snapshot.Connections, connSnapshot)
		if saturated {
			snapshot.SaturatedConnections++
		}
	}

	if snapshot.ActiveCount > 0 {
		capacity := snapshot.ActiveCount * limit
		snapshot.CapacityPressurePercent = (snapshot.ActiveStreams * 100) / capacity
	}
	return snapshot
}

// Send writes frame to the stream. Writers are serialized. The write deadline
// comes from ctx when it has one, otherwise from the session timeout, and is
// cleared when neither applies. An error is returned when the session is nil
// or already closed, or when writing the frame fails.
func (s *quicFabricSession) Send(ctx context.Context, frame fabricproto.Frame) error {
	if s == nil || s.stream == nil {
		return fmt.Errorf("quic fabric session is closed")
	}
	select {
	case <-s.done:
		return fmt.Errorf("quic fabric session is closed")
	default:
	}
	s.writeMu.Lock()
	defer s.writeMu.Unlock()
	s.applyWriteDeadline(ctx)
	return fabricproto.WriteFrame(s.stream, frame)
}

// Frames returns the channel on which received frames are delivered, or nil
// for a nil session. The channel is never closed; use Closed to detect the end
// of the session.
func (s *quicFabricSession) Frames() <-chan fabricproto.Frame {
	if s == nil {
		return nil
	}
	return s.inbound
}

// Errors returns the channel on which read errors are reported, or nil for a
// nil session. Reports are dropped when the channel buffer is full.
func (s *quicFabricSession) Errors() <-chan error {
	if s == nil {
		return nil
	}
	return s.errors
}

// Close ends the session: it marks the session closed, closes the stream,
// returns the stream reservation to the transport, and closes the connection
// when the session owns it. Only the first call has any effect; its
// stream-close error is returned, and later calls return nil.
func (s *quicFabricSession) Close() error {
	if s == nil {
		return nil
	}
	var err error
	s.closeOnce.Do(func() {
		close(s.done)
		if s.stream != nil {
			err = s.stream.Close()
			// Per the quic-go documentation, Stream.Close closes only the
			// send direction. Abort the receive direction as well so a
			// readLoop blocked in ReadFrame returns instead of lingering on a
			// shared connection that stays open.
			s.stream.CancelRead(0)
		}
		if s.transport != nil {
			s.transport.releaseStreamOn(s.connKey, s.conn)
		}
		if s.conn != nil && s.closeConn {
			_ = s.conn.CloseWithError(0, "closed")
		}
	})
	return err
}

// Closed reports whether the session has been closed. A nil session counts as
// closed.
func (s *quicFabricSession) Closed() bool {
	if s == nil {
		return true
	}
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// readLoop decodes frames from the stream and forwards them to the inbound
// channel until a read fails, ctx is done, or the session is closed. The
// session is always closed when the loop exits.
func (s *quicFabricSession) readLoop(ctx context.Context) {
	defer s.Close()
	for {
		s.applyReadDeadline(ctx)
		frame, err := fabricproto.ReadFrame(s.stream, s.maxPayload)
		if err != nil {
			s.reportError(err)
			return
		}
		select {
		case <-ctx.Done():
			s.reportError(ctx.Err())
			return
		case <-s.done:
			return
		case s.inbound <- frame:
		}
	}
}

// reportError publishes err on the errors channel without blocking; the
// report is dropped when the buffer is full. A nil err is ignored.
func (s *quicFabricSession) reportError(err error) {
	if err == nil {
		return
	}
	select {
	case s.errors <- err:
	default:
	}
}

// applyReadDeadline sets the stream read deadline from ctx, falling back to
// the session timeout. With neither, the deadline is cleared so one left over
// from an earlier call cannot fail a later read.
func (s *quicFabricSession) applyReadDeadline(ctx context.Context) {
	if deadline, ok := ctx.Deadline(); ok {
		_ = s.stream.SetReadDeadline(deadline)
		return
	}
	if s.timeout > 0 {
		_ = s.stream.SetReadDeadline(time.Now().Add(s.timeout))
		return
	}
	// The zero time means "no deadline".
	_ = s.stream.SetReadDeadline(time.Time{})
}

// applyWriteDeadline sets the stream write deadline from ctx, falling back to
// the session timeout. With neither, the deadline is cleared so that a
// deadline installed by an earlier Send (possibly already expired) does not
// fail this write.
func (s *quicFabricSession) applyWriteDeadline(ctx context.Context) {
	if deadline, ok := ctx.Deadline(); ok {
		_ = s.stream.SetWriteDeadline(deadline)
		return
	}
	if s.timeout > 0 {
		_ = s.stream.SetWriteDeadline(time.Now().Add(s.timeout))
		return
	}
	// The zero time means "no deadline".
	_ = s.stream.SetWriteDeadline(time.Time{})
}