package mesh import ( "context" "sync" ) type ProductionEnvelopeObservationSink struct { mu sync.Mutex capacity int items []ProductionEnvelopeObservation accepted uint64 dropped uint64 } type ProductionEnvelopeObservationSinkMetrics struct { Capacity int `json:"capacity"` CurrentDepth int `json:"current_depth"` AcceptedTotal uint64 `json:"accepted_total"` DroppedOldest uint64 `json:"dropped_oldest"` } func NewProductionEnvelopeObservationSink(capacity int) *ProductionEnvelopeObservationSink { if capacity < 1 { capacity = 1 } return &ProductionEnvelopeObservationSink{ capacity: capacity, items: make([]ProductionEnvelopeObservation, 0, capacity), } } func (s *ProductionEnvelopeObservationSink) Observe(_ context.Context, observation ProductionEnvelopeObservation) error { s.mu.Lock() defer s.mu.Unlock() s.accepted++ if len(s.items) == s.capacity { copy(s.items, s.items[1:]) s.items[len(s.items)-1] = observation s.dropped++ return nil } s.items = append(s.items, observation) return nil } func (s *ProductionEnvelopeObservationSink) Snapshot() []ProductionEnvelopeObservation { s.mu.Lock() defer s.mu.Unlock() out := make([]ProductionEnvelopeObservation, len(s.items)) copy(out, s.items) return out } func (s *ProductionEnvelopeObservationSink) Len() int { s.mu.Lock() defer s.mu.Unlock() return len(s.items) } func (s *ProductionEnvelopeObservationSink) Capacity() int { s.mu.Lock() defer s.mu.Unlock() return s.capacity } func (s *ProductionEnvelopeObservationSink) Metrics() ProductionEnvelopeObservationSinkMetrics { s.mu.Lock() defer s.mu.Unlock() return ProductionEnvelopeObservationSinkMetrics{ Capacity: s.capacity, CurrentDepth: len(s.items), AcceptedTotal: s.accepted, DroppedOldest: s.dropped, } }