2116 lines
97 KiB
Go
2116 lines
97 KiB
Go
package mesh
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Defaults and identifiers used by the remote workspace frame probe sink.
const (
	// DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity is the number of
	// frames per delivery the sink accepts when no positive capacity is set.
	DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity = 8

	// DefaultRemoteWorkspaceFrameProbeSinkSessionTTL is the idle period after
	// which a session is expired when no positive TTL is set.
	DefaultRemoteWorkspaceFrameProbeSinkSessionTTL = 2 * time.Minute

	// DefaultRemoteWorkspaceAdapterMailboxCapacity is the maximum number of
	// events retained per session mailbox; the oldest event is dropped when a
	// new one arrives at capacity.
	DefaultRemoteWorkspaceAdapterMailboxCapacity = 16

	// DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity is presumably the
	// per-session limit on tracked mailbox consumers; it is not referenced in
	// this part of the file (TODO confirm against the consumer code).
	DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity = 32

	// RemoteWorkspaceFrameProbeSinkRuntimeID identifies this sink in receipts,
	// control results, mailbox events, and snapshots.
	RemoteWorkspaceFrameProbeSinkRuntimeID = "node_agent_rdp_worker_contract_probe"
)
|
|
|
|
type RemoteWorkspaceFrameProbeSink struct {
|
|
mu sync.Mutex
|
|
sequence int64
|
|
queueCapacity int
|
|
sessionTTL time.Duration
|
|
sessions map[string]*remoteWorkspaceAdapterProbeSession
|
|
terminalSessions map[string]remoteWorkspaceAdapterProbeTerminalSession
|
|
sessionCreatedTotal int64
|
|
sessionBoundTotal int64
|
|
sessionBackpressureTotal int64
|
|
sessionExpiredTotal int64
|
|
sessionClosedTotal int64
|
|
sessionResetTotal int64
|
|
sessionControlTotal int64
|
|
mailboxEventSequence int64
|
|
mailboxEnqueuedTotal int64
|
|
mailboxDrainedTotal int64
|
|
mailboxDroppedTotal int64
|
|
mailboxReadTotal int64
|
|
mailboxWaitTotal int64
|
|
mailboxWaitTimeoutTotal int64
|
|
mailboxEmptyReadTotal int64
|
|
mailboxResumeReadTotal int64
|
|
mailboxAfterSequenceReadTotal int64
|
|
mailboxReturnedTotal int64
|
|
mailboxSkippedTotal int64
|
|
mailboxPreflightTotal int64
|
|
mailboxPreflightAckTotal int64
|
|
mailboxPreflightCheckpointTotal int64
|
|
mailboxConsumerReadTotal int64
|
|
mailboxConsumerAckTotal int64
|
|
mailboxConsumerResetTotal int64
|
|
mailboxConsumerEvictedTotal int64
|
|
lastMailboxReadAt string
|
|
lastMailboxAdapterSessionID string
|
|
lastMailboxWaitMs int
|
|
lastMailboxWaited bool
|
|
lastMailboxWaitTimeout bool
|
|
lastMailboxEmpty bool
|
|
lastMailboxResumeFrom string
|
|
lastMailboxResumeSequence int64
|
|
lastMailboxResumeConsumerID string
|
|
lastMailboxAfterSequence int64
|
|
lastMailboxSkippedCount int
|
|
lastMailboxReturnedCount int
|
|
lastMailboxPreflightAt string
|
|
lastMailboxPreflightAdapterSessionID string
|
|
lastMailboxPreflightConsumerID string
|
|
lastMailboxPreflightResumeFrom string
|
|
lastMailboxPreflightResumeSequence int64
|
|
lastMailboxPreflightAfterSequence int64
|
|
lastMailboxPreflightAvailableCount int
|
|
lastMailboxPreflightReturnedCount int
|
|
lastMailboxPreflightSkippedCount int
|
|
lastMailboxPreflightFirstSequence int64
|
|
lastMailboxPreflightLastSequence int64
|
|
lastMailboxPreflightFirstRetained int64
|
|
lastMailboxPreflightLastRetained int64
|
|
lastMailboxPreflightMailboxDropped int64
|
|
lastMailboxPreflightDiagnosticState string
|
|
lastMailboxPreflightStaleCursor bool
|
|
lastMailboxPreflightMissingDropped int
|
|
lastMailboxPreflightRecommendedAction string
|
|
lastMailboxPreflightActionHints []string
|
|
lastMailboxPreflightActionReason string
|
|
lastMailboxPreflightActionContext map[string]any
|
|
lastMailboxPreflightOperatorSummary string
|
|
lastMailboxPreflightOperatorStatus string
|
|
lastMailboxPreflightOperatorSeverity string
|
|
lastMailboxPreflightOperatorFields map[string]any
|
|
lastMailboxConsumerID string
|
|
lastMailboxConsumerAdapterSessionID string
|
|
lastMailboxConsumerReadAt string
|
|
lastMailboxConsumerAckAt string
|
|
lastMailboxConsumerCheckpoint int64
|
|
lastMailboxConsumerAck int64
|
|
acceptedFramesTotal int64
|
|
droppedFramesTotal int64
|
|
ackedFramesTotal int64
|
|
backpressureCount int64
|
|
lastBackpressureAt string
|
|
lastBackpressureReason string
|
|
lastRejectedFrameCount int
|
|
lastRejectedAdapterSessionID string
|
|
lastRejectedChannelClass string
|
|
lastRejectedAdapterContractID string
|
|
lastRejectedQueueCapacity int
|
|
lastRejectedQueueDepth int
|
|
lastControl RemoteWorkspaceAdapterSessionControlResult
|
|
last RemoteWorkspaceFrameBatchDeliveryReceipt
|
|
}
|
|
|
|
type remoteWorkspaceAdapterProbeSession struct {
|
|
ID string
|
|
State string
|
|
CreatedAt time.Time
|
|
BoundAt time.Time
|
|
LastActivityAt time.Time
|
|
LastBackpressureAt time.Time
|
|
ClosedAt time.Time
|
|
DeliveryCount int64
|
|
BackpressureCount int64
|
|
AcceptedFrames int64
|
|
DroppedFrames int64
|
|
AckedFrames int64
|
|
Mailbox []RemoteWorkspaceAdapterMailboxEvent
|
|
MailboxEnqueued int64
|
|
MailboxDrained int64
|
|
MailboxDropped int64
|
|
MailboxRead int64
|
|
MailboxWait int64
|
|
MailboxWaitTimeout int64
|
|
MailboxEmptyRead int64
|
|
MailboxResumeRead int64
|
|
MailboxAfterSequenceRead int64
|
|
MailboxReturnedTotal int64
|
|
MailboxSkippedTotal int64
|
|
MailboxPreflightTotal int64
|
|
MailboxPreflightAckTotal int64
|
|
MailboxPreflightCheckpointTotal int64
|
|
MailboxPreflightOperatorStatusCounts map[string]int64
|
|
MailboxPreflightOperatorSeverityCounts map[string]int64
|
|
MailboxConsumers map[string]*remoteWorkspaceAdapterMailboxConsumerState
|
|
MailboxConsumerReadTotal int64
|
|
MailboxConsumerAckTotal int64
|
|
MailboxConsumerResetTotal int64
|
|
MailboxConsumerEvictedTotal int64
|
|
LastMailboxConsumerID string
|
|
LastMailboxConsumerReadAt time.Time
|
|
LastMailboxConsumerAckAt time.Time
|
|
LastMailboxConsumerCheckpoint int64
|
|
LastMailboxConsumerAck int64
|
|
LastMailboxReadAt time.Time
|
|
LastMailboxWaitMs int
|
|
LastMailboxWaited bool
|
|
LastMailboxTimeout bool
|
|
LastMailboxEmpty bool
|
|
LastMailboxResumeFrom string
|
|
LastMailboxResumeSequence int64
|
|
LastMailboxResumeConsumerID string
|
|
LastMailboxAfterSequence int64
|
|
LastMailboxSkippedCount int
|
|
LastMailboxReturnedCount int
|
|
LastMailboxPreflightAt time.Time
|
|
LastMailboxPreflightConsumerID string
|
|
LastMailboxPreflightResumeFrom string
|
|
LastMailboxPreflightResumeSequence int64
|
|
LastMailboxPreflightAfterSequence int64
|
|
LastMailboxPreflightAvailableCount int
|
|
LastMailboxPreflightReturnedCount int
|
|
LastMailboxPreflightSkippedCount int
|
|
LastMailboxPreflightFirstSequence int64
|
|
LastMailboxPreflightLastSequence int64
|
|
LastMailboxPreflightFirstRetained int64
|
|
LastMailboxPreflightLastRetained int64
|
|
LastMailboxPreflightMailboxDropped int64
|
|
LastMailboxPreflightDiagnosticState string
|
|
LastMailboxPreflightStaleCursor bool
|
|
LastMailboxPreflightMissingDropped int
|
|
LastMailboxPreflightRecommendedAction string
|
|
LastMailboxPreflightActionHints []string
|
|
LastMailboxPreflightActionReason string
|
|
LastMailboxPreflightActionContext map[string]any
|
|
LastMailboxPreflightOperatorSummary string
|
|
LastMailboxPreflightOperatorStatus string
|
|
LastMailboxPreflightOperatorSeverity string
|
|
LastMailboxPreflightOperatorFields map[string]any
|
|
LastChannelID string
|
|
LastResourceID string
|
|
LastRouteID string
|
|
LastReason string
|
|
}
|
|
|
|
// remoteWorkspaceAdapterMailboxConsumerState tracks one named mailbox
// consumer within a session: how often it has read and acknowledged, and the
// sequence numbers of its checkpoint and last acknowledgement.
type remoteWorkspaceAdapterMailboxConsumerState struct {
	ID                 string
	CreatedAt          time.Time
	ReadTotal          int64
	AckTotal           int64
	CheckpointSequence int64
	AckSequence        int64
	LastReadAt         time.Time
	LastAckAt          time.Time
}
|
|
|
|
// remoteWorkspaceAdapterProbeTerminalSession is the record kept for a
// session after a control action removed it from the live session table.
type remoteWorkspaceAdapterProbeTerminalSession struct {
	State        string    // terminal state: "closed", "expired", or "reset"
	ControlledAt time.Time // when the control action was applied
	Reason       string    // caller-supplied reason, possibly empty
}
|
|
|
|
// RemoteWorkspaceAdapterSessionControlResult is the JSON-serializable outcome
// of a session control action (close, expire, or reset).
type RemoteWorkspaceAdapterSessionControlResult struct {
	SchemaVersion    string `json:"schema_version"`
	AdapterRuntimeID string `json:"adapter_runtime_id"`
	AdapterSessionID string `json:"adapter_session_id"`
	Action           string `json:"action"`
	Accepted         bool   `json:"accepted"`
	PreviousState    string `json:"previous_state,omitempty"`
	SessionState     string `json:"session_state"`
	Reason           string `json:"reason,omitempty"`
	ControlledAt     string `json:"controlled_at"`
	ActiveSessions   int    `json:"active_session_count"`
}
|
|
|
|
// RemoteWorkspaceAdapterMailboxEvent is one entry in a session mailbox. The
// sink stamps SchemaVersion, Sequence, AdapterRuntimeID, AdapterSessionID,
// and CreatedAt when the event is enqueued; the remaining fields describe
// the delivery or backpressure occurrence that produced it.
type RemoteWorkspaceAdapterMailboxEvent struct {
	SchemaVersion    string `json:"schema_version"`
	Sequence         int64  `json:"sequence"`
	AdapterRuntimeID string `json:"adapter_runtime_id"`
	AdapterSessionID string `json:"adapter_session_id"`
	Event            string `json:"event"`
	ChannelID        string `json:"channel_id,omitempty"`
	ResourceID       string `json:"resource_id,omitempty"`
	RouteID          string `json:"route_id,omitempty"`
	ChannelClass     string `json:"channel_class,omitempty"`
	FrameCount       int    `json:"frame_count,omitempty"`
	AcceptedFrames   int    `json:"accepted_frames,omitempty"`
	DroppedFrames    int    `json:"dropped_frames,omitempty"`
	AckedFrames      int    `json:"acked_frames,omitempty"`
	Backpressure     bool   `json:"backpressure,omitempty"`
	Reason           string `json:"reason,omitempty"`
	CreatedAt        string `json:"created_at"`
}
|
|
|
|
type RemoteWorkspaceAdapterMailboxSnapshot struct {
|
|
SchemaVersion string `json:"schema_version"`
|
|
AdapterRuntimeID string `json:"adapter_runtime_id"`
|
|
AdapterSessionID string `json:"adapter_session_id"`
|
|
ObservedAt string `json:"observed_at"`
|
|
Drained bool `json:"drained"`
|
|
Empty bool `json:"empty"`
|
|
Waited bool `json:"waited,omitempty"`
|
|
WaitTimeout bool `json:"wait_timeout,omitempty"`
|
|
WaitMs int `json:"wait_ms,omitempty"`
|
|
MailboxCapacity int `json:"mailbox_capacity"`
|
|
MailboxDepth int `json:"mailbox_depth"`
|
|
DepthAfter int `json:"depth_after"`
|
|
EnqueuedTotal int64 `json:"enqueued_total"`
|
|
DrainedTotal int64 `json:"drained_total"`
|
|
DroppedTotal int64 `json:"dropped_total"`
|
|
AfterSequence int64 `json:"after_sequence,omitempty"`
|
|
ResumeFrom string `json:"resume_from,omitempty"`
|
|
ResumeSequence int64 `json:"resume_sequence"`
|
|
SkippedCount int `json:"skipped_count"`
|
|
ReturnedCount int `json:"returned_count"`
|
|
ConsumerID string `json:"consumer_id,omitempty"`
|
|
ConsumerReadTotal int64 `json:"consumer_read_total"`
|
|
ConsumerAckTotal int64 `json:"consumer_ack_total"`
|
|
ConsumerResetTotal int64 `json:"consumer_reset_total"`
|
|
ConsumerEvictedTotal int64 `json:"consumer_evicted_total"`
|
|
ConsumerCheckpointSequence int64 `json:"consumer_checkpoint_sequence"`
|
|
ConsumerAckSequence int64 `json:"consumer_ack_sequence"`
|
|
ConsumerLagCount int `json:"consumer_lag_count"`
|
|
ConsumerCount int `json:"consumer_count"`
|
|
ConsumerCapacity int `json:"consumer_capacity"`
|
|
ConsumerCreated bool `json:"consumer_created,omitempty"`
|
|
ConsumerReset bool `json:"consumer_reset,omitempty"`
|
|
ConsumerEvicted bool `json:"consumer_evicted,omitempty"`
|
|
ConsumerCreatedAt string `json:"consumer_created_at,omitempty"`
|
|
ConsumerLastReadAt string `json:"consumer_last_read_at,omitempty"`
|
|
ConsumerLastAckAt string `json:"consumer_last_ack_at,omitempty"`
|
|
Events []RemoteWorkspaceAdapterMailboxEvent `json:"events"`
|
|
}
|
|
|
|
type RemoteWorkspaceAdapterMailboxConsumerSnapshot struct {
|
|
SchemaVersion string `json:"schema_version"`
|
|
AdapterRuntimeID string `json:"adapter_runtime_id"`
|
|
AdapterSessionID string `json:"adapter_session_id"`
|
|
ObservedAt string `json:"observed_at"`
|
|
ConsumerCapacity int `json:"consumer_capacity"`
|
|
ConsumerCount int `json:"consumer_count"`
|
|
ConsumerReadTotal int64 `json:"consumer_read_total"`
|
|
ConsumerAckTotal int64 `json:"consumer_ack_total"`
|
|
ConsumerResetTotal int64 `json:"consumer_reset_total"`
|
|
ConsumerEvictedTotal int64 `json:"consumer_evicted_total"`
|
|
MailboxDepth int `json:"mailbox_depth"`
|
|
MailboxEnqueued int64 `json:"mailbox_enqueued_total"`
|
|
MailboxDrained int64 `json:"mailbox_drained_total"`
|
|
MailboxDropped int64 `json:"mailbox_dropped_total"`
|
|
Consumers []RemoteWorkspaceAdapterMailboxConsumer `json:"consumers"`
|
|
}
|
|
|
|
// RemoteWorkspaceAdapterMailboxConsumer is the JSON-serializable view of a
// single mailbox consumer: its read/ack totals, checkpoint and ack sequence
// numbers, lag, and activity timestamps (as preformatted strings).
type RemoteWorkspaceAdapterMailboxConsumer struct {
	ConsumerID         string `json:"consumer_id"`
	CreatedAt          string `json:"created_at,omitempty"`
	ReadTotal          int64  `json:"consumer_read_total"`
	AckTotal           int64  `json:"consumer_ack_total"`
	CheckpointSequence int64  `json:"consumer_checkpoint_sequence"`
	AckSequence        int64  `json:"consumer_ack_sequence"`
	LagCount           int    `json:"consumer_lag_count"`
	LastReadAt         string `json:"last_read_at,omitempty"`
	LastAckAt          string `json:"last_ack_at,omitempty"`
}
|
|
|
|
// RemoteWorkspaceAdapterMailboxPreflightSnapshot is the JSON-serializable
// result of a mailbox preflight: what a read with the given cursor would be
// expected to return, plus diagnostics and operator-facing guidance.
type RemoteWorkspaceAdapterMailboxPreflightSnapshot struct {
	SchemaVersion    string `json:"schema_version"`
	AdapterRuntimeID string `json:"adapter_runtime_id"`
	AdapterSessionID string `json:"adapter_session_id"`
	ObservedAt       string `json:"observed_at"`
	ReadOnly         bool   `json:"read_only"`

	// Cursor parameters the preflight was evaluated with.
	ConsumerID     string `json:"consumer_id"`
	ResumeFrom     string `json:"resume_from"`
	ResumeSequence int64  `json:"resume_sequence"`
	AfterSequence  int64  `json:"after_sequence"`
	Limit          int    `json:"limit"`

	// Mailbox depth and running totals.
	MailboxDepth     int   `json:"mailbox_depth"`
	MailboxEnqueued  int64 `json:"mailbox_enqueued_total"`
	MailboxDropped   int64 `json:"mailbox_dropped_total"`
	MailboxReadTotal int64 `json:"mailbox_read_total"`

	// Consumer bookkeeping.
	ConsumerReadTotal          int64 `json:"consumer_read_total"`
	ConsumerAckTotal           int64 `json:"consumer_ack_total"`
	ConsumerCheckpointSequence int64 `json:"consumer_checkpoint_sequence"`
	ConsumerAckSequence        int64 `json:"consumer_ack_sequence"`
	ConsumerLagCount           int   `json:"consumer_lag_count"`

	// Expected read outcome and the retained sequence window.
	ExpectedAvailableCount int   `json:"expected_available_count"`
	ExpectedReturnedCount  int   `json:"expected_returned_count"`
	ExpectedSkippedCount   int   `json:"expected_skipped_count"`
	FirstExpectedSequence  int64 `json:"first_expected_sequence,omitempty"`
	LastExpectedSequence   int64 `json:"last_expected_sequence,omitempty"`
	FirstRetainedSequence  int64 `json:"first_retained_sequence,omitempty"`
	LastRetainedSequence   int64 `json:"last_retained_sequence,omitempty"`

	// Diagnostics and operator guidance.
	DiagnosticState       string         `json:"diagnostic_state"`
	StaleCursor           bool           `json:"stale_cursor"`
	MissingDroppedCount   int            `json:"missing_dropped_count"`
	RecommendedAction     string         `json:"recommended_action"`
	ActionHints           []string       `json:"action_hints"`
	ActionReason          string         `json:"action_reason"`
	ActionContext         map[string]any `json:"action_context"`
	OperatorSummary       string         `json:"operator_summary"`
	OperatorStatus        string         `json:"operator_status"`
	OperatorSeverity      string         `json:"operator_severity"`
	OperatorSummaryFields map[string]any `json:"operator_summary_fields"`
}
|
|
|
|
type RemoteWorkspaceAdapterSessionSnapshot struct {
|
|
SchemaVersion string `json:"schema_version"`
|
|
AdapterRuntimeID string `json:"adapter_runtime_id"`
|
|
ObservedAt string `json:"observed_at"`
|
|
ActiveSessionCount int `json:"active_session_count"`
|
|
TerminalSessionCount int `json:"terminal_session_count"`
|
|
Sessions []RemoteWorkspaceAdapterSessionView `json:"sessions"`
|
|
TerminalSessions []RemoteWorkspaceAdapterTerminalSession `json:"terminal_sessions,omitempty"`
|
|
}
|
|
|
|
// RemoteWorkspaceAdapterSessionView is the JSON-serializable view of one live
// adapter session, with timestamps rendered as strings.
type RemoteWorkspaceAdapterSessionView struct {
	// Identity and lifecycle.
	AdapterSessionID       string `json:"adapter_session_id"`
	SessionState           string `json:"session_state"`
	CreatedAt              string `json:"created_at"`
	BoundAt                string `json:"bound_at,omitempty"`
	LastActivityAt         string `json:"last_activity_at"`
	LastBackpressureAt     string `json:"last_backpressure_at,omitempty"`
	LastBackpressureReason string `json:"last_backpressure_reason,omitempty"`

	// Delivery and frame counters.
	DeliveryCount     int64 `json:"delivery_count"`
	BackpressureCount int64 `json:"backpressure_count"`
	AcceptedFrames    int64 `json:"accepted_frames"`
	DroppedFrames     int64 `json:"dropped_frames"`
	AckedFrames       int64 `json:"acked_frames"`

	// Mailbox size and counters.
	MailboxCapacity          int   `json:"mailbox_capacity"`
	MailboxDepth             int   `json:"mailbox_depth"`
	MailboxEnqueued          int64 `json:"mailbox_enqueued_total"`
	MailboxDrained           int64 `json:"mailbox_drained_total"`
	MailboxDropped           int64 `json:"mailbox_dropped_total"`
	MailboxRead              int64 `json:"mailbox_read_total"`
	MailboxWait              int64 `json:"mailbox_wait_total"`
	MailboxWaitTimeout       int64 `json:"mailbox_wait_timeout_total"`
	MailboxEmptyRead         int64 `json:"mailbox_empty_read_total"`
	MailboxResumeRead        int64 `json:"mailbox_resume_read_total"`
	MailboxAfterSequenceRead int64 `json:"mailbox_after_sequence_read_total"`
	MailboxReturnedTotal     int64 `json:"mailbox_returned_total"`
	MailboxSkippedTotal      int64 `json:"mailbox_skipped_total"`

	// Mailbox consumer counters and most recent consumer activity.
	MailboxConsumerCount          int    `json:"mailbox_consumer_count"`
	MailboxConsumerRead           int64  `json:"mailbox_consumer_read_total"`
	MailboxConsumerAck            int64  `json:"mailbox_consumer_ack_total"`
	MailboxConsumerReset          int64  `json:"mailbox_consumer_reset_total"`
	MailboxConsumerEvicted        int64  `json:"mailbox_consumer_evicted_total"`
	LastMailboxConsumerID         string `json:"last_mailbox_consumer_id,omitempty"`
	LastMailboxConsumerReadAt     string `json:"last_mailbox_consumer_read_at,omitempty"`
	LastMailboxConsumerAckAt      string `json:"last_mailbox_consumer_ack_at,omitempty"`
	LastMailboxConsumerCheckpoint int64  `json:"last_mailbox_consumer_checkpoint_sequence,omitempty"`
	LastMailboxConsumerAck        int64  `json:"last_mailbox_consumer_ack_sequence,omitempty"`

	// Most recent mailbox read.
	LastMailboxReadAt           string `json:"last_mailbox_read_at,omitempty"`
	LastMailboxWaitMs           int    `json:"last_mailbox_wait_ms,omitempty"`
	LastMailboxWaited           bool   `json:"last_mailbox_waited,omitempty"`
	LastMailboxWaitTimeout      bool   `json:"last_mailbox_wait_timeout,omitempty"`
	LastMailboxEmpty            bool   `json:"last_mailbox_empty,omitempty"`
	LastMailboxResumeFrom       string `json:"last_mailbox_resume_from,omitempty"`
	LastMailboxResumeSequence   int64  `json:"last_mailbox_resume_sequence,omitempty"`
	LastMailboxResumeConsumerID string `json:"last_mailbox_resume_consumer_id,omitempty"`
	LastMailboxAfterSequence    int64  `json:"last_mailbox_after_sequence,omitempty"`
	LastMailboxSkippedCount     int    `json:"last_mailbox_skipped_count"`
	LastMailboxReturnedCount    int    `json:"last_mailbox_returned_count"`

	// Routing identifiers from the most recent delivery.
	ChannelID  string `json:"channel_id,omitempty"`
	ResourceID string `json:"resource_id,omitempty"`
	RouteID    string `json:"route_id,omitempty"`
}
|
|
|
|
// RemoteWorkspaceAdapterTerminalSession is the JSON-serializable view of a
// session that was ended by a control action.
type RemoteWorkspaceAdapterTerminalSession struct {
	AdapterSessionID string `json:"adapter_session_id"`
	SessionState     string `json:"session_state"`
	ControlledAt     string `json:"controlled_at"`
	Reason           string `json:"reason,omitempty"`
}
|
|
|
|
func NewRemoteWorkspaceFrameProbeSink() *RemoteWorkspaceFrameProbeSink {
|
|
return &RemoteWorkspaceFrameProbeSink{
|
|
queueCapacity: DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity,
|
|
sessionTTL: DefaultRemoteWorkspaceFrameProbeSinkSessionTTL,
|
|
sessions: map[string]*remoteWorkspaceAdapterProbeSession{},
|
|
terminalSessions: map[string]remoteWorkspaceAdapterProbeTerminalSession{},
|
|
}
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) AcceptRemoteWorkspaceFrameBatchProbe(_ context.Context, delivery RemoteWorkspaceFrameBatchDelivery) (RemoteWorkspaceFrameBatchDeliveryReceipt, error) {
|
|
if s == nil {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
if strings.TrimSpace(delivery.ServiceClass) != FabricServiceClassRemoteWorkspace {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, fmt.Errorf("remote workspace adapter sink service class mismatch")
|
|
}
|
|
if strings.TrimSpace(delivery.ChannelClass) == "" {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, fmt.Errorf("remote workspace adapter sink channel class required")
|
|
}
|
|
if strings.TrimSpace(delivery.AdapterSessionID) == "" {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, fmt.Errorf("remote workspace adapter sink session id required")
|
|
}
|
|
if len(delivery.Frames) == 0 {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, fmt.Errorf("remote workspace adapter sink requires frames")
|
|
}
|
|
queueCapacity := s.queueCapacity
|
|
if queueCapacity <= 0 {
|
|
queueCapacity = DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity
|
|
}
|
|
acceptedFrames := 0
|
|
droppedFrames := 0
|
|
for _, frame := range delivery.Frames {
|
|
if acceptedFrames < queueCapacity {
|
|
acceptedFrames++
|
|
continue
|
|
}
|
|
if frame.Droppable {
|
|
droppedFrames++
|
|
continue
|
|
}
|
|
err := fmt.Errorf("remote workspace adapter sink backpressure")
|
|
s.recordBackpressure(delivery, queueCapacity, err.Error())
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, err
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
now := time.Now().UTC()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.ensureSessionLocked(delivery, now)
|
|
session.State = "probe_bound"
|
|
if session.BoundAt.IsZero() {
|
|
session.BoundAt = now
|
|
s.sessionBoundTotal++
|
|
}
|
|
session.LastActivityAt = now
|
|
session.DeliveryCount++
|
|
session.AcceptedFrames += int64(acceptedFrames)
|
|
session.DroppedFrames += int64(droppedFrames)
|
|
session.AckedFrames += int64(acceptedFrames)
|
|
session.LastChannelID = strings.TrimSpace(delivery.ChannelID)
|
|
session.LastResourceID = strings.TrimSpace(delivery.ResourceID)
|
|
session.LastRouteID = strings.TrimSpace(delivery.PreferredRouteID)
|
|
s.enqueueAdapterMailboxEventLocked(session, RemoteWorkspaceAdapterMailboxEvent{
|
|
Event: "frame_batch_probe_delivered",
|
|
ChannelID: strings.TrimSpace(delivery.ChannelID),
|
|
ResourceID: strings.TrimSpace(delivery.ResourceID),
|
|
RouteID: strings.TrimSpace(delivery.PreferredRouteID),
|
|
ChannelClass: strings.TrimSpace(delivery.ChannelClass),
|
|
FrameCount: len(delivery.Frames),
|
|
AcceptedFrames: acceptedFrames,
|
|
DroppedFrames: droppedFrames,
|
|
AckedFrames: acceptedFrames,
|
|
}, now)
|
|
s.sequence++
|
|
s.acceptedFramesTotal += int64(acceptedFrames)
|
|
s.droppedFramesTotal += int64(droppedFrames)
|
|
s.ackedFramesTotal += int64(acceptedFrames)
|
|
receipt := RemoteWorkspaceFrameBatchDeliveryReceipt{
|
|
SchemaVersion: "rap.remote_workspace_frame_batch_delivery.v1",
|
|
Sink: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
Accepted: true,
|
|
ProbeOnly: true,
|
|
ClusterID: strings.TrimSpace(delivery.ClusterID),
|
|
ChannelID: strings.TrimSpace(delivery.ChannelID),
|
|
ResourceID: strings.TrimSpace(delivery.ResourceID),
|
|
ServiceClass: FabricServiceClassRemoteWorkspace,
|
|
ChannelClass: strings.TrimSpace(delivery.ChannelClass),
|
|
AdapterContractID: strings.TrimSpace(delivery.AdapterContractID),
|
|
AdapterSessionID: strings.TrimSpace(delivery.AdapterSessionID),
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
SessionState: "probe_bound",
|
|
SessionCreatedAt: session.CreatedAt.Format(time.RFC3339Nano),
|
|
SessionBoundAt: session.BoundAt.Format(time.RFC3339Nano),
|
|
SessionLastActive: session.LastActivityAt.Format(time.RFC3339Nano),
|
|
SessionLifecycle: session.State,
|
|
SessionDeliveries: session.DeliveryCount,
|
|
SessionPressure: session.BackpressureCount,
|
|
MailboxDepth: len(session.Mailbox),
|
|
MailboxEnqueued: session.MailboxEnqueued,
|
|
FrameCount: len(delivery.Frames),
|
|
QueueCapacity: queueCapacity,
|
|
QueueDepth: 0,
|
|
AcceptedFrames: acceptedFrames,
|
|
DroppedFrames: droppedFrames,
|
|
AckedFrames: acceptedFrames,
|
|
Backpressure: false,
|
|
DropPolicy: "drop_droppable_overflow_ack_accepted",
|
|
DeliverySequence: uint64(s.sequence),
|
|
DeliveredAt: now.Format(time.RFC3339Nano),
|
|
}
|
|
s.last = receipt
|
|
return receipt, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) recordBackpressure(delivery RemoteWorkspaceFrameBatchDelivery, queueCapacity int, reason string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
now := time.Now().UTC()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.ensureSessionLocked(delivery, now)
|
|
session.State = "backpressure"
|
|
session.LastActivityAt = now
|
|
session.LastBackpressureAt = now
|
|
session.BackpressureCount++
|
|
session.LastChannelID = strings.TrimSpace(delivery.ChannelID)
|
|
session.LastResourceID = strings.TrimSpace(delivery.ResourceID)
|
|
session.LastRouteID = strings.TrimSpace(delivery.PreferredRouteID)
|
|
session.LastReason = strings.TrimSpace(reason)
|
|
s.enqueueAdapterMailboxEventLocked(session, RemoteWorkspaceAdapterMailboxEvent{
|
|
Event: "backpressure",
|
|
ChannelID: strings.TrimSpace(delivery.ChannelID),
|
|
ResourceID: strings.TrimSpace(delivery.ResourceID),
|
|
RouteID: strings.TrimSpace(delivery.PreferredRouteID),
|
|
ChannelClass: strings.TrimSpace(delivery.ChannelClass),
|
|
FrameCount: len(delivery.Frames),
|
|
Backpressure: true,
|
|
Reason: strings.TrimSpace(reason),
|
|
}, now)
|
|
s.backpressureCount++
|
|
s.sessionBackpressureTotal++
|
|
s.lastBackpressureAt = now.Format(time.RFC3339Nano)
|
|
s.lastBackpressureReason = strings.TrimSpace(reason)
|
|
s.lastRejectedFrameCount = len(delivery.Frames)
|
|
s.lastRejectedAdapterSessionID = strings.TrimSpace(delivery.AdapterSessionID)
|
|
s.lastRejectedChannelClass = strings.TrimSpace(delivery.ChannelClass)
|
|
s.lastRejectedAdapterContractID = strings.TrimSpace(delivery.AdapterContractID)
|
|
s.lastRejectedQueueCapacity = queueCapacity
|
|
s.lastRejectedQueueDepth = queueCapacity
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) ControlAdapterSession(action string, adapterSessionID string, reason string, now time.Time) (RemoteWorkspaceAdapterSessionControlResult, error) {
|
|
if s == nil {
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
action = strings.TrimSpace(strings.ToLower(action))
|
|
adapterSessionID = strings.TrimSpace(adapterSessionID)
|
|
reason = strings.TrimSpace(reason)
|
|
if adapterSessionID == "" {
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("remote workspace adapter session id required")
|
|
}
|
|
if !isValidRemoteWorkspaceAdapterSessionID(adapterSessionID) {
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("invalid remote workspace adapter session id")
|
|
}
|
|
if len(reason) > 512 {
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("remote workspace adapter session control reason too long")
|
|
}
|
|
switch action {
|
|
case "close", "expire", "reset":
|
|
default:
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("unsupported remote workspace adapter session control action")
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
if s.terminalSessions == nil {
|
|
s.terminalSessions = map[string]remoteWorkspaceAdapterProbeTerminalSession{}
|
|
}
|
|
session := s.sessions[adapterSessionID]
|
|
previousState := ""
|
|
if session != nil {
|
|
previousState = session.State
|
|
}
|
|
nextState := actionToAdapterSessionState(action)
|
|
if session == nil {
|
|
if terminal, ok := s.terminalSessions[adapterSessionID]; ok {
|
|
result := RemoteWorkspaceAdapterSessionControlResult{
|
|
SchemaVersion: "rap.remote_workspace_adapter_session_control.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
AdapterSessionID: adapterSessionID,
|
|
Action: action,
|
|
Accepted: true,
|
|
PreviousState: terminal.State,
|
|
SessionState: nextState,
|
|
Reason: reason,
|
|
ControlledAt: now.Format(time.RFC3339Nano),
|
|
ActiveSessions: len(s.sessions),
|
|
}
|
|
s.sessionControlTotal++
|
|
s.lastControl = result
|
|
return result, nil
|
|
}
|
|
return RemoteWorkspaceAdapterSessionControlResult{}, fmt.Errorf("remote workspace adapter session not found")
|
|
} else {
|
|
session.State = nextState
|
|
session.LastActivityAt = now
|
|
session.ClosedAt = now
|
|
session.LastReason = reason
|
|
}
|
|
s.sessionControlTotal++
|
|
switch action {
|
|
case "expire":
|
|
s.sessionExpiredTotal++
|
|
s.sessionClosedTotal++
|
|
case "close":
|
|
s.sessionClosedTotal++
|
|
case "reset":
|
|
s.sessionResetTotal++
|
|
s.sessionClosedTotal++
|
|
}
|
|
delete(s.sessions, adapterSessionID)
|
|
s.terminalSessions[adapterSessionID] = remoteWorkspaceAdapterProbeTerminalSession{
|
|
State: nextState,
|
|
ControlledAt: now,
|
|
Reason: reason,
|
|
}
|
|
result := RemoteWorkspaceAdapterSessionControlResult{
|
|
SchemaVersion: "rap.remote_workspace_adapter_session_control.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
AdapterSessionID: adapterSessionID,
|
|
Action: action,
|
|
Accepted: true,
|
|
PreviousState: previousState,
|
|
SessionState: nextState,
|
|
Reason: reason,
|
|
ControlledAt: now.Format(time.RFC3339Nano),
|
|
ActiveSessions: len(s.sessions),
|
|
}
|
|
s.lastControl = result
|
|
return result, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) enqueueAdapterMailboxEventLocked(session *remoteWorkspaceAdapterProbeSession, event RemoteWorkspaceAdapterMailboxEvent, now time.Time) {
|
|
if session == nil {
|
|
return
|
|
}
|
|
s.mailboxEventSequence++
|
|
event.SchemaVersion = "rap.remote_workspace_adapter_mailbox_event.v1"
|
|
event.Sequence = s.mailboxEventSequence
|
|
event.AdapterRuntimeID = RemoteWorkspaceFrameProbeSinkRuntimeID
|
|
event.AdapterSessionID = session.ID
|
|
event.CreatedAt = now.UTC().Format(time.RFC3339Nano)
|
|
if len(session.Mailbox) >= DefaultRemoteWorkspaceAdapterMailboxCapacity {
|
|
session.Mailbox = append([]RemoteWorkspaceAdapterMailboxEvent(nil), session.Mailbox[1:]...)
|
|
session.MailboxDropped++
|
|
s.mailboxDroppedTotal++
|
|
}
|
|
session.Mailbox = append(session.Mailbox, event)
|
|
session.MailboxEnqueued++
|
|
s.mailboxEnqueuedTotal++
|
|
}
|
|
|
|
// isValidRemoteWorkspaceAdapterSessionID reports whether adapterSessionID has
// the exact form "rap-rw-adapter-session-" followed by 24 lowercase
// hexadecimal characters. The input is not trimmed; surrounding whitespace or
// uppercase hex digits make it invalid.
func isValidRemoteWorkspaceAdapterSessionID(adapterSessionID string) bool {
	const (
		prefix    = "rap-rw-adapter-session-"
		suffixLen = 24
	)
	if len(adapterSessionID) != len(prefix)+suffixLen || !strings.HasPrefix(adapterSessionID, prefix) {
		return false
	}
	for _, ch := range adapterSessionID[len(prefix):] {
		isDigit := ch >= '0' && ch <= '9'
		isLowerHex := ch >= 'a' && ch <= 'f'
		if !isDigit && !isLowerHex {
			return false
		}
	}
	return true
}
|
|
|
|
// isValidRemoteWorkspaceAdapterMailboxConsumerID reports whether consumerID,
// after trimming surrounding whitespace, is 1 to 128 bytes long and consists
// only of ASCII letters, digits, and the characters '-', '_', '.', ':'.
func isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID string) bool {
	consumerID = strings.TrimSpace(consumerID)
	if consumerID == "" || len(consumerID) > 128 {
		return false
	}
	for _, ch := range consumerID {
		switch {
		case ch >= 'a' && ch <= 'z',
			ch >= 'A' && ch <= 'Z',
			ch >= '0' && ch <= '9',
			ch == '-', ch == '_', ch == '.', ch == ':':
			// Allowed character; keep scanning.
		default:
			return false
		}
	}
	return true
}
|
|
|
|
// actionToAdapterSessionState maps a control action to the terminal session
// state it produces: "expire" yields "expired", "reset" yields "reset", and
// anything else (including "close") yields "closed". The match is
// case-sensitive; callers are expected to pass a lowercased action.
func actionToAdapterSessionState(action string) string {
	switch action {
	case "expire":
		return "expired"
	case "reset":
		return "reset"
	default:
		return "closed"
	}
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) ensureSessionLocked(delivery RemoteWorkspaceFrameBatchDelivery, now time.Time) *remoteWorkspaceAdapterProbeSession {
|
|
if s.sessions == nil {
|
|
s.sessions = map[string]*remoteWorkspaceAdapterProbeSession{}
|
|
}
|
|
if s.terminalSessions == nil {
|
|
s.terminalSessions = map[string]remoteWorkspaceAdapterProbeTerminalSession{}
|
|
}
|
|
sessionID := strings.TrimSpace(delivery.AdapterSessionID)
|
|
session := s.sessions[sessionID]
|
|
if session == nil {
|
|
session = &remoteWorkspaceAdapterProbeSession{
|
|
ID: sessionID,
|
|
State: "created",
|
|
CreatedAt: now,
|
|
LastActivityAt: now,
|
|
MailboxConsumers: map[string]*remoteWorkspaceAdapterMailboxConsumerState{},
|
|
MailboxPreflightOperatorStatusCounts: map[string]int64{},
|
|
MailboxPreflightOperatorSeverityCounts: map[string]int64{},
|
|
}
|
|
s.sessions[sessionID] = session
|
|
s.sessionCreatedTotal++
|
|
}
|
|
return session
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) expireIdleSessionsLocked(now time.Time) {
|
|
ttl := s.sessionTTL
|
|
if ttl <= 0 {
|
|
ttl = DefaultRemoteWorkspaceFrameProbeSinkSessionTTL
|
|
}
|
|
for id, session := range s.sessions {
|
|
if session == nil || session.State == "closed" || session.State == "expired" {
|
|
continue
|
|
}
|
|
if !session.LastActivityAt.IsZero() && now.Sub(session.LastActivityAt) > ttl {
|
|
session.State = "expired"
|
|
session.ClosedAt = now
|
|
s.sessionExpiredTotal++
|
|
s.sessionClosedTotal++
|
|
delete(s.sessions, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) LastReceipt() (RemoteWorkspaceFrameBatchDeliveryReceipt, bool) {
|
|
if s == nil {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, false
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.sequence == 0 {
|
|
return RemoteWorkspaceFrameBatchDeliveryReceipt{}, false
|
|
}
|
|
return s.last, true
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) SnapshotAdapterSessions(includeTerminal bool, limit int, now time.Time) RemoteWorkspaceAdapterSessionSnapshot {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if limit > 200 {
|
|
limit = 200
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
snapshot := RemoteWorkspaceAdapterSessionSnapshot{
|
|
SchemaVersion: "rap.remote_workspace_adapter_session_snapshot.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
ObservedAt: now.Format(time.RFC3339Nano),
|
|
}
|
|
if s == nil {
|
|
return snapshot
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
activeIDs := make([]string, 0, len(s.sessions))
|
|
for id := range s.sessions {
|
|
activeIDs = append(activeIDs, id)
|
|
}
|
|
sort.Strings(activeIDs)
|
|
for _, id := range activeIDs {
|
|
if len(snapshot.Sessions) >= limit {
|
|
break
|
|
}
|
|
session := s.sessions[id]
|
|
if session == nil {
|
|
continue
|
|
}
|
|
snapshot.Sessions = append(snapshot.Sessions, remoteWorkspaceAdapterSessionView(*session))
|
|
}
|
|
if includeTerminal {
|
|
terminalIDs := make([]string, 0, len(s.terminalSessions))
|
|
for id := range s.terminalSessions {
|
|
terminalIDs = append(terminalIDs, id)
|
|
}
|
|
sort.Strings(terminalIDs)
|
|
for _, id := range terminalIDs {
|
|
if len(snapshot.TerminalSessions) >= limit {
|
|
break
|
|
}
|
|
terminal := s.terminalSessions[id]
|
|
snapshot.TerminalSessions = append(snapshot.TerminalSessions, RemoteWorkspaceAdapterTerminalSession{
|
|
AdapterSessionID: id,
|
|
SessionState: terminal.State,
|
|
ControlledAt: terminal.ControlledAt.Format(time.RFC3339Nano),
|
|
Reason: terminal.Reason,
|
|
})
|
|
}
|
|
}
|
|
snapshot.ActiveSessionCount = len(s.sessions)
|
|
snapshot.TerminalSessionCount = len(s.terminalSessions)
|
|
return snapshot
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) ReadAdapterSessionMailbox(adapterSessionID string, drain bool, limit int, afterSequence int64, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error) {
|
|
adapterSessionID = strings.TrimSpace(adapterSessionID)
|
|
if !isValidRemoteWorkspaceAdapterSessionID(adapterSessionID) {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("invalid remote workspace adapter session id")
|
|
}
|
|
if afterSequence < 0 {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("invalid remote workspace adapter session mailbox after sequence")
|
|
}
|
|
if drain && afterSequence > 0 {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("remote workspace adapter session mailbox after sequence cannot drain")
|
|
}
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if limit > 200 {
|
|
limit = 200
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
if s == nil {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.sessions[adapterSessionID]
|
|
if session == nil {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("remote workspace adapter session not found")
|
|
}
|
|
beforeDepth := len(session.Mailbox)
|
|
startIndex := 0
|
|
if afterSequence > 0 {
|
|
for startIndex < len(session.Mailbox) && session.Mailbox[startIndex].Sequence <= afterSequence {
|
|
startIndex++
|
|
}
|
|
}
|
|
eventCount := len(session.Mailbox) - startIndex
|
|
if eventCount > limit {
|
|
eventCount = limit
|
|
}
|
|
events := append([]RemoteWorkspaceAdapterMailboxEvent(nil), session.Mailbox[startIndex:startIndex+eventCount]...)
|
|
if drain && eventCount > 0 {
|
|
session.Mailbox = append([]RemoteWorkspaceAdapterMailboxEvent(nil), session.Mailbox[eventCount:]...)
|
|
session.MailboxDrained += int64(eventCount)
|
|
s.mailboxDrainedTotal += int64(eventCount)
|
|
session.LastActivityAt = now
|
|
}
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{
|
|
SchemaVersion: "rap.remote_workspace_adapter_mailbox_snapshot.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
AdapterSessionID: adapterSessionID,
|
|
ObservedAt: now.Format(time.RFC3339Nano),
|
|
Drained: drain,
|
|
Empty: eventCount == 0,
|
|
MailboxCapacity: DefaultRemoteWorkspaceAdapterMailboxCapacity,
|
|
MailboxDepth: beforeDepth,
|
|
DepthAfter: len(session.Mailbox),
|
|
EnqueuedTotal: session.MailboxEnqueued,
|
|
DrainedTotal: session.MailboxDrained,
|
|
DroppedTotal: session.MailboxDropped,
|
|
AfterSequence: afterSequence,
|
|
SkippedCount: startIndex,
|
|
ReturnedCount: len(events),
|
|
Events: events,
|
|
}, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) RecordAdapterSessionMailboxRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, now time.Time) {
|
|
if s == nil {
|
|
return
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
adapterSessionID := strings.TrimSpace(snapshot.AdapterSessionID)
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.mailboxReadTotal++
|
|
if snapshot.Waited {
|
|
s.mailboxWaitTotal++
|
|
}
|
|
if snapshot.WaitTimeout {
|
|
s.mailboxWaitTimeoutTotal++
|
|
}
|
|
if snapshot.Empty {
|
|
s.mailboxEmptyReadTotal++
|
|
}
|
|
if snapshot.ResumeFrom != "" {
|
|
s.mailboxResumeReadTotal++
|
|
}
|
|
if snapshot.AfterSequence > 0 {
|
|
s.mailboxAfterSequenceReadTotal++
|
|
}
|
|
s.mailboxReturnedTotal += int64(snapshot.ReturnedCount)
|
|
s.mailboxSkippedTotal += int64(snapshot.SkippedCount)
|
|
s.lastMailboxReadAt = now.Format(time.RFC3339Nano)
|
|
s.lastMailboxAdapterSessionID = adapterSessionID
|
|
s.lastMailboxWaitMs = snapshot.WaitMs
|
|
s.lastMailboxWaited = snapshot.Waited
|
|
s.lastMailboxWaitTimeout = snapshot.WaitTimeout
|
|
s.lastMailboxEmpty = snapshot.Empty
|
|
s.lastMailboxAfterSequence = snapshot.AfterSequence
|
|
s.lastMailboxSkippedCount = snapshot.SkippedCount
|
|
s.lastMailboxReturnedCount = snapshot.ReturnedCount
|
|
if snapshot.ResumeFrom != "" {
|
|
s.lastMailboxResumeFrom = snapshot.ResumeFrom
|
|
s.lastMailboxResumeSequence = snapshot.ResumeSequence
|
|
s.lastMailboxResumeConsumerID = snapshot.ConsumerID
|
|
}
|
|
if session := s.sessions[adapterSessionID]; session != nil {
|
|
session.MailboxRead++
|
|
if snapshot.Waited {
|
|
session.MailboxWait++
|
|
}
|
|
if snapshot.WaitTimeout {
|
|
session.MailboxWaitTimeout++
|
|
}
|
|
if snapshot.Empty {
|
|
session.MailboxEmptyRead++
|
|
}
|
|
if snapshot.ResumeFrom != "" {
|
|
session.MailboxResumeRead++
|
|
}
|
|
if snapshot.AfterSequence > 0 {
|
|
session.MailboxAfterSequenceRead++
|
|
}
|
|
session.MailboxReturnedTotal += int64(snapshot.ReturnedCount)
|
|
session.MailboxSkippedTotal += int64(snapshot.SkippedCount)
|
|
session.LastMailboxReadAt = now
|
|
session.LastMailboxWaitMs = snapshot.WaitMs
|
|
session.LastMailboxWaited = snapshot.Waited
|
|
session.LastMailboxTimeout = snapshot.WaitTimeout
|
|
session.LastMailboxEmpty = snapshot.Empty
|
|
session.LastMailboxAfterSequence = snapshot.AfterSequence
|
|
session.LastMailboxSkippedCount = snapshot.SkippedCount
|
|
session.LastMailboxReturnedCount = snapshot.ReturnedCount
|
|
if snapshot.ResumeFrom != "" {
|
|
session.LastMailboxResumeFrom = snapshot.ResumeFrom
|
|
session.LastMailboxResumeSequence = snapshot.ResumeSequence
|
|
session.LastMailboxResumeConsumerID = snapshot.ConsumerID
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) RecordAdapterSessionMailboxConsumerRead(snapshot RemoteWorkspaceAdapterMailboxSnapshot, consumerID string, ackSequence int64, reset bool, now time.Time) (RemoteWorkspaceAdapterMailboxSnapshot, error) {
|
|
if s == nil {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
consumerID = strings.TrimSpace(consumerID)
|
|
if !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("invalid remote workspace adapter mailbox consumer")
|
|
}
|
|
if ackSequence < 0 {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("invalid remote workspace adapter mailbox ack sequence")
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
adapterSessionID := strings.TrimSpace(snapshot.AdapterSessionID)
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
session := s.sessions[adapterSessionID]
|
|
if session == nil {
|
|
return RemoteWorkspaceAdapterMailboxSnapshot{}, fmt.Errorf("remote workspace adapter session not found")
|
|
}
|
|
if session.MailboxConsumers == nil {
|
|
session.MailboxConsumers = map[string]*remoteWorkspaceAdapterMailboxConsumerState{}
|
|
}
|
|
if reset {
|
|
if _, ok := session.MailboxConsumers[consumerID]; ok {
|
|
delete(session.MailboxConsumers, consumerID)
|
|
}
|
|
session.MailboxConsumerResetTotal++
|
|
s.mailboxConsumerResetTotal++
|
|
}
|
|
consumer := session.MailboxConsumers[consumerID]
|
|
created := false
|
|
evicted := false
|
|
if consumer == nil {
|
|
evicted = s.evictOldestMailboxConsumerLocked(session)
|
|
if evicted {
|
|
session.MailboxConsumerEvictedTotal++
|
|
s.mailboxConsumerEvictedTotal++
|
|
}
|
|
consumer = &remoteWorkspaceAdapterMailboxConsumerState{
|
|
ID: consumerID,
|
|
CreatedAt: now,
|
|
}
|
|
session.MailboxConsumers[consumerID] = consumer
|
|
created = true
|
|
}
|
|
maxSequence := consumer.CheckpointSequence
|
|
for _, event := range snapshot.Events {
|
|
if event.Sequence > maxSequence {
|
|
maxSequence = event.Sequence
|
|
}
|
|
}
|
|
consumer.ReadTotal++
|
|
consumer.LastReadAt = now
|
|
consumer.CheckpointSequence = maxSequence
|
|
if ackSequence > consumer.AckSequence {
|
|
consumer.AckSequence = ackSequence
|
|
}
|
|
if ackSequence > 0 {
|
|
consumer.AckTotal++
|
|
consumer.LastAckAt = now
|
|
session.MailboxConsumerAckTotal++
|
|
s.mailboxConsumerAckTotal++
|
|
s.lastMailboxConsumerAckAt = now.Format(time.RFC3339Nano)
|
|
}
|
|
session.MailboxConsumerReadTotal++
|
|
session.LastMailboxConsumerID = consumerID
|
|
session.LastMailboxConsumerReadAt = now
|
|
session.LastMailboxConsumerAckAt = consumer.LastAckAt
|
|
session.LastMailboxConsumerCheckpoint = consumer.CheckpointSequence
|
|
session.LastMailboxConsumerAck = consumer.AckSequence
|
|
s.mailboxConsumerReadTotal++
|
|
s.lastMailboxConsumerID = consumerID
|
|
s.lastMailboxConsumerAdapterSessionID = adapterSessionID
|
|
s.lastMailboxConsumerReadAt = now.Format(time.RFC3339Nano)
|
|
s.lastMailboxConsumerCheckpoint = consumer.CheckpointSequence
|
|
s.lastMailboxConsumerAck = consumer.AckSequence
|
|
|
|
snapshot.ConsumerID = consumerID
|
|
snapshot.ConsumerReadTotal = consumer.ReadTotal
|
|
snapshot.ConsumerAckTotal = consumer.AckTotal
|
|
snapshot.ConsumerResetTotal = session.MailboxConsumerResetTotal
|
|
snapshot.ConsumerEvictedTotal = session.MailboxConsumerEvictedTotal
|
|
snapshot.ConsumerCheckpointSequence = consumer.CheckpointSequence
|
|
snapshot.ConsumerAckSequence = consumer.AckSequence
|
|
snapshot.ConsumerLagCount = countMailboxEventsAfterSequence(session.Mailbox, consumer.AckSequence)
|
|
snapshot.ConsumerCount = len(session.MailboxConsumers)
|
|
snapshot.ConsumerCapacity = DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity
|
|
snapshot.ConsumerCreated = created
|
|
snapshot.ConsumerReset = reset
|
|
snapshot.ConsumerEvicted = evicted
|
|
snapshot.ConsumerCreatedAt = consumer.CreatedAt.Format(time.RFC3339Nano)
|
|
snapshot.ConsumerLastReadAt = consumer.LastReadAt.Format(time.RFC3339Nano)
|
|
if !consumer.LastAckAt.IsZero() {
|
|
snapshot.ConsumerLastAckAt = consumer.LastAckAt.Format(time.RFC3339Nano)
|
|
}
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) SnapshotAdapterSessionMailboxConsumers(adapterSessionID string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxConsumerSnapshot, error) {
|
|
adapterSessionID = strings.TrimSpace(adapterSessionID)
|
|
if !isValidRemoteWorkspaceAdapterSessionID(adapterSessionID) {
|
|
return RemoteWorkspaceAdapterMailboxConsumerSnapshot{}, fmt.Errorf("invalid remote workspace adapter session id")
|
|
}
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if limit > DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity {
|
|
limit = DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
if s == nil {
|
|
return RemoteWorkspaceAdapterMailboxConsumerSnapshot{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.sessions[adapterSessionID]
|
|
if session == nil {
|
|
return RemoteWorkspaceAdapterMailboxConsumerSnapshot{}, fmt.Errorf("remote workspace adapter session not found")
|
|
}
|
|
snapshot := RemoteWorkspaceAdapterMailboxConsumerSnapshot{
|
|
SchemaVersion: "rap.remote_workspace_adapter_mailbox_consumer_snapshot.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
AdapterSessionID: adapterSessionID,
|
|
ObservedAt: now.Format(time.RFC3339Nano),
|
|
ConsumerCapacity: DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity,
|
|
ConsumerCount: len(session.MailboxConsumers),
|
|
ConsumerReadTotal: session.MailboxConsumerReadTotal,
|
|
ConsumerAckTotal: session.MailboxConsumerAckTotal,
|
|
ConsumerResetTotal: session.MailboxConsumerResetTotal,
|
|
ConsumerEvictedTotal: session.MailboxConsumerEvictedTotal,
|
|
MailboxDepth: len(session.Mailbox),
|
|
MailboxEnqueued: session.MailboxEnqueued,
|
|
MailboxDrained: session.MailboxDrained,
|
|
MailboxDropped: session.MailboxDropped,
|
|
}
|
|
consumerIDs := make([]string, 0, len(session.MailboxConsumers))
|
|
for id := range session.MailboxConsumers {
|
|
consumerIDs = append(consumerIDs, id)
|
|
}
|
|
sort.Strings(consumerIDs)
|
|
for _, id := range consumerIDs {
|
|
if len(snapshot.Consumers) >= limit {
|
|
break
|
|
}
|
|
consumer := session.MailboxConsumers[id]
|
|
if consumer == nil {
|
|
continue
|
|
}
|
|
view := RemoteWorkspaceAdapterMailboxConsumer{
|
|
ConsumerID: consumer.ID,
|
|
ReadTotal: consumer.ReadTotal,
|
|
AckTotal: consumer.AckTotal,
|
|
CheckpointSequence: consumer.CheckpointSequence,
|
|
AckSequence: consumer.AckSequence,
|
|
LagCount: countMailboxEventsAfterSequence(session.Mailbox, consumer.AckSequence),
|
|
}
|
|
if !consumer.CreatedAt.IsZero() {
|
|
view.CreatedAt = consumer.CreatedAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !consumer.LastReadAt.IsZero() {
|
|
view.LastReadAt = consumer.LastReadAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !consumer.LastAckAt.IsZero() {
|
|
view.LastAckAt = consumer.LastAckAt.Format(time.RFC3339Nano)
|
|
}
|
|
snapshot.Consumers = append(snapshot.Consumers, view)
|
|
}
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) ResolveAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, now time.Time) (int64, error) {
|
|
adapterSessionID = strings.TrimSpace(adapterSessionID)
|
|
consumerID = strings.TrimSpace(consumerID)
|
|
resumeFrom = strings.TrimSpace(strings.ToLower(resumeFrom))
|
|
if !isValidRemoteWorkspaceAdapterSessionID(adapterSessionID) {
|
|
return 0, fmt.Errorf("invalid remote workspace adapter session id")
|
|
}
|
|
if !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) {
|
|
return 0, fmt.Errorf("invalid remote workspace adapter mailbox consumer")
|
|
}
|
|
if resumeFrom != "ack" && resumeFrom != "checkpoint" {
|
|
return 0, fmt.Errorf("invalid remote workspace adapter mailbox resume cursor")
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
if s == nil {
|
|
return 0, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.sessions[adapterSessionID]
|
|
if session == nil {
|
|
return 0, fmt.Errorf("remote workspace adapter session not found")
|
|
}
|
|
consumer := session.MailboxConsumers[consumerID]
|
|
if consumer == nil {
|
|
return 0, fmt.Errorf("remote workspace adapter mailbox consumer not found")
|
|
}
|
|
if resumeFrom == "checkpoint" {
|
|
return consumer.CheckpointSequence, nil
|
|
}
|
|
return consumer.AckSequence, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) PreflightAdapterSessionMailboxConsumerResume(adapterSessionID string, consumerID string, resumeFrom string, limit int, now time.Time) (RemoteWorkspaceAdapterMailboxPreflightSnapshot, error) {
|
|
adapterSessionID = strings.TrimSpace(adapterSessionID)
|
|
consumerID = strings.TrimSpace(consumerID)
|
|
resumeFrom = strings.TrimSpace(strings.ToLower(resumeFrom))
|
|
if !isValidRemoteWorkspaceAdapterSessionID(adapterSessionID) {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("invalid remote workspace adapter session id")
|
|
}
|
|
if !isValidRemoteWorkspaceAdapterMailboxConsumerID(consumerID) {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("invalid remote workspace adapter mailbox consumer")
|
|
}
|
|
if resumeFrom == "" {
|
|
resumeFrom = "checkpoint"
|
|
}
|
|
if resumeFrom != "ack" && resumeFrom != "checkpoint" {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("invalid remote workspace adapter mailbox resume cursor")
|
|
}
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if limit > DefaultRemoteWorkspaceAdapterMailboxCapacity {
|
|
limit = DefaultRemoteWorkspaceAdapterMailboxCapacity
|
|
}
|
|
if now.IsZero() {
|
|
now = time.Now().UTC()
|
|
} else {
|
|
now = now.UTC()
|
|
}
|
|
if s == nil {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("remote workspace adapter probe sink unavailable")
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now)
|
|
session := s.sessions[adapterSessionID]
|
|
if session == nil {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("remote workspace adapter session not found")
|
|
}
|
|
consumer := session.MailboxConsumers[consumerID]
|
|
if consumer == nil {
|
|
return RemoteWorkspaceAdapterMailboxPreflightSnapshot{}, fmt.Errorf("remote workspace adapter mailbox consumer not found")
|
|
}
|
|
resumeSequence := consumer.AckSequence
|
|
if resumeFrom == "checkpoint" {
|
|
resumeSequence = consumer.CheckpointSequence
|
|
}
|
|
startIndex := 0
|
|
for startIndex < len(session.Mailbox) && session.Mailbox[startIndex].Sequence <= resumeSequence {
|
|
startIndex++
|
|
}
|
|
available := len(session.Mailbox) - startIndex
|
|
if available < 0 {
|
|
available = 0
|
|
}
|
|
returned := available
|
|
if returned > limit {
|
|
returned = limit
|
|
}
|
|
var firstExpected int64
|
|
var lastExpected int64
|
|
if returned > 0 {
|
|
firstExpected = session.Mailbox[startIndex].Sequence
|
|
lastExpected = session.Mailbox[startIndex+returned-1].Sequence
|
|
}
|
|
var firstRetained int64
|
|
var lastRetained int64
|
|
if len(session.Mailbox) > 0 {
|
|
firstRetained = session.Mailbox[0].Sequence
|
|
lastRetained = session.Mailbox[len(session.Mailbox)-1].Sequence
|
|
}
|
|
diagnosticState := "ready"
|
|
staleCursor := false
|
|
missingDropped := 0
|
|
recommendedAction := "resume_from_cursor"
|
|
actionHints := []string{"resume_from_requested_cursor"}
|
|
actionReason := "cursor_window_available"
|
|
if firstRetained > 0 && resumeSequence < firstRetained-1 {
|
|
diagnosticState = "stale_cursor_gap"
|
|
staleCursor = true
|
|
missingDropped = int(firstRetained - resumeSequence - 1)
|
|
recommendedAction = "reset_consumer_and_resync"
|
|
actionHints = []string{"reset_consumer_cursor", "request_full_adapter_resync", "resume_from_checkpoint_after_resync"}
|
|
actionReason = "consumer_cursor_before_first_retained_sequence"
|
|
} else if returned == 0 {
|
|
diagnosticState = "caught_up"
|
|
recommendedAction = "wait_for_new_mailbox_events"
|
|
actionHints = []string{"keep_consumer_cursor", "long_poll_after_sequence"}
|
|
actionReason = "cursor_caught_up_to_retained_mailbox"
|
|
}
|
|
actionContext := map[string]any{
|
|
"consumer_id": consumerID,
|
|
"resume_from": resumeFrom,
|
|
"resume_sequence": resumeSequence,
|
|
"first_retained_sequence": firstRetained,
|
|
"last_retained_sequence": lastRetained,
|
|
"mailbox_depth": len(session.Mailbox),
|
|
"mailbox_dropped_total": session.MailboxDropped,
|
|
"missing_dropped_count": missingDropped,
|
|
"expected_available_count": available,
|
|
"expected_returned_count": returned,
|
|
"expected_skipped_count": startIndex,
|
|
"consumer_checkpoint_sequence": consumer.CheckpointSequence,
|
|
"consumer_ack_sequence": consumer.AckSequence,
|
|
}
|
|
operatorSummary := "consumer cursor can resume from requested window"
|
|
operatorStatus := "ready_to_resume"
|
|
operatorSeverity := "ok"
|
|
if diagnosticState == "stale_cursor_gap" {
|
|
operatorSummary = "stale cursor gap: reset consumer and resync before resume"
|
|
operatorStatus = "resync_required"
|
|
operatorSeverity = "warn"
|
|
} else if diagnosticState == "caught_up" {
|
|
operatorSummary = "consumer cursor is caught up; wait for new mailbox events"
|
|
operatorStatus = "caught_up"
|
|
operatorSeverity = "info"
|
|
}
|
|
operatorSummaryFields := map[string]any{
|
|
"diagnostic_state": diagnosticState,
|
|
"recommended_action": recommendedAction,
|
|
"action_reason": actionReason,
|
|
"operator_status": operatorStatus,
|
|
"operator_severity": operatorSeverity,
|
|
"resume_from": resumeFrom,
|
|
"resume_sequence": resumeSequence,
|
|
"first_retained_sequence": firstRetained,
|
|
"last_retained_sequence": lastRetained,
|
|
"missing_dropped_count": missingDropped,
|
|
"expected_available_count": available,
|
|
"expected_returned_count": returned,
|
|
"expected_skipped_count": startIndex,
|
|
}
|
|
snapshot := RemoteWorkspaceAdapterMailboxPreflightSnapshot{
|
|
SchemaVersion: "rap.remote_workspace_adapter_mailbox_preflight.v1",
|
|
AdapterRuntimeID: RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
AdapterSessionID: adapterSessionID,
|
|
ObservedAt: now.Format(time.RFC3339Nano),
|
|
ReadOnly: true,
|
|
ConsumerID: consumerID,
|
|
ResumeFrom: resumeFrom,
|
|
ResumeSequence: resumeSequence,
|
|
AfterSequence: resumeSequence,
|
|
Limit: limit,
|
|
MailboxDepth: len(session.Mailbox),
|
|
MailboxEnqueued: session.MailboxEnqueued,
|
|
MailboxDropped: session.MailboxDropped,
|
|
MailboxReadTotal: session.MailboxRead,
|
|
ConsumerReadTotal: session.MailboxConsumerReadTotal,
|
|
ConsumerAckTotal: session.MailboxConsumerAckTotal,
|
|
ConsumerCheckpointSequence: consumer.CheckpointSequence,
|
|
ConsumerAckSequence: consumer.AckSequence,
|
|
ConsumerLagCount: countMailboxEventsAfterSequence(session.Mailbox, consumer.AckSequence),
|
|
ExpectedAvailableCount: available,
|
|
ExpectedReturnedCount: returned,
|
|
ExpectedSkippedCount: startIndex,
|
|
FirstExpectedSequence: firstExpected,
|
|
LastExpectedSequence: lastExpected,
|
|
FirstRetainedSequence: firstRetained,
|
|
LastRetainedSequence: lastRetained,
|
|
DiagnosticState: diagnosticState,
|
|
StaleCursor: staleCursor,
|
|
MissingDroppedCount: missingDropped,
|
|
RecommendedAction: recommendedAction,
|
|
ActionHints: actionHints,
|
|
ActionReason: actionReason,
|
|
ActionContext: actionContext,
|
|
OperatorSummary: operatorSummary,
|
|
OperatorStatus: operatorStatus,
|
|
OperatorSeverity: operatorSeverity,
|
|
OperatorSummaryFields: operatorSummaryFields,
|
|
}
|
|
s.recordAdapterSessionMailboxPreflightLocked(session, snapshot, now)
|
|
return snapshot, nil
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) recordAdapterSessionMailboxPreflightLocked(session *remoteWorkspaceAdapterProbeSession, snapshot RemoteWorkspaceAdapterMailboxPreflightSnapshot, now time.Time) {
|
|
s.mailboxPreflightTotal++
|
|
if snapshot.ResumeFrom == "ack" {
|
|
s.mailboxPreflightAckTotal++
|
|
}
|
|
if snapshot.ResumeFrom == "checkpoint" {
|
|
s.mailboxPreflightCheckpointTotal++
|
|
}
|
|
s.lastMailboxPreflightAt = now.Format(time.RFC3339Nano)
|
|
s.lastMailboxPreflightAdapterSessionID = snapshot.AdapterSessionID
|
|
s.lastMailboxPreflightConsumerID = snapshot.ConsumerID
|
|
s.lastMailboxPreflightResumeFrom = snapshot.ResumeFrom
|
|
s.lastMailboxPreflightResumeSequence = snapshot.ResumeSequence
|
|
s.lastMailboxPreflightAfterSequence = snapshot.AfterSequence
|
|
s.lastMailboxPreflightAvailableCount = snapshot.ExpectedAvailableCount
|
|
s.lastMailboxPreflightReturnedCount = snapshot.ExpectedReturnedCount
|
|
s.lastMailboxPreflightSkippedCount = snapshot.ExpectedSkippedCount
|
|
s.lastMailboxPreflightFirstSequence = snapshot.FirstExpectedSequence
|
|
s.lastMailboxPreflightLastSequence = snapshot.LastExpectedSequence
|
|
s.lastMailboxPreflightFirstRetained = snapshot.FirstRetainedSequence
|
|
s.lastMailboxPreflightLastRetained = snapshot.LastRetainedSequence
|
|
s.lastMailboxPreflightMailboxDropped = snapshot.MailboxDropped
|
|
s.lastMailboxPreflightDiagnosticState = snapshot.DiagnosticState
|
|
s.lastMailboxPreflightStaleCursor = snapshot.StaleCursor
|
|
s.lastMailboxPreflightMissingDropped = snapshot.MissingDroppedCount
|
|
s.lastMailboxPreflightRecommendedAction = snapshot.RecommendedAction
|
|
s.lastMailboxPreflightActionHints = append([]string(nil), snapshot.ActionHints...)
|
|
s.lastMailboxPreflightActionReason = snapshot.ActionReason
|
|
s.lastMailboxPreflightActionContext = cloneStringAnyMap(snapshot.ActionContext)
|
|
s.lastMailboxPreflightOperatorSummary = snapshot.OperatorSummary
|
|
s.lastMailboxPreflightOperatorStatus = snapshot.OperatorStatus
|
|
s.lastMailboxPreflightOperatorSeverity = snapshot.OperatorSeverity
|
|
s.lastMailboxPreflightOperatorFields = cloneStringAnyMap(snapshot.OperatorSummaryFields)
|
|
if session == nil {
|
|
return
|
|
}
|
|
session.MailboxPreflightTotal++
|
|
if snapshot.ResumeFrom == "ack" {
|
|
session.MailboxPreflightAckTotal++
|
|
}
|
|
if snapshot.ResumeFrom == "checkpoint" {
|
|
session.MailboxPreflightCheckpointTotal++
|
|
}
|
|
incrementStringInt64Map(&session.MailboxPreflightOperatorStatusCounts, snapshot.OperatorStatus)
|
|
incrementStringInt64Map(&session.MailboxPreflightOperatorSeverityCounts, snapshot.OperatorSeverity)
|
|
session.LastMailboxPreflightAt = now
|
|
session.LastMailboxPreflightConsumerID = snapshot.ConsumerID
|
|
session.LastMailboxPreflightResumeFrom = snapshot.ResumeFrom
|
|
session.LastMailboxPreflightResumeSequence = snapshot.ResumeSequence
|
|
session.LastMailboxPreflightAfterSequence = snapshot.AfterSequence
|
|
session.LastMailboxPreflightAvailableCount = snapshot.ExpectedAvailableCount
|
|
session.LastMailboxPreflightReturnedCount = snapshot.ExpectedReturnedCount
|
|
session.LastMailboxPreflightSkippedCount = snapshot.ExpectedSkippedCount
|
|
session.LastMailboxPreflightFirstSequence = snapshot.FirstExpectedSequence
|
|
session.LastMailboxPreflightLastSequence = snapshot.LastExpectedSequence
|
|
session.LastMailboxPreflightFirstRetained = snapshot.FirstRetainedSequence
|
|
session.LastMailboxPreflightLastRetained = snapshot.LastRetainedSequence
|
|
session.LastMailboxPreflightMailboxDropped = snapshot.MailboxDropped
|
|
session.LastMailboxPreflightDiagnosticState = snapshot.DiagnosticState
|
|
session.LastMailboxPreflightStaleCursor = snapshot.StaleCursor
|
|
session.LastMailboxPreflightMissingDropped = snapshot.MissingDroppedCount
|
|
session.LastMailboxPreflightRecommendedAction = snapshot.RecommendedAction
|
|
session.LastMailboxPreflightActionHints = append([]string(nil), snapshot.ActionHints...)
|
|
session.LastMailboxPreflightActionReason = snapshot.ActionReason
|
|
session.LastMailboxPreflightActionContext = cloneStringAnyMap(snapshot.ActionContext)
|
|
session.LastMailboxPreflightOperatorSummary = snapshot.OperatorSummary
|
|
session.LastMailboxPreflightOperatorStatus = snapshot.OperatorStatus
|
|
session.LastMailboxPreflightOperatorSeverity = snapshot.OperatorSeverity
|
|
session.LastMailboxPreflightOperatorFields = cloneStringAnyMap(snapshot.OperatorSummaryFields)
|
|
}
|
|
|
|
// cloneStringAnyMap returns a shallow copy of source: the top-level entries are
// duplicated, but values are copied by assignment, so reference-typed values
// stay shared with the original. A nil source yields nil; an empty non-nil
// source yields an empty non-nil map.
func cloneStringAnyMap(source map[string]any) map[string]any {
	var duplicate map[string]any
	if source != nil {
		duplicate = make(map[string]any, len(source))
		for name := range source {
			duplicate[name] = source[name]
		}
	}
	return duplicate
}
|
|
|
|
// cloneStringInt64Map returns an independent copy of source. A nil source
// yields nil; an empty non-nil source yields an empty non-nil map.
func cloneStringInt64Map(source map[string]int64) map[string]int64 {
	var duplicate map[string]int64
	if source != nil {
		duplicate = make(map[string]int64, len(source))
		for name := range source {
			duplicate[name] = source[name]
		}
	}
	return duplicate
}
|
|
|
|
// incrementStringInt64Map adds one to the counter stored under the trimmed key
// in the map that target points to, allocating the map first when it is nil.
// The call is a no-op when target is nil or when key is empty after trimming.
func incrementStringInt64Map(target *map[string]int64, key string) {
	if target == nil {
		return
	}
	name := strings.TrimSpace(key)
	if name == "" {
		return
	}

	counts := *target
	if counts == nil {
		counts = make(map[string]int64)
		*target = counts
	}
	counts[name]++
}
|
|
|
|
// remoteWorkspacePreflightAttentionStatus condenses preflight tallies into one
// attention level:
//
//   - "repeated_resync_required" when "resync_required" statuses or "warn"
//     severities were counted more than once;
//   - "needs_attention" when either was counted at least once;
//   - "clean" when only "ready_to_resume"/"caught_up" statuses or "ok"/"info"
//     severities were counted;
//   - "unknown" when none of the above keys has a positive count.
//
// Nil maps are treated as empty.
func remoteWorkspacePreflightAttentionStatus(statusCounts map[string]int64, severityCounts map[string]int64) string {
	resyncs := statusCounts["resync_required"]
	warnings := severityCounts["warn"]

	switch {
	case resyncs > 1 || warnings > 1:
		return "repeated_resync_required"
	case resyncs > 0 || warnings > 0:
		return "needs_attention"
	case statusCounts["ready_to_resume"] > 0,
		statusCounts["caught_up"] > 0,
		severityCounts["ok"] > 0,
		severityCounts["info"] > 0:
		return "clean"
	}
	return "unknown"
}
|
|
|
|
// remoteWorkspacePreflightAttentionReason maps an attention status to a reason
// code. For "needs_attention" the reason is refined from the tallies: a counted
// "resync_required" status wins over a counted "warn" severity, with a generic
// reason when neither is present. Any status other than
// "repeated_resync_required", "needs_attention" or "clean" yields
// "no_preflight_observed". Nil maps are treated as empty.
func remoteWorkspacePreflightAttentionReason(status string, statusCounts map[string]int64, severityCounts map[string]int64) string {
	if status == "repeated_resync_required" {
		return "resync_required_preflight_repeated"
	}
	if status == "clean" {
		return "no_resync_required_preflight_observed"
	}
	if status != "needs_attention" {
		return "no_preflight_observed"
	}

	switch {
	case statusCounts["resync_required"] > 0:
		return "resync_required_preflight_observed"
	case severityCounts["warn"] > 0:
		return "warn_preflight_observed"
	}
	return "attention_preflight_observed"
}
|
|
|
|
// remoteWorkspacePreflightRemediationChecklist returns the remediation steps
// for a preflight operator status. Each item is a map with the keys "step",
// "required", "satisfied" and "source_hint"; "source_hint" reports whether the
// corresponding hint was present in actionHints.
//
//   - "resync_required": three required, unsatisfied steps (reset the cursor,
//     request a full resync, resume from checkpoint afterwards);
//   - "ready_to_resume": one required, already satisfied resume step;
//   - any other status: one required, unsatisfied wait step whose source hint
//     is set when either "long_poll_after_sequence" or "keep_consumer_cursor"
//     was hinted.
func remoteWorkspacePreflightRemediationChecklist(operatorStatus string, actionHints []string) []map[string]any {
	hinted := make(map[string]bool)
	for _, name := range actionHints {
		hinted[name] = true
	}

	// item builds one checklist entry; every step in this checklist is required.
	item := func(step string, satisfied bool, sourceHint bool) map[string]any {
		return map[string]any{
			"step":        step,
			"required":    true,
			"satisfied":   satisfied,
			"source_hint": sourceHint,
		}
	}

	switch operatorStatus {
	case "resync_required":
		return []map[string]any{
			item("reset_consumer_cursor", false, hinted["reset_consumer_cursor"]),
			item("request_full_adapter_resync", false, hinted["request_full_adapter_resync"]),
			item("resume_from_checkpoint_after_resync", false, hinted["resume_from_checkpoint_after_resync"]),
		}
	case "ready_to_resume":
		return []map[string]any{
			item("resume_from_requested_cursor", true, hinted["resume_from_requested_cursor"]),
		}
	default:
		return []map[string]any{
			item("wait_for_new_mailbox_events", false, hinted["long_poll_after_sequence"] || hinted["keep_consumer_cursor"]),
		}
	}
}
|
|
|
|
// remoteWorkspacePreflightRemediationChecklistSummary aggregates a remediation
// checklist into counts and an overall status.
//
// An item counts as required or satisfied only when the corresponding key holds
// the boolean true; missing keys and non-boolean values count as false.
// Satisfied items are counted only among required ones. The status is
// "action_required" when any required item is unsatisfied, "ready" when there
// are required items and all are satisfied, and "not_required" otherwise.
func remoteWorkspacePreflightRemediationChecklistSummary(checklist []map[string]any) map[string]any {
	var required, satisfied int
	for _, entry := range checklist {
		if isRequired, _ := entry["required"].(bool); !isRequired {
			continue
		}
		required++
		if isSatisfied, _ := entry["satisfied"].(bool); isSatisfied {
			satisfied++
		}
	}

	pending := required - satisfied
	if pending < 0 {
		pending = 0
	}

	var status string
	switch {
	case pending > 0:
		status = "action_required"
	case required > 0:
		status = "ready"
	default:
		status = "not_required"
	}

	return map[string]any{
		"status":          status,
		"total_count":     len(checklist),
		"required_count":  required,
		"satisfied_count": satisfied,
		"pending_count":   pending,
	}
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) evictOldestMailboxConsumerLocked(session *remoteWorkspaceAdapterProbeSession) bool {
|
|
if session == nil || len(session.MailboxConsumers) < DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity {
|
|
return false
|
|
}
|
|
oldestID := ""
|
|
var oldestAt time.Time
|
|
for id, consumer := range session.MailboxConsumers {
|
|
if consumer == nil {
|
|
delete(session.MailboxConsumers, id)
|
|
return true
|
|
}
|
|
at := consumer.LastReadAt
|
|
if at.IsZero() {
|
|
at = consumer.CreatedAt
|
|
}
|
|
if oldestID == "" || at.Before(oldestAt) || (at.Equal(oldestAt) && id < oldestID) {
|
|
oldestID = id
|
|
oldestAt = at
|
|
}
|
|
}
|
|
if oldestID != "" {
|
|
delete(session.MailboxConsumers, oldestID)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func countMailboxEventsAfterSequence(events []RemoteWorkspaceAdapterMailboxEvent, sequence int64) int {
|
|
count := 0
|
|
for _, event := range events {
|
|
if event.Sequence > sequence {
|
|
count++
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
func countMailboxConsumersLocked(sessions map[string]*remoteWorkspaceAdapterProbeSession) int {
|
|
count := 0
|
|
for _, session := range sessions {
|
|
if session != nil {
|
|
count += len(session.MailboxConsumers)
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
func remoteWorkspaceAdapterRuntimeReadinessLocked(s *RemoteWorkspaceFrameProbeSink, session *remoteWorkspaceAdapterProbeSession, now time.Time) map[string]any {
|
|
readiness := map[string]any{
|
|
"schema_version": "rap.remote_workspace_adapter_runtime_readiness.v1",
|
|
"adapter_runtime_id": RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
"observed_at": now.UTC().Format(time.RFC3339Nano),
|
|
"probe_only": true,
|
|
"payload_traffic": "none",
|
|
"status": "idle",
|
|
"diagnostic_state": "waiting_for_session",
|
|
"ready": false,
|
|
"active_session_count": len(s.sessions),
|
|
"terminal_session_count": len(s.terminalSessions),
|
|
"mailbox_capacity": DefaultRemoteWorkspaceAdapterMailboxCapacity,
|
|
"consumer_capacity": DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity,
|
|
"mailbox_read_total": s.mailboxReadTotal,
|
|
"mailbox_resume_total": s.mailboxResumeReadTotal,
|
|
"mailbox_preflight_total": s.mailboxPreflightTotal,
|
|
}
|
|
if session == nil {
|
|
if s.sequence == 0 {
|
|
readiness["no_session_summary"] = map[string]any{
|
|
"schema_version": "rap.remote_workspace_adapter_no_session_summary.v1",
|
|
"summary_contract": []string{"status", "diagnostic_state", "active_session_count", "terminal_session_count"},
|
|
"summary_features": map[string]bool{"status": true, "diagnostic_state": true, "active_session_count": true, "terminal_session_count": true},
|
|
"status": "idle",
|
|
"diagnostic_state": "waiting_for_session",
|
|
"active_session_count": len(s.sessions),
|
|
"terminal_session_count": len(s.terminalSessions),
|
|
}
|
|
}
|
|
if s.sequence > 0 {
|
|
readiness["last_adapter_session_id"] = s.last.AdapterSessionID
|
|
lastSessionState := s.last.SessionState
|
|
if terminal, ok := s.terminalSessions[s.last.AdapterSessionID]; ok {
|
|
lastSessionState = terminal.State
|
|
readiness["terminal_session_summary"] = map[string]any{
|
|
"schema_version": "rap.remote_workspace_adapter_terminal_session_summary.v1",
|
|
"summary_contract": []string{"adapter_session_id", "session_state", "reason", "controlled_at"},
|
|
"summary_features": map[string]bool{"adapter_session_id": true, "session_state": true, "reason": true, "controlled_at": true},
|
|
"adapter_session_id": s.last.AdapterSessionID,
|
|
"session_state": terminal.State,
|
|
"reason": terminal.Reason,
|
|
"controlled_at": terminal.ControlledAt.Format(time.RFC3339Nano),
|
|
}
|
|
}
|
|
readiness["last_session_state"] = lastSessionState
|
|
readiness["diagnostic_state"] = "last_session_terminal_or_expired"
|
|
}
|
|
return readiness
|
|
}
|
|
|
|
consumerLag := countMailboxEventsAfterSequence(session.Mailbox, session.LastMailboxConsumerAck)
|
|
status := "session_active"
|
|
diagnosticState := "waiting_for_consumer"
|
|
if session.State == "backpressure" {
|
|
status = "backpressure"
|
|
diagnosticState = "backpressure"
|
|
} else if len(session.MailboxConsumers) > 0 {
|
|
status = "cursor_ready"
|
|
diagnosticState = "adapter_cursor_ready"
|
|
}
|
|
readiness["status"] = status
|
|
readiness["diagnostic_state"] = diagnosticState
|
|
readiness["ready"] = len(session.MailboxConsumers) > 0 && session.State != "backpressure"
|
|
readiness["adapter_session_id"] = session.ID
|
|
readiness["session_state"] = session.State
|
|
readiness["mailbox_depth"] = len(session.Mailbox)
|
|
readiness["mailbox_enqueued_total"] = session.MailboxEnqueued
|
|
readiness["mailbox_read_total"] = session.MailboxRead
|
|
readiness["mailbox_resume_read_total"] = session.MailboxResumeRead
|
|
readiness["mailbox_preflight_total"] = session.MailboxPreflightTotal
|
|
readiness["mailbox_preflight_operator_status_counts"] = cloneStringInt64Map(session.MailboxPreflightOperatorStatusCounts)
|
|
readiness["mailbox_preflight_operator_severity_counts"] = cloneStringInt64Map(session.MailboxPreflightOperatorSeverityCounts)
|
|
preflightAttentionStatus := remoteWorkspacePreflightAttentionStatus(session.MailboxPreflightOperatorStatusCounts, session.MailboxPreflightOperatorSeverityCounts)
|
|
preflightAttentionReason := remoteWorkspacePreflightAttentionReason(preflightAttentionStatus, session.MailboxPreflightOperatorStatusCounts, session.MailboxPreflightOperatorSeverityCounts)
|
|
readiness["preflight_attention_status"] = preflightAttentionStatus
|
|
readiness["preflight_attention_reason"] = preflightAttentionReason
|
|
readiness["mailbox_after_sequence_read_total"] = session.MailboxAfterSequenceRead
|
|
readiness["mailbox_returned_total"] = session.MailboxReturnedTotal
|
|
readiness["mailbox_skipped_total"] = session.MailboxSkippedTotal
|
|
readiness["consumer_count"] = len(session.MailboxConsumers)
|
|
readiness["consumer_read_total"] = session.MailboxConsumerReadTotal
|
|
readiness["consumer_ack_total"] = session.MailboxConsumerAckTotal
|
|
readiness["last_consumer_id"] = session.LastMailboxConsumerID
|
|
readiness["last_consumer_checkpoint_sequence"] = session.LastMailboxConsumerCheckpoint
|
|
readiness["last_consumer_ack_sequence"] = session.LastMailboxConsumerAck
|
|
readiness["last_consumer_lag_count"] = consumerLag
|
|
readiness["last_resume_from"] = session.LastMailboxResumeFrom
|
|
readiness["last_resume_sequence"] = session.LastMailboxResumeSequence
|
|
readiness["last_resume_consumer_id"] = session.LastMailboxResumeConsumerID
|
|
readiness["last_after_sequence"] = session.LastMailboxAfterSequence
|
|
readiness["last_returned_count"] = session.LastMailboxReturnedCount
|
|
readiness["last_skipped_count"] = session.LastMailboxSkippedCount
|
|
readiness["last_preflight_consumer_id"] = session.LastMailboxPreflightConsumerID
|
|
readiness["last_preflight_resume_from"] = session.LastMailboxPreflightResumeFrom
|
|
readiness["last_preflight_resume_sequence"] = session.LastMailboxPreflightResumeSequence
|
|
readiness["last_preflight_available_count"] = session.LastMailboxPreflightAvailableCount
|
|
readiness["last_preflight_returned_count"] = session.LastMailboxPreflightReturnedCount
|
|
readiness["last_preflight_skipped_count"] = session.LastMailboxPreflightSkippedCount
|
|
readiness["last_preflight_diagnostic_state"] = session.LastMailboxPreflightDiagnosticState
|
|
readiness["last_preflight_stale_cursor"] = session.LastMailboxPreflightStaleCursor
|
|
readiness["last_preflight_missing_dropped_count"] = session.LastMailboxPreflightMissingDropped
|
|
readiness["last_preflight_recommended_action"] = session.LastMailboxPreflightRecommendedAction
|
|
readiness["last_preflight_action_hints"] = append([]string(nil), session.LastMailboxPreflightActionHints...)
|
|
readiness["last_preflight_action_reason"] = session.LastMailboxPreflightActionReason
|
|
readiness["last_preflight_action_context"] = cloneStringAnyMap(session.LastMailboxPreflightActionContext)
|
|
readiness["last_preflight_operator_summary"] = session.LastMailboxPreflightOperatorSummary
|
|
readiness["last_preflight_operator_status"] = session.LastMailboxPreflightOperatorStatus
|
|
readiness["last_preflight_operator_severity"] = session.LastMailboxPreflightOperatorSeverity
|
|
readiness["last_preflight_operator_summary_fields"] = cloneStringAnyMap(session.LastMailboxPreflightOperatorFields)
|
|
if session.MailboxPreflightTotal > 0 {
|
|
remediationChecklist := remoteWorkspacePreflightRemediationChecklist(session.LastMailboxPreflightOperatorStatus, session.LastMailboxPreflightActionHints)
|
|
remediationChecklistSummary := remoteWorkspacePreflightRemediationChecklistSummary(remediationChecklist)
|
|
readiness["last_preflight"] = map[string]any{
|
|
"diagnostics_schema_version": "rap.remote_workspace_adapter_mailbox_preflight_diagnostics.v1",
|
|
"diagnostics_contract": []string{"retained_window", "remediation_checklist", "attention", "operator_counts"},
|
|
"diagnostics_features": map[string]bool{"retained_window": true, "remediation_checklist": true, "attention": true, "operator_counts": true},
|
|
"observed_at": session.LastMailboxPreflightAt.Format(time.RFC3339Nano),
|
|
"consumer_id": session.LastMailboxPreflightConsumerID,
|
|
"resume_from": session.LastMailboxPreflightResumeFrom,
|
|
"resume_sequence": session.LastMailboxPreflightResumeSequence,
|
|
"after_sequence": session.LastMailboxPreflightAfterSequence,
|
|
"available_count": session.LastMailboxPreflightAvailableCount,
|
|
"returned_count": session.LastMailboxPreflightReturnedCount,
|
|
"skipped_count": session.LastMailboxPreflightSkippedCount,
|
|
"first_sequence": session.LastMailboxPreflightFirstSequence,
|
|
"last_sequence": session.LastMailboxPreflightLastSequence,
|
|
"first_retained_sequence": session.LastMailboxPreflightFirstRetained,
|
|
"last_retained_sequence": session.LastMailboxPreflightLastRetained,
|
|
"mailbox_dropped_total": session.LastMailboxPreflightMailboxDropped,
|
|
"diagnostic_state": session.LastMailboxPreflightDiagnosticState,
|
|
"stale_cursor": session.LastMailboxPreflightStaleCursor,
|
|
"missing_dropped_count": session.LastMailboxPreflightMissingDropped,
|
|
"recommended_action": session.LastMailboxPreflightRecommendedAction,
|
|
"action_hints": append([]string(nil), session.LastMailboxPreflightActionHints...),
|
|
"action_reason": session.LastMailboxPreflightActionReason,
|
|
"action_context": cloneStringAnyMap(session.LastMailboxPreflightActionContext),
|
|
"remediation_checklist": remediationChecklist,
|
|
"remediation_checklist_status": remediationChecklistSummary["status"],
|
|
"remediation_checklist_counts": remediationChecklistSummary,
|
|
"operator_summary": session.LastMailboxPreflightOperatorSummary,
|
|
"operator_status": session.LastMailboxPreflightOperatorStatus,
|
|
"operator_severity": session.LastMailboxPreflightOperatorSeverity,
|
|
"operator_summary_fields": cloneStringAnyMap(session.LastMailboxPreflightOperatorFields),
|
|
"mailbox_preflight_total": session.MailboxPreflightTotal,
|
|
"mailbox_preflight_ack_total": session.MailboxPreflightAckTotal,
|
|
"mailbox_preflight_checkpoint_total": session.MailboxPreflightCheckpointTotal,
|
|
"preflight_attention_status": preflightAttentionStatus,
|
|
"preflight_attention_reason": preflightAttentionReason,
|
|
"operator_status_counts": cloneStringInt64Map(session.MailboxPreflightOperatorStatusCounts),
|
|
"operator_severity_counts": cloneStringInt64Map(session.MailboxPreflightOperatorSeverityCounts),
|
|
}
|
|
}
|
|
if !session.LastActivityAt.IsZero() {
|
|
readiness["last_activity_at"] = session.LastActivityAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxReadAt.IsZero() {
|
|
readiness["last_mailbox_read_at"] = session.LastMailboxReadAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxConsumerReadAt.IsZero() {
|
|
readiness["last_consumer_read_at"] = session.LastMailboxConsumerReadAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxConsumerAckAt.IsZero() {
|
|
readiness["last_consumer_ack_at"] = session.LastMailboxConsumerAckAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxPreflightAt.IsZero() {
|
|
readiness["last_preflight_at"] = session.LastMailboxPreflightAt.Format(time.RFC3339Nano)
|
|
}
|
|
return readiness
|
|
}
|
|
|
|
func remoteWorkspaceAdapterSessionView(session remoteWorkspaceAdapterProbeSession) RemoteWorkspaceAdapterSessionView {
|
|
view := RemoteWorkspaceAdapterSessionView{
|
|
AdapterSessionID: session.ID,
|
|
SessionState: session.State,
|
|
CreatedAt: session.CreatedAt.Format(time.RFC3339Nano),
|
|
LastActivityAt: session.LastActivityAt.Format(time.RFC3339Nano),
|
|
DeliveryCount: session.DeliveryCount,
|
|
BackpressureCount: session.BackpressureCount,
|
|
AcceptedFrames: session.AcceptedFrames,
|
|
DroppedFrames: session.DroppedFrames,
|
|
AckedFrames: session.AckedFrames,
|
|
MailboxCapacity: DefaultRemoteWorkspaceAdapterMailboxCapacity,
|
|
MailboxDepth: len(session.Mailbox),
|
|
MailboxEnqueued: session.MailboxEnqueued,
|
|
MailboxDrained: session.MailboxDrained,
|
|
MailboxDropped: session.MailboxDropped,
|
|
MailboxRead: session.MailboxRead,
|
|
MailboxWait: session.MailboxWait,
|
|
MailboxWaitTimeout: session.MailboxWaitTimeout,
|
|
MailboxEmptyRead: session.MailboxEmptyRead,
|
|
MailboxResumeRead: session.MailboxResumeRead,
|
|
MailboxAfterSequenceRead: session.MailboxAfterSequenceRead,
|
|
MailboxReturnedTotal: session.MailboxReturnedTotal,
|
|
MailboxSkippedTotal: session.MailboxSkippedTotal,
|
|
MailboxConsumerCount: len(session.MailboxConsumers),
|
|
MailboxConsumerRead: session.MailboxConsumerReadTotal,
|
|
MailboxConsumerAck: session.MailboxConsumerAckTotal,
|
|
MailboxConsumerReset: session.MailboxConsumerResetTotal,
|
|
MailboxConsumerEvicted: session.MailboxConsumerEvictedTotal,
|
|
LastMailboxConsumerID: session.LastMailboxConsumerID,
|
|
LastMailboxConsumerCheckpoint: session.LastMailboxConsumerCheckpoint,
|
|
LastMailboxConsumerAck: session.LastMailboxConsumerAck,
|
|
LastMailboxWaitMs: session.LastMailboxWaitMs,
|
|
LastMailboxWaited: session.LastMailboxWaited,
|
|
LastMailboxWaitTimeout: session.LastMailboxTimeout,
|
|
LastMailboxEmpty: session.LastMailboxEmpty,
|
|
LastMailboxResumeFrom: session.LastMailboxResumeFrom,
|
|
LastMailboxResumeSequence: session.LastMailboxResumeSequence,
|
|
LastMailboxResumeConsumerID: session.LastMailboxResumeConsumerID,
|
|
LastMailboxAfterSequence: session.LastMailboxAfterSequence,
|
|
LastMailboxSkippedCount: session.LastMailboxSkippedCount,
|
|
LastMailboxReturnedCount: session.LastMailboxReturnedCount,
|
|
ChannelID: session.LastChannelID,
|
|
ResourceID: session.LastResourceID,
|
|
RouteID: session.LastRouteID,
|
|
}
|
|
if !session.BoundAt.IsZero() {
|
|
view.BoundAt = session.BoundAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastBackpressureAt.IsZero() {
|
|
view.LastBackpressureAt = session.LastBackpressureAt.Format(time.RFC3339Nano)
|
|
view.LastBackpressureReason = session.LastReason
|
|
}
|
|
if !session.LastMailboxReadAt.IsZero() {
|
|
view.LastMailboxReadAt = session.LastMailboxReadAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxConsumerReadAt.IsZero() {
|
|
view.LastMailboxConsumerReadAt = session.LastMailboxConsumerReadAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxConsumerAckAt.IsZero() {
|
|
view.LastMailboxConsumerAckAt = session.LastMailboxConsumerAckAt.Format(time.RFC3339Nano)
|
|
}
|
|
return view
|
|
}
|
|
|
|
func (s *RemoteWorkspaceFrameProbeSink) Report(now time.Time) map[string]any {
|
|
report := map[string]any{
|
|
"schema_version": "rap.remote_workspace_adapter_sink_report.v1",
|
|
"sink": RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
"adapter_runtime_id": RemoteWorkspaceFrameProbeSinkRuntimeID,
|
|
"status": "ready",
|
|
"session_state": "idle",
|
|
"probe_only": true,
|
|
"payload_traffic": "none",
|
|
"observed_at": now.UTC().Format(time.RFC3339Nano),
|
|
}
|
|
if s == nil {
|
|
report["status"] = "unavailable"
|
|
return report
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.expireIdleSessionsLocked(now.UTC())
|
|
queueCapacity := s.queueCapacity
|
|
if queueCapacity <= 0 {
|
|
queueCapacity = DefaultRemoteWorkspaceFrameProbeSinkQueueCapacity
|
|
}
|
|
report["delivery_count"] = s.sequence
|
|
report["queue_capacity"] = queueCapacity
|
|
report["queue_depth"] = 0
|
|
report["total_accepted_frames"] = s.acceptedFramesTotal
|
|
report["total_dropped_frames"] = s.droppedFramesTotal
|
|
report["total_acked_frames"] = s.ackedFramesTotal
|
|
report["backpressure_count"] = s.backpressureCount
|
|
report["session_ttl_seconds"] = int64(s.sessionTTL.Seconds())
|
|
report["active_session_count"] = len(s.sessions)
|
|
report["session_created_total"] = s.sessionCreatedTotal
|
|
report["session_bound_total"] = s.sessionBoundTotal
|
|
report["session_backpressure_total"] = s.sessionBackpressureTotal
|
|
report["session_expired_total"] = s.sessionExpiredTotal
|
|
report["session_closed_total"] = s.sessionClosedTotal
|
|
report["session_reset_total"] = s.sessionResetTotal
|
|
report["session_control_total"] = s.sessionControlTotal
|
|
report["mailbox_capacity"] = DefaultRemoteWorkspaceAdapterMailboxCapacity
|
|
report["mailbox_enqueued_total"] = s.mailboxEnqueuedTotal
|
|
report["mailbox_drained_total"] = s.mailboxDrainedTotal
|
|
report["mailbox_dropped_total"] = s.mailboxDroppedTotal
|
|
report["mailbox_read_total"] = s.mailboxReadTotal
|
|
report["mailbox_wait_total"] = s.mailboxWaitTotal
|
|
report["mailbox_wait_timeout_total"] = s.mailboxWaitTimeoutTotal
|
|
report["mailbox_empty_read_total"] = s.mailboxEmptyReadTotal
|
|
report["mailbox_resume_read_total"] = s.mailboxResumeReadTotal
|
|
report["mailbox_after_sequence_read_total"] = s.mailboxAfterSequenceReadTotal
|
|
report["mailbox_returned_total"] = s.mailboxReturnedTotal
|
|
report["mailbox_skipped_total"] = s.mailboxSkippedTotal
|
|
report["mailbox_preflight_total"] = s.mailboxPreflightTotal
|
|
report["mailbox_preflight_ack_total"] = s.mailboxPreflightAckTotal
|
|
report["mailbox_preflight_checkpoint_total"] = s.mailboxPreflightCheckpointTotal
|
|
report["mailbox_consumer_capacity"] = DefaultRemoteWorkspaceAdapterMailboxConsumerCapacity
|
|
report["mailbox_consumer_count"] = countMailboxConsumersLocked(s.sessions)
|
|
report["mailbox_consumer_read_total"] = s.mailboxConsumerReadTotal
|
|
report["mailbox_consumer_ack_total"] = s.mailboxConsumerAckTotal
|
|
report["mailbox_consumer_reset_total"] = s.mailboxConsumerResetTotal
|
|
report["mailbox_consumer_evicted_total"] = s.mailboxConsumerEvictedTotal
|
|
if s.mailboxReadTotal > 0 {
|
|
report["last_mailbox_read_at"] = s.lastMailboxReadAt
|
|
report["last_mailbox_adapter_session_id"] = s.lastMailboxAdapterSessionID
|
|
report["last_mailbox_wait_ms"] = s.lastMailboxWaitMs
|
|
report["last_mailbox_waited"] = s.lastMailboxWaited
|
|
report["last_mailbox_wait_timeout"] = s.lastMailboxWaitTimeout
|
|
report["last_mailbox_empty"] = s.lastMailboxEmpty
|
|
report["last_mailbox_after_sequence"] = s.lastMailboxAfterSequence
|
|
report["last_mailbox_skipped_count"] = s.lastMailboxSkippedCount
|
|
report["last_mailbox_returned_count"] = s.lastMailboxReturnedCount
|
|
}
|
|
if s.mailboxResumeReadTotal > 0 {
|
|
report["last_mailbox_resume_from"] = s.lastMailboxResumeFrom
|
|
report["last_mailbox_resume_sequence"] = s.lastMailboxResumeSequence
|
|
report["last_mailbox_resume_consumer_id"] = s.lastMailboxResumeConsumerID
|
|
}
|
|
if s.mailboxPreflightTotal > 0 {
|
|
report["last_mailbox_preflight_at"] = s.lastMailboxPreflightAt
|
|
report["last_mailbox_preflight_adapter_session_id"] = s.lastMailboxPreflightAdapterSessionID
|
|
report["last_mailbox_preflight_consumer_id"] = s.lastMailboxPreflightConsumerID
|
|
report["last_mailbox_preflight_resume_from"] = s.lastMailboxPreflightResumeFrom
|
|
report["last_mailbox_preflight_resume_sequence"] = s.lastMailboxPreflightResumeSequence
|
|
report["last_mailbox_preflight_after_sequence"] = s.lastMailboxPreflightAfterSequence
|
|
report["last_mailbox_preflight_available_count"] = s.lastMailboxPreflightAvailableCount
|
|
report["last_mailbox_preflight_returned_count"] = s.lastMailboxPreflightReturnedCount
|
|
report["last_mailbox_preflight_skipped_count"] = s.lastMailboxPreflightSkippedCount
|
|
report["last_mailbox_preflight_first_sequence"] = s.lastMailboxPreflightFirstSequence
|
|
report["last_mailbox_preflight_last_sequence"] = s.lastMailboxPreflightLastSequence
|
|
report["last_mailbox_preflight_diagnostic_state"] = s.lastMailboxPreflightDiagnosticState
|
|
report["last_mailbox_preflight_stale_cursor"] = s.lastMailboxPreflightStaleCursor
|
|
report["last_mailbox_preflight_missing_dropped_count"] = s.lastMailboxPreflightMissingDropped
|
|
report["last_mailbox_preflight_recommended_action"] = s.lastMailboxPreflightRecommendedAction
|
|
report["last_mailbox_preflight_action_hints"] = append([]string(nil), s.lastMailboxPreflightActionHints...)
|
|
report["last_mailbox_preflight_action_reason"] = s.lastMailboxPreflightActionReason
|
|
report["last_mailbox_preflight_action_context"] = cloneStringAnyMap(s.lastMailboxPreflightActionContext)
|
|
report["last_mailbox_preflight_operator_summary"] = s.lastMailboxPreflightOperatorSummary
|
|
report["last_mailbox_preflight_operator_status"] = s.lastMailboxPreflightOperatorStatus
|
|
report["last_mailbox_preflight_operator_severity"] = s.lastMailboxPreflightOperatorSeverity
|
|
report["last_mailbox_preflight_operator_summary_fields"] = cloneStringAnyMap(s.lastMailboxPreflightOperatorFields)
|
|
}
|
|
if s.mailboxConsumerReadTotal > 0 {
|
|
report["last_mailbox_consumer_id"] = s.lastMailboxConsumerID
|
|
report["last_mailbox_consumer_read_at"] = s.lastMailboxConsumerReadAt
|
|
report["last_mailbox_consumer_adapter_session_id"] = s.lastMailboxConsumerAdapterSessionID
|
|
report["last_mailbox_consumer_checkpoint_sequence"] = s.lastMailboxConsumerCheckpoint
|
|
report["last_mailbox_consumer_ack_sequence"] = s.lastMailboxConsumerAck
|
|
}
|
|
if s.mailboxConsumerAckTotal > 0 {
|
|
report["last_mailbox_consumer_ack_at"] = s.lastMailboxConsumerAckAt
|
|
}
|
|
var currentSession *remoteWorkspaceAdapterProbeSession
|
|
if s.sequence > 0 {
|
|
session := s.sessions[s.last.AdapterSessionID]
|
|
currentSession = session
|
|
report["session_state"] = s.last.SessionState
|
|
report["current_adapter_session_id"] = s.last.AdapterSessionID
|
|
report["current_channel_id"] = s.last.ChannelID
|
|
report["current_resource_id"] = s.last.ResourceID
|
|
report["last_delivery"] = s.last
|
|
report["last_delivery_sequence"] = s.last.DeliverySequence
|
|
report["last_frame_count"] = s.last.FrameCount
|
|
report["last_queue_capacity"] = s.last.QueueCapacity
|
|
report["last_queue_depth"] = s.last.QueueDepth
|
|
report["last_accepted_frames"] = s.last.AcceptedFrames
|
|
report["last_dropped_frames"] = s.last.DroppedFrames
|
|
report["last_acked_frames"] = s.last.AckedFrames
|
|
report["last_backpressure"] = s.last.Backpressure
|
|
report["drop_policy"] = s.last.DropPolicy
|
|
report["last_channel_class"] = s.last.ChannelClass
|
|
report["last_adapter_contract_id"] = s.last.AdapterContractID
|
|
report["last_adapter_session_id"] = s.last.AdapterSessionID
|
|
if session != nil {
|
|
report["current_session_lifecycle_state"] = session.State
|
|
report["current_session_created_at"] = session.CreatedAt.Format(time.RFC3339Nano)
|
|
report["current_session_bound_at"] = session.BoundAt.Format(time.RFC3339Nano)
|
|
report["current_session_last_activity_at"] = session.LastActivityAt.Format(time.RFC3339Nano)
|
|
report["current_session_delivery_count"] = session.DeliveryCount
|
|
report["current_session_backpressure_count"] = session.BackpressureCount
|
|
report["current_session_accepted_frames"] = session.AcceptedFrames
|
|
report["current_session_dropped_frames"] = session.DroppedFrames
|
|
report["current_session_acked_frames"] = session.AckedFrames
|
|
report["current_session_mailbox_depth"] = len(session.Mailbox)
|
|
report["current_session_mailbox_enqueued_total"] = session.MailboxEnqueued
|
|
report["current_session_mailbox_drained_total"] = session.MailboxDrained
|
|
report["current_session_mailbox_dropped_total"] = session.MailboxDropped
|
|
report["current_session_mailbox_read_total"] = session.MailboxRead
|
|
report["current_session_mailbox_wait_total"] = session.MailboxWait
|
|
report["current_session_mailbox_wait_timeout_total"] = session.MailboxWaitTimeout
|
|
report["current_session_mailbox_empty_read_total"] = session.MailboxEmptyRead
|
|
report["current_session_mailbox_resume_read_total"] = session.MailboxResumeRead
|
|
report["current_session_mailbox_after_sequence_read_total"] = session.MailboxAfterSequenceRead
|
|
report["current_session_mailbox_returned_total"] = session.MailboxReturnedTotal
|
|
report["current_session_mailbox_skipped_total"] = session.MailboxSkippedTotal
|
|
report["current_session_mailbox_preflight_total"] = session.MailboxPreflightTotal
|
|
report["current_session_mailbox_preflight_ack_total"] = session.MailboxPreflightAckTotal
|
|
report["current_session_mailbox_preflight_checkpoint_total"] = session.MailboxPreflightCheckpointTotal
|
|
report["current_session_mailbox_preflight_operator_status_counts"] = cloneStringInt64Map(session.MailboxPreflightOperatorStatusCounts)
|
|
report["current_session_mailbox_preflight_operator_severity_counts"] = cloneStringInt64Map(session.MailboxPreflightOperatorSeverityCounts)
|
|
report["current_session_mailbox_consumer_count"] = len(session.MailboxConsumers)
|
|
report["current_session_mailbox_consumer_read_total"] = session.MailboxConsumerReadTotal
|
|
report["current_session_mailbox_consumer_ack_total"] = session.MailboxConsumerAckTotal
|
|
report["current_session_mailbox_consumer_reset_total"] = session.MailboxConsumerResetTotal
|
|
report["current_session_mailbox_consumer_evicted_total"] = session.MailboxConsumerEvictedTotal
|
|
if session.MailboxConsumerReadTotal > 0 {
|
|
report["current_session_last_mailbox_consumer_id"] = session.LastMailboxConsumerID
|
|
report["current_session_last_mailbox_consumer_read_at"] = session.LastMailboxConsumerReadAt.Format(time.RFC3339Nano)
|
|
report["current_session_last_mailbox_consumer_checkpoint_sequence"] = session.LastMailboxConsumerCheckpoint
|
|
report["current_session_last_mailbox_consumer_ack_sequence"] = session.LastMailboxConsumerAck
|
|
}
|
|
if session.MailboxConsumerAckTotal > 0 {
|
|
report["current_session_last_mailbox_consumer_ack_at"] = session.LastMailboxConsumerAckAt.Format(time.RFC3339Nano)
|
|
}
|
|
if !session.LastMailboxReadAt.IsZero() {
|
|
report["current_session_last_mailbox_read_at"] = session.LastMailboxReadAt.Format(time.RFC3339Nano)
|
|
report["current_session_last_mailbox_wait_ms"] = session.LastMailboxWaitMs
|
|
report["current_session_last_mailbox_waited"] = session.LastMailboxWaited
|
|
report["current_session_last_mailbox_wait_timeout"] = session.LastMailboxTimeout
|
|
report["current_session_last_mailbox_empty"] = session.LastMailboxEmpty
|
|
report["current_session_last_mailbox_after_sequence"] = session.LastMailboxAfterSequence
|
|
report["current_session_last_mailbox_skipped_count"] = session.LastMailboxSkippedCount
|
|
report["current_session_last_mailbox_returned_count"] = session.LastMailboxReturnedCount
|
|
}
|
|
if session.MailboxResumeRead > 0 {
|
|
report["current_session_last_mailbox_resume_from"] = session.LastMailboxResumeFrom
|
|
report["current_session_last_mailbox_resume_sequence"] = session.LastMailboxResumeSequence
|
|
report["current_session_last_mailbox_resume_consumer_id"] = session.LastMailboxResumeConsumerID
|
|
}
|
|
if session.MailboxPreflightTotal > 0 {
|
|
report["current_session_last_mailbox_preflight_at"] = session.LastMailboxPreflightAt.Format(time.RFC3339Nano)
|
|
report["current_session_last_mailbox_preflight_consumer_id"] = session.LastMailboxPreflightConsumerID
|
|
report["current_session_last_mailbox_preflight_resume_from"] = session.LastMailboxPreflightResumeFrom
|
|
report["current_session_last_mailbox_preflight_resume_sequence"] = session.LastMailboxPreflightResumeSequence
|
|
report["current_session_last_mailbox_preflight_after_sequence"] = session.LastMailboxPreflightAfterSequence
|
|
report["current_session_last_mailbox_preflight_available_count"] = session.LastMailboxPreflightAvailableCount
|
|
report["current_session_last_mailbox_preflight_returned_count"] = session.LastMailboxPreflightReturnedCount
|
|
report["current_session_last_mailbox_preflight_skipped_count"] = session.LastMailboxPreflightSkippedCount
|
|
report["current_session_last_mailbox_preflight_first_sequence"] = session.LastMailboxPreflightFirstSequence
|
|
report["current_session_last_mailbox_preflight_last_sequence"] = session.LastMailboxPreflightLastSequence
|
|
report["current_session_last_mailbox_preflight_diagnostic_state"] = session.LastMailboxPreflightDiagnosticState
|
|
report["current_session_last_mailbox_preflight_stale_cursor"] = session.LastMailboxPreflightStaleCursor
|
|
report["current_session_last_mailbox_preflight_missing_dropped_count"] = session.LastMailboxPreflightMissingDropped
|
|
report["current_session_last_mailbox_preflight_recommended_action"] = session.LastMailboxPreflightRecommendedAction
|
|
report["current_session_last_mailbox_preflight_action_hints"] = append([]string(nil), session.LastMailboxPreflightActionHints...)
|
|
report["current_session_last_mailbox_preflight_action_reason"] = session.LastMailboxPreflightActionReason
|
|
report["current_session_last_mailbox_preflight_action_context"] = cloneStringAnyMap(session.LastMailboxPreflightActionContext)
|
|
report["current_session_last_mailbox_preflight_operator_summary"] = session.LastMailboxPreflightOperatorSummary
|
|
report["current_session_last_mailbox_preflight_operator_status"] = session.LastMailboxPreflightOperatorStatus
|
|
report["current_session_last_mailbox_preflight_operator_severity"] = session.LastMailboxPreflightOperatorSeverity
|
|
report["current_session_last_mailbox_preflight_operator_summary_fields"] = cloneStringAnyMap(session.LastMailboxPreflightOperatorFields)
|
|
}
|
|
if !session.LastBackpressureAt.IsZero() {
|
|
report["current_session_last_backpressure_at"] = session.LastBackpressureAt.Format(time.RFC3339Nano)
|
|
report["current_session_last_backpressure_reason"] = session.LastReason
|
|
}
|
|
}
|
|
}
|
|
if s.backpressureCount > 0 {
|
|
report["last_backpressure_at"] = s.lastBackpressureAt
|
|
report["last_backpressure_reason"] = s.lastBackpressureReason
|
|
report["last_rejected_frame_count"] = s.lastRejectedFrameCount
|
|
report["last_rejected_adapter_session_id"] = s.lastRejectedAdapterSessionID
|
|
report["last_rejected_channel_class"] = s.lastRejectedChannelClass
|
|
report["last_rejected_adapter_contract_id"] = s.lastRejectedAdapterContractID
|
|
report["last_rejected_queue_capacity"] = s.lastRejectedQueueCapacity
|
|
report["last_rejected_queue_depth"] = s.lastRejectedQueueDepth
|
|
}
|
|
if s.sessionControlTotal > 0 {
|
|
report["last_session_control"] = s.lastControl
|
|
report["last_controlled_adapter_session_id"] = s.lastControl.AdapterSessionID
|
|
report["last_session_control_action"] = s.lastControl.Action
|
|
report["last_session_control_state"] = s.lastControl.SessionState
|
|
report["last_session_control_at"] = s.lastControl.ControlledAt
|
|
}
|
|
report["adapter_runtime_readiness"] = remoteWorkspaceAdapterRuntimeReadinessLocked(s, currentSession, now.UTC())
|
|
return report
|
|
}
|