154 lines
4.8 KiB
Go
154 lines
4.8 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"github.com/example/remote-access-platform/backend/internal/modules/sessionbroker"
|
|
)
|
|
|
|
type EventProcessor struct {
|
|
client *redis.Client
|
|
broker *sessionbroker.Service
|
|
}
|
|
|
|
func NewEventProcessor(client *redis.Client, broker *sessionbroker.Service) *EventProcessor {
|
|
return &EventProcessor{client: client, broker: broker}
|
|
}
|
|
|
|
func (p *EventProcessor) Run(ctx context.Context) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
result, err := p.client.BLPop(ctx, 5*time.Second, "worker:events").Result()
|
|
if err == redis.Nil {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("consume worker event: %w", err)
|
|
}
|
|
if len(result) != 2 {
|
|
continue
|
|
}
|
|
|
|
var event SessionEvent
|
|
if err := json.Unmarshal([]byte(result[1]), &event); err != nil {
|
|
continue
|
|
}
|
|
if err := p.handleEvent(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *EventProcessor) handleEvent(ctx context.Context, event SessionEvent) error {
|
|
switch event.Type {
|
|
case SessionEventConnected, SessionEventDisplayReady:
|
|
if err := p.broker.HandleWorkerConnected(ctx, event.SessionID); err != nil {
|
|
return err
|
|
}
|
|
if len(event.Payload) > 0 {
|
|
if err := p.broker.UpdateWorkerRenderTelemetry(ctx, event.SessionID, event.Payload); err != nil && !errors.Is(err, sessionbroker.ErrSessionNotFound) {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
case SessionEventHeartbeat:
|
|
return p.broker.HandleWorkerHeartbeat(ctx, event.SessionID)
|
|
case SessionEventRenderReady, SessionEventRenderDirty, SessionEventRenderResized, SessionEventCursorUpdated, SessionEventFrame:
|
|
if len(event.Payload) == 0 {
|
|
return nil
|
|
}
|
|
if correlationID, _ := event.Payload["input_correlation_id"].(string); correlationID != "" {
|
|
slog.Info("worker frame event received",
|
|
"session_id", event.SessionID,
|
|
"worker_id", event.WorkerID,
|
|
"frame_sequence", event.Payload["frame_sequence"],
|
|
"correlation_id", correlationID,
|
|
"worker_frame_captured_at", event.Payload["worker_frame_captured_at"],
|
|
"trace_stage", "backend_frame_receive")
|
|
}
|
|
return p.updateRenderTelemetryWithRetry(ctx, event.SessionID, event.Payload)
|
|
case SessionEventClipboardText:
|
|
if len(event.Payload) == 0 {
|
|
return nil
|
|
}
|
|
slog.Info("worker clipboard event received",
|
|
"session_id", event.SessionID,
|
|
"worker_id", event.WorkerID,
|
|
"origin", event.Payload["origin"],
|
|
"sequence_id", event.Payload["sequence_id"],
|
|
"content_hash", event.Payload["content_hash"])
|
|
return p.broker.UpdateWorkerClipboardText(ctx, event.SessionID, event.Payload)
|
|
case SessionEventFileUploaded:
|
|
slog.Info("worker file upload completed",
|
|
"session_id", event.SessionID,
|
|
"worker_id", event.WorkerID,
|
|
"transfer_id", event.Payload["transfer_id"],
|
|
"file_name", event.Payload["file_name"],
|
|
"file_size", event.Payload["file_size"],
|
|
"content_hash", event.Payload["content_hash"],
|
|
"storage_path", event.Payload["storage_path"])
|
|
return nil
|
|
case SessionEventFileDownloadAvailable, SessionEventFileDownloadChunk, SessionEventFileDownloadProgress,
|
|
SessionEventFileDownloadCompleted, SessionEventFileDownloadFailed, SessionEventFileDownloadBlocked:
|
|
slog.Info("worker file download event received",
|
|
"session_id", event.SessionID,
|
|
"worker_id", event.WorkerID,
|
|
"event_type", event.Type,
|
|
"transfer_id", event.Payload["transfer_id"],
|
|
"file_id", event.Payload["file_id"],
|
|
"file_name", event.Payload["file_name"],
|
|
"status", event.Payload["status"])
|
|
return p.broker.UpdateWorkerFileDownloadEvent(ctx, event.SessionID, event.Type, event.Payload)
|
|
case SessionEventFailed:
|
|
reason, _ := event.Payload["reason"].(string)
|
|
err := p.broker.MarkSessionFailed(ctx, sessionbroker.MarkSessionFailedCommand{
|
|
SessionID: event.SessionID,
|
|
Reason: reason,
|
|
})
|
|
if errors.Is(err, sessionbroker.ErrSessionNotFound) || errors.Is(err, sessionbroker.ErrSessionNotTerminable) {
|
|
return nil
|
|
}
|
|
return err
|
|
case SessionEventTerminated:
|
|
reason, _ := event.Payload["reason"].(string)
|
|
err := p.broker.TerminateSession(ctx, sessionbroker.TerminateSessionCommand{
|
|
SessionID: event.SessionID,
|
|
Reason: reason,
|
|
})
|
|
if errors.Is(err, sessionbroker.ErrSessionNotFound) || errors.Is(err, sessionbroker.ErrSessionNotTerminable) {
|
|
return nil
|
|
}
|
|
return err
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (p *EventProcessor) updateRenderTelemetryWithRetry(ctx context.Context, sessionID string, payload map[string]any) error {
|
|
var lastErr error
|
|
for attempt := 0; attempt < 10; attempt++ {
|
|
err := p.broker.UpdateWorkerRenderTelemetry(ctx, sessionID, payload)
|
|
if err == nil || errors.Is(err, sessionbroker.ErrSessionNotFound) {
|
|
return nil
|
|
}
|
|
lastErr = err
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
return lastErr
|
|
}
|