Files
rdp-proxy/workers/rdp-worker/src/runtime/session_runtime.cpp
T
2026-04-28 22:29:50 +03:00

1611 lines
71 KiB
C++

#include "rdp_worker/runtime/session_runtime.hpp"
#include <algorithm>
#include <array>
#include <chrono>
#include <cstdlib>
#include <cmath>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <vector>
#include "rdp_worker/common/json.hpp"
namespace rdp_worker::runtime {
namespace {
double NormalizeCoordinate(const common::JsonObject& payload, const char* key) {
return std::clamp(common::GetNumber(payload, key).value_or(0.0), 0.0, 1.0);
}
std::optional<double> GetNumberEither(const common::JsonObject& payload, const char* primary, const char* secondary) {
if (auto value = common::GetNumber(payload, primary); value.has_value()) {
return value;
}
return common::GetNumber(payload, secondary);
}
std::optional<bool> GetBoolEither(const common::JsonObject& payload, const char* primary, const char* secondary) {
if (auto value = common::GetBool(payload, primary); value.has_value()) {
return value;
}
return common::GetBool(payload, secondary);
}
bool ClipboardAllowsClientToServer(const std::string& mode) {
return mode == "client_to_server" || mode == "bidirectional";
}
bool FileTransferAllowsClientToServer(const std::string& mode) {
return mode == "client_to_server" || mode == "bidirectional";
}
bool FileTransferAllowsServerToClient(const std::string& mode) {
return mode == "server_to_client" || mode == "bidirectional";
}
std::chrono::milliseconds RenderPublishInterval(const RenderNotification& render) {
if (common::GetBool(render.payload, "interactive_frame").value_or(false)) {
return std::chrono::milliseconds(33);
}
const auto update_kind = common::GetString(render.payload, "frame_update_kind").value_or("full");
if (update_kind == "region") {
return std::chrono::milliseconds(33);
}
return std::chrono::milliseconds(100);
}
std::int64_t UnixMillisecondsNow() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}
std::string Base64Encode(const std::uint8_t* data, std::size_t size) {
static constexpr std::array<char, 64> alphabet{
'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P',
'Q','R','S','T','U','V','W','X','Y','Z','a','b','c','d','e','f',
'g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v',
'w','x','y','z','0','1','2','3','4','5','6','7','8','9','+','/'
};
std::string output;
output.reserve(((size + 2) / 3) * 4);
for (std::size_t index = 0; index < size; ) {
const std::size_t remaining = size - index;
const std::uint32_t octet_a = data[index++];
const std::uint32_t octet_b = remaining > 1 ? data[index++] : 0;
const std::uint32_t octet_c = remaining > 2 ? data[index++] : 0;
const std::uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;
output.push_back(alphabet[(triple >> 18) & 0x3F]);
output.push_back(alphabet[(triple >> 12) & 0x3F]);
output.push_back(remaining > 1 ? alphabet[(triple >> 6) & 0x3F] : '=');
output.push_back(remaining > 2 ? alphabet[triple & 0x3F] : '=');
}
return output;
}
constexpr std::int64_t kMaxUploadBytes = 25LL * 1024LL * 1024LL;
constexpr std::int64_t kMaxUploadChunkBytes = 256LL * 1024LL;
constexpr std::int64_t kMaxDownloadBytes = 25LL * 1024LL * 1024LL;
constexpr std::int64_t kMaxDownloadChunkBytes = 256LL * 1024LL;
constexpr std::size_t kMaxInputBatchSize = 64;
constexpr std::size_t kMaxDirectEnvelopeQueueSize = 256;
constexpr std::size_t kMaxPendingRenderFrames = 256;
constexpr std::size_t kMaxRenderPublishBatchSize = 16;
constexpr auto kMinFramePublishInterval = std::chrono::milliseconds(100);
constexpr auto kRegionLossFullRepairInterval = std::chrono::milliseconds(500);
constexpr auto kRenderRateLogInterval = std::chrono::seconds(2);
constexpr auto kLeaseRenewInterval = std::chrono::seconds(5);
std::optional<std::vector<std::uint8_t>> DecodeBase64(const std::string& input) {
static constexpr unsigned char kInvalid = 255;
static unsigned char table[256];
static bool initialized = false;
if (!initialized) {
std::fill(std::begin(table), std::end(table), kInvalid);
const std::string alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
for (std::size_t i = 0; i < alphabet.size(); ++i) {
table[static_cast<unsigned char>(alphabet[i])] = static_cast<unsigned char>(i);
}
initialized = true;
}
std::vector<std::uint8_t> out;
int val = 0;
int valb = -8;
for (unsigned char c : input) {
if (c == '=') {
break;
}
if (table[c] == kInvalid) {
return std::nullopt;
}
val = (val << 6) + table[c];
valb += 6;
if (valb >= 0) {
out.push_back(static_cast<std::uint8_t>((val >> valb) & 0xFF));
valb -= 8;
}
}
return out;
}
std::string Fnv1a64Hex(std::uint64_t value) {
std::ostringstream stream;
stream << std::hex << std::setw(16) << std::setfill('0') << value;
return stream.str();
}
void UpdateFnv1a64(std::uint64_t& hash, const std::vector<std::uint8_t>& bytes) {
for (std::uint8_t byte : bytes) {
hash ^= byte;
hash *= 1099511628211ULL;
}
}
void UpdateFnv1a64Bytes(std::uint64_t& hash, const char* data, std::size_t size) {
for (std::size_t index = 0; index < size; ++index) {
hash ^= static_cast<unsigned char>(data[index]);
hash *= 1099511628211ULL;
}
}
void UpdateFnv1a64String(std::uint64_t& hash, const std::string& value) {
UpdateFnv1a64Bytes(hash, value.data(), value.size());
}
std::filesystem::path TransferRoot() {
if (const char* root = std::getenv("RDP_WORKER_TRANSFER_ROOT"); root != nullptr && std::string(root).size() > 0) {
return std::filesystem::path(root);
}
return std::filesystem::path("/tmp/rap-rdp-worker-transfers");
}
bool IsSafeFileName(const std::string& name) {
if (name.empty() || name == "." || name == ".." || name.size() > 255 || name.find("..") != std::string::npos) {
return false;
}
return name.find('/') == std::string::npos && name.find('\\') == std::string::npos && name.find(':') == std::string::npos;
}
bool IsSafePathSegment(const std::string& value) {
return IsSafeFileName(value);
}
bool IsMouseMoveInputEnvelope(const common::JsonObject& envelope) {
if (common::GetString(envelope, "type").value_or("") != "input") {
return false;
}
const common::JsonObject* payload = common::GetObject(envelope, "payload");
if (payload == nullptr) {
return false;
}
return common::GetString(*payload, "kind").value_or("") == "mouse" &&
common::GetString(*payload, "action").value_or("") == "move";
}
bool IsFrameRenderNotification(const RenderNotification& notification) {
return notification.type == "session_frame";
}
bool IsFullFrameRenderNotification(const RenderNotification& notification) {
return common::GetString(notification.payload, "frame_update_kind").value_or("full") == "full";
}
bool IsRegionFrameRenderNotification(const RenderNotification& notification) {
return common::GetString(notification.payload, "frame_update_kind").value_or("full") == "region";
}
bool IsTakeoverDirectEvent(const WorkerEvent& event) {
return event.type == "session_taken_over";
}
std::filesystem::path SessionTransferRoot(const std::string& session_id) {
return TransferRoot() / session_id;
}
std::filesystem::path VisibleTransferDirectory(const std::string& session_id) {
return SessionTransferRoot(session_id) / "visible";
}
std::filesystem::path OutboundDownloadDirectory(const std::string& session_id) {
return VisibleTransferDirectory(session_id) / "ToClient";
}
bool IsIgnoredOutboundFileName(const std::string& name) {
return name.empty() ||
name.front() == '.' ||
name.rfind("~$", 0) == 0 ||
(name.size() >= 4 && name.substr(name.size() - 4) == ".tmp") ||
(name.size() >= 5 && name.substr(name.size() - 5) == ".part");
}
bool PathExistsOrIsSymlink(const std::filesystem::path& path) {
std::error_code ec;
const auto status = std::filesystem::symlink_status(path, ec);
return !ec && status.type() != std::filesystem::file_type::not_found;
}
bool IsSymlinkPath(const std::filesystem::path& path) {
std::error_code ec;
return std::filesystem::is_symlink(std::filesystem::symlink_status(path, ec));
}
std::optional<std::pair<std::string, std::uint64_t>> ComputeFileFnv1a64(const std::filesystem::path& path, std::int64_t max_bytes) {
std::ifstream input(path, std::ios::binary);
if (!input.good()) {
return std::nullopt;
}
std::uint64_t hash = 1469598103934665603ULL;
std::array<char, 64 * 1024> buffer{};
std::int64_t total = 0;
while (input.good()) {
input.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
const auto read = input.gcount();
if (read <= 0) {
break;
}
total += static_cast<std::int64_t>(read);
if (total > max_bytes) {
return std::nullopt;
}
UpdateFnv1a64Bytes(hash, buffer.data(), static_cast<std::size_t>(read));
}
return std::make_pair(Fnv1a64Hex(hash), hash);
}
} // namespace
SessionRuntime::SessionRuntime(Assignment assignment,
std::shared_ptr<coordination::ControlPlane> control_plane,
std::shared_ptr<common::Logger> logger)
: assignment_(std::move(assignment)),
control_plane_(std::move(control_plane)),
logger_(std::move(logger)),
rdp_adapter_(logger_),
stop_requested_(false),
attached_(true) {}
SessionRuntime::~SessionRuntime() {
Stop(true, "shutdown");
}
void SessionRuntime::Start() {
thread_ = std::thread(&SessionRuntime::Run, this);
}
void SessionRuntime::ApplyAssignment(const Assignment& assignment) {
std::optional<std::string> takeover_of = assignment.takeover_of;
{
std::lock_guard<std::mutex> lock(mutex_);
assignment_ = assignment;
attached_.store(true);
if (assignment.state == SessionState::kActive || assignment.state == SessionState::kReconnecting) {
(void)PrepareVisibleTransferDirectory(assignment.session_id);
}
}
if (takeover_of.has_value() && !takeover_of->empty()) {
DispatchDirectEvent(WorkerEvent{
"session_taken_over",
assignment.session_id,
assignment.worker_id,
"takeover",
rdp_adapter_.DesktopWidth(),
rdp_adapter_.DesktopHeight(),
common::JsonObject{
{"state", "taken_over"},
{"attachment_id", assignment.attachment_id},
{"takeover_of", *takeover_of},
},
});
}
}
void SessionRuntime::Stop(bool terminate, const std::string& reason) {
stop_requested_.store(true);
if (thread_.joinable()) {
thread_.join();
}
rdp_adapter_.Disconnect(terminate);
if (terminate) {
PublishEvent("session_terminated", reason);
CleanupSessionTransferDirectory(SessionId());
}
}
std::string SessionRuntime::SessionId() const {
std::lock_guard<std::mutex> lock(mutex_);
return assignment_.session_id;
}
Assignment SessionRuntime::Snapshot() const {
std::lock_guard<std::mutex> lock(mutex_);
return assignment_;
}
bool SessionRuntime::EnqueueDirectEnvelope(common::JsonObject envelope) {
std::lock_guard<std::mutex> lock(mutex_);
if (direct_envelopes_.size() >= kMaxDirectEnvelopeQueueSize) {
if (IsMouseMoveInputEnvelope(envelope)) {
for (auto iterator = direct_envelopes_.rbegin(); iterator != direct_envelopes_.rend(); ++iterator) {
if (IsMouseMoveInputEnvelope(*iterator)) {
*iterator = std::move(envelope);
return true;
}
}
}
return false;
}
direct_envelopes_.push_back(std::move(envelope));
return true;
}
void SessionRuntime::AddDirectEventSink(std::weak_ptr<DirectEventSink> sink) {
Assignment snapshot;
int width = 0;
int height = 0;
std::vector<WorkerEvent> available_downloads;
{
std::lock_guard<std::mutex> lock(mutex_);
direct_event_sinks_.push_back(sink);
snapshot = assignment_;
width = rdp_adapter_.DesktopWidth();
height = rdp_adapter_.DesktopHeight();
for (const auto& [_, candidate] : download_candidates_) {
if (!candidate.available_published) {
continue;
}
available_downloads.push_back(WorkerEvent{
"session_file_download_available",
snapshot.session_id,
snapshot.worker_id,
"",
width,
height,
common::JsonObject{
{"direction", "server_to_client"},
{"file_id", candidate.file_id},
{"file_name", candidate.file_name},
{"file_size", static_cast<double>(candidate.file_size)},
{"content_hash", candidate.content_hash},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
{"status", "available"},
{"visible_drive_name", "RAP_Transfers"},
{"visible_drive_path", "ToClient"},
},
});
}
}
if (auto locked = sink.lock()) {
locked->EnqueueEvent(WorkerEvent{
"session_connected",
snapshot.session_id,
snapshot.worker_id,
"",
width,
height,
common::JsonObject{
{"state", snapshot.state == SessionState::kActive ? "active" : "reconnecting"},
{"attachment_id", snapshot.attachment_id},
{"render_quality_profile", snapshot.connection.render_quality_profile},
{"render_state", rdp_adapter_.IsConnected() ? "ready" : "connecting"},
},
});
std::optional<WorkerEvent> cached_frame;
{
std::lock_guard<std::mutex> lock(mutex_);
if (last_direct_render_frame_.has_value() &&
last_direct_render_frame_->session_id == snapshot.session_id &&
snapshot.state == SessionState::kActive) {
cached_frame = last_direct_render_frame_;
}
}
if (cached_frame.has_value()) {
cached_frame->payload["frame_update_kind"] = "full";
cached_frame->payload["direct_attach_baseline"] = true;
locked->EnqueueEvent(*cached_frame);
logger_->Info("direct data-plane baseline frame replayed on attach session=" + snapshot.session_id);
} else {
direct_attach_baseline_requested_.store(true);
logger_->Info("direct data-plane baseline frame unavailable on attach; full frame capture requested session=" +
snapshot.session_id);
}
for (const auto& event : available_downloads) {
locked->EnqueueEvent(event);
}
}
}
void SessionRuntime::RequestDirectFullFrameRepair(std::string reason) {
{
std::lock_guard<std::mutex> lock(mutex_);
direct_full_repair_requested_ = true;
direct_full_repair_reason_ = std::move(reason);
}
logger_->Info("render.region_loss direct full repair requested session=" + SessionId());
}
void SessionRuntime::Run() {
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
const auto visible_dir = PrepareVisibleTransferDirectory(snapshot.session_id);
if (visible_dir.empty()) {
PublishEvent("session_failed", "transfer_visible_directory_unavailable");
return;
}
snapshot.connection.redirected_drive_name = "RAP_Transfers";
snapshot.connection.redirected_drive_path = visible_dir.string();
if (!rdp_adapter_.Start(snapshot.connection)) {
PublishEvent("session_failed", "rdp_connect_failed");
CleanupSessionTransferDirectory(snapshot.session_id);
return;
}
for (auto render = rdp_adapter_.PopRenderNotification(); render.has_value(); render = rdp_adapter_.PopRenderNotification()) {
std::string keys;
for (const auto& [key, _] : render->payload) {
if (!keys.empty()) {
keys += ",";
}
keys += key;
}
logger_->Info("publishing initial render telemetry " + render->type + " for session " + snapshot.session_id + " payload_keys=" + std::to_string(render->payload.size()) + " keys=" + keys);
if (IsFrameRenderNotification(*render)) {
++render_frames_seen_since_log_;
if (IsFullFrameRenderNotification(*render)) {
pending_render_frames_.clear();
}
pending_render_frames_.push_back(std::move(*render));
} else {
PublishEvent(render->type, "", render->payload);
}
}
PublishDirectAttachBaselineIfRequested(snapshot.session_id);
DrainAndPublishRenderNotifications(snapshot.session_id);
PublishEvent("session_connected");
PublishEvent("session_display_ready");
auto last_heartbeat = std::chrono::steady_clock::now();
auto last_lease_renewal = std::chrono::steady_clock::now() - kLeaseRenewInterval;
while (!stop_requested_.load()) {
DrainAndHandleEnvelopes(snapshot.session_id);
if (!rdp_adapter_.PumpEvents(std::chrono::milliseconds(20))) {
PublishEvent("session_failed", "rdp_event_loop_failed");
CleanupSessionTransferDirectory(snapshot.session_id);
return;
}
DrainAndHandleEnvelopes(snapshot.session_id);
PublishDirectAttachBaselineIfRequested(snapshot.session_id);
DrainAndPublishRenderNotifications(snapshot.session_id);
for (auto clipboard = rdp_adapter_.PopClipboardNotification(); clipboard.has_value(); clipboard = rdp_adapter_.PopClipboardNotification()) {
logger_->Info("publishing clipboard notification for session " + snapshot.session_id +
" origin=" + clipboard->origin +
" sequence_id=" + std::to_string(clipboard->sequence_id) +
" content_hash=" + clipboard->content_hash +
" text_bytes=" + std::to_string(clipboard->text.size()));
PublishEvent("session_clipboard_text", "", common::JsonObject{
{"direction", "server_to_client"},
{"text", clipboard->text},
{"sequence_id", static_cast<double>(clipboard->sequence_id)},
{"origin", clipboard->origin},
{"content_hash", clipboard->content_hash},
});
}
const auto now = std::chrono::steady_clock::now();
if (last_download_scan_at_.time_since_epoch().count() == 0 ||
now - last_download_scan_at_ >= std::chrono::seconds(1)) {
ScanOutboundDownloadDirectory(snapshot.session_id);
last_download_scan_at_ = now;
}
if (now - last_lease_renewal >= kLeaseRenewInterval) {
if (auto lease = control_plane_->GetLeaseBySession(snapshot.session_id); lease.has_value()) {
control_plane_->RenewLease(*lease);
}
last_lease_renewal = now;
}
if (now - last_heartbeat >= std::chrono::seconds(5)) {
PublishEvent("session_heartbeat");
last_heartbeat = now;
}
if (!rdp_adapter_.IsConnected()) {
PublishEvent("session_failed", "rdp_transport_disconnected");
CleanupSessionTransferDirectory(snapshot.session_id);
return;
}
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
}
}
void SessionRuntime::DrainAndHandleEnvelopes(const std::string& session_id) {
auto direct_envelopes = DrainDirectEnvelopes(kMaxInputBatchSize);
if (!direct_envelopes.empty()) {
bool has_non_move = false;
for (const auto& envelope : direct_envelopes) {
const auto type = common::GetString(envelope, "type").value_or("");
const common::JsonObject* payload = common::GetObject(envelope, "payload");
const auto kind = payload == nullptr ? "" : common::GetString(*payload, "kind").value_or("");
const auto action = payload == nullptr ? "" : common::GetString(*payload, "action").value_or("");
if (type != "input" || kind != "mouse" || action != "move") {
has_non_move = true;
break;
}
}
if (has_non_move) {
logger_->Info("direct data-plane input batch session=" + session_id +
" drained=" + std::to_string(direct_envelopes.size()));
}
HandleEnvelopeBatch(direct_envelopes);
}
auto envelopes = control_plane_->DrainSessionEnvelopes(session_id, kMaxInputBatchSize);
if (envelopes.empty()) {
return;
}
const auto queue_length_after = control_plane_->SessionEnvelopeQueueLength(session_id);
const auto queue_length_before = queue_length_after + static_cast<int64_t>(envelopes.size());
logger_->Info("input.trace worker_input_batch session=" + session_id +
" drained=" + std::to_string(envelopes.size()) +
" queue_length_before=" + std::to_string(queue_length_before) +
" queue_length_after=" + std::to_string(queue_length_after));
HandleEnvelopeBatch(envelopes);
}
std::vector<common::JsonObject> SessionRuntime::DrainDirectEnvelopes(std::size_t max_count) {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<common::JsonObject> output;
output.reserve(std::min(max_count, direct_envelopes_.size()));
while (!direct_envelopes_.empty() && output.size() < max_count) {
output.push_back(std::move(direct_envelopes_.front()));
direct_envelopes_.pop_front();
}
return output;
}
void SessionRuntime::HandleEnvelopeBatch(const std::vector<common::JsonObject>& envelopes) {
std::optional<common::JsonObject> latest_mouse_move;
std::size_t coalesced_mouse_moves = 0;
std::size_t applied_non_move = 0;
for (const auto& envelope : envelopes) {
if (IsMouseMoveInputEnvelope(envelope)) {
latest_mouse_move = envelope;
++coalesced_mouse_moves;
continue;
}
HandleEnvelope(envelope);
++applied_non_move;
}
if (latest_mouse_move.has_value()) {
HandleEnvelope(*latest_mouse_move);
}
if (coalesced_mouse_moves > 1 || envelopes.size() > 1) {
logger_->Info("input.trace worker_input_batch_applied session=" + SessionId() +
" total=" + std::to_string(envelopes.size()) +
" non_move_applied=" + std::to_string(applied_non_move) +
" mouse_moves_seen=" + std::to_string(coalesced_mouse_moves) +
" mouse_moves_applied=" + std::to_string(latest_mouse_move.has_value() ? 1 : 0));
}
}
void SessionRuntime::PublishDirectAttachBaselineIfRequested(const std::string& session_id) {
if (!direct_attach_baseline_requested_.exchange(false)) {
return;
}
if (!HasCurrentDirectEventSink()) {
logger_->Info("direct data-plane baseline capture skipped because no direct viewer is attached session=" +
session_id);
return;
}
auto render = rdp_adapter_.CaptureFullFrameNotification("ready", "direct_attach_baseline");
if (!render.has_value()) {
logger_->Warn("direct data-plane baseline full frame capture unavailable on attach session=" + session_id);
return;
}
render->payload["direct_attach_baseline"] = true;
render->payload["frame_update_kind"] = "full";
render->payload["frame_data_is_region"] = false;
render->payload["interactive_frame"] = true;
const std::size_t raw_frame_bytes = render->raw_frame_bytes.size();
render->payload["raw_frame_bytes"] = static_cast<double>(raw_frame_bytes);
render->payload["base64_compat_bytes"] = 0;
render->payload["binary_direct_bytes"] = static_cast<double>(raw_frame_bytes);
render->payload["encode_skipped_for_direct"] = true;
render->payload["fallback_compat_frame_built"] = false;
render->payload["fallback_compat_frame_skipped_for_direct"] = true;
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
WorkerEvent direct_event;
direct_event.type = render->type;
direct_event.session_id = snapshot.session_id;
direct_event.worker_id = snapshot.worker_id;
direct_event.width = rdp_adapter_.DesktopWidth();
direct_event.height = rdp_adapter_.DesktopHeight();
direct_event.payload = render->payload;
direct_event.raw_frame_bytes = std::move(render->raw_frame_bytes);
{
std::lock_guard<std::mutex> lock(mutex_);
last_direct_render_frame_ = direct_event;
}
logger_->Info("direct data-plane baseline full frame captured on attach session=" + snapshot.session_id +
" raw_frame_bytes=" + std::to_string(raw_frame_bytes) +
" width=" + std::to_string(direct_event.width) +
" height=" + std::to_string(direct_event.height));
DispatchDirectEvent(direct_event);
}
void SessionRuntime::DrainAndPublishRenderNotifications(const std::string& session_id) {
std::vector<RenderNotification> non_frame_notifications;
bool saw_interactive_frame = false;
bool dropped_region_requires_full_repair = false;
for (auto render = rdp_adapter_.PopRenderNotification(); render.has_value(); render = rdp_adapter_.PopRenderNotification()) {
if (IsFrameRenderNotification(*render)) {
++render_frames_seen_since_log_;
const bool incoming_full_frame = IsFullFrameRenderNotification(*render);
const bool incoming_region_frame = IsRegionFrameRenderNotification(*render);
saw_interactive_frame = saw_interactive_frame || common::GetBool(render->payload, "interactive_frame").value_or(false);
if (incoming_full_frame) {
if (!pending_render_frames_.empty()) {
render_frames_dropped_since_log_ += pending_render_frames_.size();
logger_->Info("render.region_queue session runtime cleared pending frames for full frame session=" +
session_id +
" cleared=" + std::to_string(pending_render_frames_.size()));
}
pending_render_frames_.clear();
pending_render_frames_.push_back(std::move(*render));
} else if (incoming_region_frame) {
if (pending_render_frames_.size() >= kMaxPendingRenderFrames) {
const auto dropped = pending_render_frames_.size() + 1;
render_frames_dropped_since_log_ += dropped;
pending_render_frames_.clear();
dropped_region_requires_full_repair = true;
logger_->Warn("render.region_queue session runtime overflow session=" + session_id +
" dropped=" + std::to_string(dropped) +
" repair_requested=true");
} else {
pending_render_frames_.push_back(std::move(*render));
}
} else {
if (!pending_render_frames_.empty()) {
render_frames_dropped_since_log_ += pending_render_frames_.size();
}
pending_render_frames_.clear();
pending_render_frames_.push_back(std::move(*render));
}
} else {
non_frame_notifications.push_back(std::move(*render));
}
}
const auto now = std::chrono::steady_clock::now();
bool direct_full_repair_requested = false;
std::string direct_full_repair_reason;
{
std::lock_guard<std::mutex> lock(mutex_);
direct_full_repair_requested = direct_full_repair_requested_;
if (direct_full_repair_requested) {
direct_full_repair_requested_ = false;
direct_full_repair_reason = direct_full_repair_reason_;
direct_full_repair_reason_.clear();
}
}
const bool full_repair_needed = dropped_region_requires_full_repair || direct_full_repair_requested;
if (full_repair_needed && HasCurrentDirectEventSink() &&
(last_region_loss_full_repair_at_.time_since_epoch().count() == 0 ||
now - last_region_loss_full_repair_at_ >= kRegionLossFullRepairInterval)) {
if (auto repair_frame = rdp_adapter_.CaptureFullFrameNotification("ready", "region_loss_repair"); repair_frame.has_value()) {
repair_frame->payload["frame_update_kind"] = "full";
repair_frame->payload["frame_data_is_region"] = false;
repair_frame->payload["region_loss_repair"] = true;
repair_frame->payload["interactive_frame"] = true;
repair_frame->payload["region_loss_repair_reason"] = direct_full_repair_requested
? direct_full_repair_reason
: "session_runtime_region_coalesce";
if (!pending_render_frames_.empty()) {
render_frames_dropped_since_log_ += pending_render_frames_.size();
}
pending_render_frames_.clear();
pending_render_frames_.push_back(std::move(*repair_frame));
last_region_loss_full_repair_at_ = now;
logger_->Info("render.region_loss full repair captured session=" + session_id +
" reason=" + (direct_full_repair_requested ? direct_full_repair_reason : std::string("session_runtime_region_coalesce")) +
" raw_frame_bytes=" + std::to_string(pending_render_frames_.back().raw_frame_bytes.size()));
} else {
logger_->Warn("render.region_loss full repair capture unavailable session=" + session_id);
}
} else if (full_repair_needed && HasCurrentDirectEventSink()) {
std::lock_guard<std::mutex> lock(mutex_);
direct_full_repair_requested_ = true;
direct_full_repair_reason_ = direct_full_repair_requested
? direct_full_repair_reason
: "session_runtime_region_coalesce_throttled";
logger_->Info("render.region_loss full repair deferred by throttle session=" + session_id +
" reason=" + direct_full_repair_reason_);
}
if (saw_interactive_frame && !pending_render_frames_.empty()) {
for (auto& frame : pending_render_frames_) {
if (!common::GetBool(frame.payload, "interactive_frame").value_or(false)) {
frame.payload["interactive_frame"] = true;
frame.payload["interactive_reason"] = "ordered_after_interactive";
}
}
}
for (auto& render : non_frame_notifications) {
std::string keys;
for (const auto& [key, _] : render.payload) {
if (!keys.empty()) {
keys += ",";
}
keys += key;
}
logger_->Info("publishing render telemetry " + render.type + " for session " + session_id + " payload_keys=" + std::to_string(render.payload.size()) + " keys=" + keys);
PublishEvent(render.type, "", render.payload);
}
std::size_t published_this_drain = 0;
while (!pending_render_frames_.empty() && published_this_drain < kMaxRenderPublishBatchSize) {
const bool pending_frame_interactive =
common::GetBool(pending_render_frames_.front().payload, "interactive_frame").value_or(false);
const auto publish_interval = RenderPublishInterval(pending_render_frames_.front());
if (!pending_frame_interactive &&
last_frame_published_at_.time_since_epoch().count() != 0 &&
now - last_frame_published_at_ < publish_interval &&
published_this_drain == 0) {
break;
}
auto render = std::move(pending_render_frames_.front());
pending_render_frames_.pop_front();
if (!last_input_correlation_id_.empty()) {
render.payload["input_correlation_id"] = last_input_correlation_id_;
render.payload["worker_frame_captured_at"] = std::to_string(UnixMillisecondsNow());
logger_->Info("input.trace worker_frame_after_input correlation_id=" + last_input_correlation_id_ +
" session=" + session_id +
" frame_sequence=" + std::to_string(static_cast<int>(common::GetNumber(render.payload, "frame_sequence").value_or(0))) +
" worker_frame_captured_at=" + std::to_string(UnixMillisecondsNow()));
last_input_correlation_id_.clear();
}
++render_frames_published_since_log_;
++published_this_drain;
last_frame_published_at_ = now;
const std::size_t raw_frame_bytes = render.raw_frame_bytes.size();
const bool has_direct_viewer = HasCurrentDirectEventSink();
const std::string compat_frame = has_direct_viewer || raw_frame_bytes == 0
? std::string{}
: Base64Encode(render.raw_frame_bytes.data(), raw_frame_bytes);
render.payload["raw_frame_bytes"] = static_cast<double>(raw_frame_bytes);
render.payload["base64_compat_bytes"] = static_cast<double>(compat_frame.size());
render.payload["binary_direct_bytes"] = static_cast<double>(raw_frame_bytes);
render.payload["encode_skipped_for_direct"] = true;
render.payload["fallback_compat_frame_built"] = !compat_frame.empty();
render.payload["fallback_compat_frame_skipped_for_direct"] = has_direct_viewer;
logger_->Info("publishing render telemetry " + render.type + " for session " + session_id +
" frame_sequence=" + std::to_string(static_cast<int>(common::GetNumber(render.payload, "frame_sequence").value_or(0))) +
" interactive=" + (pending_frame_interactive ? std::string("true") : std::string("false")) +
" publish_interval_ms=" + std::to_string(publish_interval.count()) +
" raw_frame_bytes=" + std::to_string(raw_frame_bytes) +
" base64_compat_bytes=" + std::to_string(compat_frame.size()) +
" binary_direct_bytes=" + std::to_string(raw_frame_bytes) +
" encode_skipped_for_direct=true" +
" fallback_compat_frame_built=" + (!compat_frame.empty() ? std::string("true") : std::string("false")) +
" fallback_compat_frame_skipped_for_direct=" + (has_direct_viewer ? std::string("true") : std::string("false")));
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
WorkerEvent direct_event;
direct_event.type = render.type;
direct_event.session_id = snapshot.session_id;
direct_event.worker_id = snapshot.worker_id;
direct_event.width = rdp_adapter_.DesktopWidth();
direct_event.height = rdp_adapter_.DesktopHeight();
direct_event.payload = render.payload;
direct_event.raw_frame_bytes = render.raw_frame_bytes;
const bool direct_event_is_full_frame =
common::GetString(direct_event.payload, "frame_update_kind").value_or("full") == "full";
if (direct_event_is_full_frame) {
std::lock_guard<std::mutex> lock(mutex_);
last_direct_render_frame_ = direct_event;
}
DispatchDirectEvent(direct_event);
if (!has_direct_viewer) {
WorkerEvent compat_event = direct_event;
compat_event.raw_frame_bytes.clear();
compat_event.payload["frame_data"] = compat_frame;
control_plane_->PublishEvent(compat_event);
}
}
if (last_render_rate_logged_at_.time_since_epoch().count() == 0) {
last_render_rate_logged_at_ = now;
} else if (now - last_render_rate_logged_at_ >= kRenderRateLogInterval) {
const double seconds = std::max(0.001, std::chrono::duration<double>(now - last_render_rate_logged_at_).count());
logger_->Info("render.frame rate session=" + session_id +
" seen_per_second=" + std::to_string(static_cast<double>(render_frames_seen_since_log_) / seconds) +
" published_per_second=" + std::to_string(static_cast<double>(render_frames_published_since_log_) / seconds) +
" dropped_per_second=" + std::to_string(static_cast<double>(render_frames_dropped_since_log_) / seconds) +
" pending=" + std::to_string(pending_render_frames_.size()));
render_frames_seen_since_log_ = 0;
render_frames_published_since_log_ = 0;
render_frames_dropped_since_log_ = 0;
last_render_rate_logged_at_ = now;
}
}
void SessionRuntime::HandleEnvelope(const common::JsonObject& envelope) {
const auto type = common::GetString(envelope, "type").value_or("");
const common::JsonObject* payload = common::GetObject(envelope, "payload");
if (payload == nullptr) {
return;
}
if (auto attachment_id = common::GetString(*payload, "attachment_id"); attachment_id.has_value() && !attachment_id->empty()) {
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
if (*attachment_id != snapshot.attachment_id) {
logger_->Warn("direct data-plane envelope rejected because attachment is no longer current session=" +
snapshot.session_id +
" envelope_attachment=" + *attachment_id +
" current_attachment=" + snapshot.attachment_id +
" type=" + type);
return;
}
}
if (type == "control") {
const auto action = common::GetString(*payload, "action").value_or("");
if (action == "detach") {
attached_.store(false);
CleanupVisibleTransferDirectory(SessionId());
logger_->Info("detached controller from session " + SessionId());
return;
}
if (action == "terminate") {
stop_requested_.store(true);
rdp_adapter_.Disconnect(true);
}
return;
}
if (type == "clipboard") {
if (!attached_.load() || !rdp_adapter_.IsConnected()) {
logger_->Warn("clipboard ignored because session is not active/attached " + SessionId());
return;
}
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
if (!ClipboardAllowsClientToServer(snapshot.policy.clipboard_mode)) {
logger_->Warn("clipboard client_to_server blocked by worker policy mode=" + snapshot.policy.clipboard_mode + " session=" + SessionId());
return;
}
const auto direction = common::GetString(*payload, "direction").value_or("client_to_server");
const auto text = common::GetString(*payload, "text").value_or("");
const auto content_hash = common::GetString(*payload, "content_hash").value_or("");
if (direction != "client_to_server" || text.empty()) {
return;
}
if (rdp_adapter_.SetClipboardText(text)) {
logger_->Info("forwarded text clipboard to FreeRDP cliprdr boundary for session " + SessionId() + " content_hash=" + content_hash);
} else {
logger_->Warn("failed to forward text clipboard to FreeRDP boundary for session " + SessionId());
}
return;
}
if (type == "file_upload") {
HandleFileUpload(*payload);
return;
}
if (type == "file_download") {
HandleFileDownload(*payload);
return;
}
if (type != "input" || !attached_.load() || !rdp_adapter_.IsConnected()) {
if (type == "input") {
logger_->Warn("worker input envelope ignored for session " + SessionId() +
" attached=" + (attached_.load() ? std::string("true") : std::string("false")) +
" rdp_connected=" + (rdp_adapter_.IsConnected() ? std::string("true") : std::string("false")));
}
return;
}
const auto kind = common::GetString(*payload, "kind").value_or("");
const auto action = common::GetString(*payload, "action").value_or("");
const auto correlation_id = common::GetString(*payload, "correlation_id").value_or("");
const bool is_mouse_move = kind == "mouse" && action == "move";
if (!is_mouse_move) {
logger_->Info("worker input envelope received for session " + SessionId() +
" kind=" + kind +
" action=" + action +
" correlation_id=" + correlation_id +
" trace_stage=worker_receive" +
" worker_received_at=" + std::to_string(UnixMillisecondsNow()));
}
if (kind == "focus") {
const bool focused = common::GetBool(*payload, "focused").value_or(false);
if (!rdp_adapter_.SendFocusEvent(focused)) {
logger_->Warn("failed to forward focus event for session " + SessionId());
} else if (!focus_forward_logged_) {
focus_forward_logged_ = true;
logger_->Info("forwarded focus input for session " + SessionId());
}
return;
}
if (kind == "keyboard") {
const auto scan_code_value = GetNumberEither(*payload, "scan_code", "scanCode");
const uint16_t scan_code = static_cast<uint16_t>(scan_code_value.value_or(0));
const bool extended = GetBoolEither(*payload, "is_extended", "isExtended").value_or(false);
if (!scan_code_value.has_value() || scan_code == 0) {
logger_->Warn("keyboard event missing scanCode for session " + SessionId());
return;
}
const bool key_down = action == "key_down";
if (action == "key_down" || action == "key_up") {
const bool applied = rdp_adapter_.SendKeyboardInput(scan_code, key_down, extended);
if (!applied) {
logger_->Warn("failed to forward keyboard event for session " + SessionId() +
" action=" + action +
" scan_code=" + std::to_string(scan_code) +
" extended=" + (extended ? std::string("true") : std::string("false")));
} else {
last_input_correlation_id_ = correlation_id;
last_input_applied_at_ = std::chrono::steady_clock::now();
rdp_adapter_.MarkInputAppliedForGraphicsTrace(correlation_id);
logger_->Info("input.trace worker_apply correlation_id=" + correlation_id +
" session=" + SessionId() +
" kind=keyboard action=" + action +
" scan_code=" + std::to_string(scan_code) +
" applied_at=" + std::to_string(UnixMillisecondsNow()));
}
if (applied && !keyboard_forward_logged_) {
keyboard_forward_logged_ = true;
logger_->Info("forwarded keyboard input for session " + SessionId());
} else if (applied) {
logger_->Info("applied keyboard input to FreeRDP for session " + SessionId() +
" action=" + action +
" scan_code=" + std::to_string(scan_code) +
" extended=" + (extended ? std::string("true") : std::string("false")));
}
}
return;
}
if (kind == "mouse") {
const double normalized_x = std::clamp(GetNumberEither(*payload, "normalized_x", "normalizedX").value_or(0.0), 0.0, 1.0);
const double normalized_y = std::clamp(GetNumberEither(*payload, "normalized_y", "normalizedY").value_or(0.0), 0.0, 1.0);
bool forwarded = true;
if (action == "move") {
forwarded = rdp_adapter_.SendMouseMove(normalized_x, normalized_y);
} else if (action == "button_down" || action == "button_up") {
const auto button = common::GetString(*payload, "button").value_or("");
forwarded = rdp_adapter_.SendMouseButton(button, action == "button_down", normalized_x, normalized_y);
} else if (action == "wheel") {
const int delta = static_cast<int>(GetNumberEither(*payload, "wheel_delta", "wheelDelta").value_or(0));
const bool horizontal = GetBoolEither(*payload, "is_horizontal_wheel", "isHorizontalWheel").value_or(false);
if (delta == 0) {
return;
}
forwarded = rdp_adapter_.SendMouseWheel(delta, horizontal, normalized_x, normalized_y);
}
if (!forwarded) {
logger_->Warn("failed to forward mouse event for session " + SessionId() +
" action=" + action +
" x=" + std::to_string(normalized_x) +
" y=" + std::to_string(normalized_y));
} else if (action != "move") {
last_input_correlation_id_ = correlation_id;
last_input_applied_at_ = std::chrono::steady_clock::now();
rdp_adapter_.MarkInputAppliedForGraphicsTrace(correlation_id);
logger_->Info("input.trace worker_apply correlation_id=" + correlation_id +
" session=" + SessionId() +
" kind=mouse action=" + action +
" x=" + std::to_string(normalized_x) +
" y=" + std::to_string(normalized_y) +
" applied_at=" + std::to_string(UnixMillisecondsNow()));
}
if (forwarded && !mouse_forward_logged_) {
mouse_forward_logged_ = true;
logger_->Info("forwarded mouse input for session " + SessionId());
} else if (forwarded && action != "move") {
logger_->Info("applied mouse input to FreeRDP for session " + SessionId() +
" action=" + action +
" x=" + std::to_string(normalized_x) +
" y=" + std::to_string(normalized_y));
}
}
}
void SessionRuntime::HandleFileUpload(const common::JsonObject& payload) {
const auto action = common::GetString(payload, "action").value_or("");
const auto transfer_id = common::GetString(payload, "transfer_id").value_or("");
if (transfer_id.empty()) {
logger_->Warn("file upload ignored because transfer_id is empty for session " + SessionId());
return;
}
if (action == "cancel") {
auto it = uploads_.find(transfer_id);
if (it != uploads_.end()) {
std::error_code ignored;
std::filesystem::remove(it->second.temp_path, ignored);
uploads_.erase(it);
}
logger_->Info("file upload cancelled for session " + SessionId() + " transfer_id=" + transfer_id);
return;
}
if (!attached_.load() || !rdp_adapter_.IsConnected()) {
logger_->Warn("file upload ignored because session is not active/attached " + SessionId());
return;
}
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
if (!FileTransferAllowsClientToServer(snapshot.policy.file_transfer_mode)) {
logger_->Warn("file upload client_to_server blocked by worker policy mode=" + snapshot.policy.file_transfer_mode + " session=" + SessionId());
return;
}
if (action == "start") {
const auto file_name = common::GetString(payload, "file_name").value_or("");
const auto file_size = static_cast<std::int64_t>(common::GetNumber(payload, "file_size").value_or(0));
const auto total_chunks = static_cast<std::int64_t>(common::GetNumber(payload, "total_chunks").value_or(0));
const auto content_hash = common::GetString(payload, "content_hash").value_or("");
if (!IsSafeFileName(file_name) || file_size <= 0 || file_size > kMaxUploadBytes || total_chunks <= 0) {
logger_->Warn("file upload start rejected by worker validation for session " + SessionId() + " transfer_id=" + transfer_id);
return;
}
if (uploads_.find(transfer_id) != uploads_.end()) {
logger_->Warn("file upload start rejected duplicate transfer_id for session " + SessionId() + " transfer_id=" + transfer_id);
return;
}
const auto transfer_dir = PrepareVisibleTransferDirectory(snapshot.session_id);
if (transfer_dir.empty()) {
logger_->Warn("file upload start rejected because visible transfer directory is unavailable for session " + SessionId());
return;
}
std::error_code ec;
std::filesystem::create_directories(transfer_dir, ec);
if (ec) {
logger_->Warn("file upload failed to create transfer directory " + transfer_dir.string() + " error=" + ec.message());
return;
}
FileUploadState state;
state.transfer_id = transfer_id;
state.file_name = file_name;
state.file_size = file_size;
state.total_chunks = total_chunks;
state.expected_hash = content_hash;
state.temp_path = transfer_dir / ("." + transfer_id + "." + file_name + ".part");
state.final_path = transfer_dir / file_name;
if (PathExistsOrIsSymlink(state.final_path) || PathExistsOrIsSymlink(state.temp_path) ||
IsSymlinkPath(transfer_dir) || IsSymlinkPath(state.final_path) || IsSymlinkPath(state.temp_path)) {
logger_->Warn("file upload start rejected to avoid overwrite for session " + SessionId() + " path=" + state.final_path.string());
return;
}
std::ofstream create(state.temp_path, std::ios::binary | std::ios::trunc);
if (!create.good()) {
logger_->Warn("file upload failed to create temp file for session " + SessionId() + " path=" + state.temp_path.string());
return;
}
uploads_[transfer_id] = state;
logger_->Info("file upload started for session " + SessionId() +
" transfer_id=" + transfer_id +
" file_name=" + file_name +
" file_size=" + std::to_string(file_size) +
" storage_path=" + transfer_dir.string());
return;
}
if (action != "chunk") {
return;
}
auto it = uploads_.find(transfer_id);
if (it == uploads_.end()) {
logger_->Warn("file upload chunk ignored unknown transfer_id for session " + SessionId() + " transfer_id=" + transfer_id);
return;
}
auto& state = it->second;
const auto chunk_index = static_cast<std::int64_t>(common::GetNumber(payload, "chunk_index").value_or(-1));
const auto offset = static_cast<std::int64_t>(common::GetNumber(payload, "offset").value_or(-1));
const auto encoded = common::GetString(payload, "chunk_bytes").value_or("");
auto decoded = DecodeBase64(encoded);
if (!decoded.has_value() || decoded->empty() || static_cast<std::int64_t>(decoded->size()) > kMaxUploadChunkBytes ||
chunk_index != state.next_index || offset != state.received || state.received + static_cast<std::int64_t>(decoded->size()) > state.file_size) {
logger_->Warn("file upload chunk rejected by worker validation for session " + SessionId() + " transfer_id=" + transfer_id);
return;
}
std::ofstream out(state.temp_path, std::ios::binary | std::ios::app);
if (!out.good()) {
logger_->Warn("file upload chunk failed to open temp file for session " + SessionId() + " path=" + state.temp_path.string());
return;
}
out.write(reinterpret_cast<const char*>(decoded->data()), static_cast<std::streamsize>(decoded->size()));
if (!out.good()) {
logger_->Warn("file upload chunk failed to write temp file for session " + SessionId() + " path=" + state.temp_path.string());
return;
}
UpdateFnv1a64(state.hash, *decoded);
state.received += static_cast<std::int64_t>(decoded->size());
state.next_index++;
logger_->Info("file upload chunk written for session " + SessionId() +
" transfer_id=" + transfer_id +
" chunk_index=" + std::to_string(chunk_index) +
" received=" + std::to_string(state.received) +
" file_size=" + std::to_string(state.file_size));
if (state.received == state.file_size && state.next_index == state.total_chunks) {
const std::string actual_hash = Fnv1a64Hex(state.hash);
if (!state.expected_hash.empty() && state.expected_hash != actual_hash) {
logger_->Warn("file upload checksum mismatch for session " + SessionId() +
" transfer_id=" + transfer_id +
" expected=" + state.expected_hash +
" actual=" + actual_hash);
std::error_code ignored;
std::filesystem::remove(state.temp_path, ignored);
uploads_.erase(it);
return;
}
std::error_code ec;
std::filesystem::rename(state.temp_path, state.final_path, ec);
if (ec) {
logger_->Warn("file upload failed to finalize for session " + SessionId() + " error=" + ec.message());
return;
}
logger_->Info("file upload completed for session " + SessionId() +
" transfer_id=" + transfer_id +
" file_name=" + state.file_name +
" file_size=" + std::to_string(state.file_size) +
" content_hash=" + actual_hash +
" storage_path=" + state.final_path.string());
PublishEvent("session_file_upload_completed", "", common::JsonObject{
{"transfer_id", transfer_id},
{"file_name", state.file_name},
{"file_size", static_cast<double>(state.file_size)},
{"content_hash", actual_hash},
{"storage_path", state.final_path.string()},
{"visible_drive_name", "RAP_Transfers"},
{"visible_drive_path", state.final_path.parent_path().string()},
});
uploads_.erase(it);
}
}
void SessionRuntime::ScanOutboundDownloadDirectory(const std::string& session_id) {
if (!attached_.load() || !rdp_adapter_.IsConnected()) {
return;
}
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
if (!FileTransferAllowsServerToClient(snapshot.policy.file_transfer_mode)) {
return;
}
const auto outbound_dir = OutboundDownloadDirectory(session_id);
std::error_code ec;
std::filesystem::create_directories(outbound_dir, ec);
if (ec || IsSymlinkPath(outbound_dir)) {
logger_->Warn("file download scan skipped because ToClient directory is unavailable session=" +
session_id + " path=" + outbound_dir.string());
return;
}
for (const auto& entry : std::filesystem::directory_iterator(outbound_dir, ec)) {
if (ec) {
logger_->Warn("file download scan failed session=" + session_id + " error=" + ec.message());
return;
}
const auto path = entry.path();
const auto file_name = path.filename().string();
if (!IsSafeFileName(file_name) || IsIgnoredOutboundFileName(file_name) || entry.is_directory(ec) || IsSymlinkPath(path)) {
continue;
}
if (!entry.is_regular_file(ec)) {
continue;
}
const auto file_size = static_cast<std::int64_t>(entry.file_size(ec));
if (ec || file_size <= 0) {
continue;
}
if (file_size > kMaxDownloadBytes) {
PublishFileDownloadBlocked("", "", "file too large: " + file_name);
continue;
}
const auto modified_at = entry.last_write_time(ec);
if (ec) {
continue;
}
auto& candidate = download_candidates_[file_name];
if (candidate.last_observed_size == file_size && candidate.last_observed_modified_at == modified_at) {
candidate.stable_observations++;
} else {
candidate = FileDownloadCandidate{};
candidate.file_name = file_name;
candidate.path = path;
candidate.last_observed_size = file_size;
candidate.last_observed_modified_at = modified_at;
candidate.stable_observations = 1;
continue;
}
if (candidate.available_published || candidate.stable_observations < 2) {
continue;
}
auto content_hash = ComputeFileFnv1a64(path, kMaxDownloadBytes);
if (!content_hash.has_value()) {
PublishFileDownloadBlocked("", "", "file unavailable or too large: " + file_name);
continue;
}
std::uint64_t file_id_hash = 1469598103934665603ULL;
UpdateFnv1a64String(file_id_hash, session_id);
UpdateFnv1a64String(file_id_hash, file_name);
UpdateFnv1a64String(file_id_hash, std::to_string(file_size));
UpdateFnv1a64String(file_id_hash, content_hash->first);
candidate.file_id = Fnv1a64Hex(file_id_hash);
candidate.file_size = file_size;
candidate.modified_at = modified_at;
candidate.content_hash = content_hash->first;
candidate.content_hash_value = content_hash->second;
candidate.available_published = true;
const auto sequence = ++file_download_event_sequence_;
logger_->Info("file download available session=" + session_id +
" file_id=" + candidate.file_id +
" file_name=" + file_name +
" file_size=" + std::to_string(file_size) +
" content_hash=" + candidate.content_hash);
PublishEvent("session_file_download_available", "", common::JsonObject{
{"direction", "server_to_client"},
{"file_id", candidate.file_id},
{"file_name", file_name},
{"file_size", static_cast<double>(file_size)},
{"content_hash", candidate.content_hash},
{"sequence", static_cast<double>(sequence)},
{"status", "available"},
{"visible_drive_name", "RAP_Transfers"},
{"visible_drive_path", "ToClient"},
});
}
}
void SessionRuntime::HandleFileDownload(const common::JsonObject& payload) {
const auto action = common::GetString(payload, "action").value_or("");
const auto transfer_id = common::GetString(payload, "transfer_id").value_or("");
const auto file_id = common::GetString(payload, "file_id").value_or("");
if (transfer_id.empty()) {
PublishFileDownloadBlocked("", file_id, "invalid transfer_id");
return;
}
if (action == "cancel") {
downloads_.erase(transfer_id);
PublishEvent("session_file_download_progress", "", common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", transfer_id},
{"file_id", file_id},
{"status", "cancelled"},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
});
return;
}
if (!attached_.load() || !rdp_adapter_.IsConnected()) {
PublishFileDownloadBlocked(transfer_id, file_id, "session is not active");
return;
}
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
if (!FileTransferAllowsServerToClient(snapshot.policy.file_transfer_mode)) {
PublishFileDownloadBlocked(transfer_id, file_id, "server_to_client file transfer blocked by policy");
return;
}
auto send_next_chunk = [&](FileDownloadState& state) {
std::error_code ec;
if (!std::filesystem::is_regular_file(state.path, ec) ||
IsSymlinkPath(state.path) ||
std::filesystem::file_size(state.path, ec) != static_cast<std::uintmax_t>(state.file_size) ||
std::filesystem::last_write_time(state.path, ec) != state.modified_at) {
PublishEvent("session_file_download_failed", "file changed during transfer", common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", state.transfer_id},
{"file_id", state.file_id},
{"file_name", state.file_name},
{"status", "failed"},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
});
downloads_.erase(state.transfer_id);
return;
}
std::ifstream input(state.path, std::ios::binary);
if (!input.good()) {
PublishFileDownloadBlocked(state.transfer_id, state.file_id, "file unavailable");
downloads_.erase(state.transfer_id);
return;
}
input.seekg(state.sent);
std::vector<std::uint8_t> chunk(static_cast<std::size_t>(std::min<std::int64_t>(kMaxDownloadChunkBytes, state.file_size - state.sent)));
input.read(reinterpret_cast<char*>(chunk.data()), static_cast<std::streamsize>(chunk.size()));
const auto read = input.gcount();
if (read <= 0) {
PublishFileDownloadBlocked(state.transfer_id, state.file_id, "empty download chunk");
downloads_.erase(state.transfer_id);
return;
}
chunk.resize(static_cast<std::size_t>(read));
const auto offset = state.sent;
state.sent += static_cast<std::int64_t>(chunk.size());
const auto sequence = state.next_sequence++;
PublishEvent("session_file_download_chunk", "", common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", state.transfer_id},
{"file_id", state.file_id},
{"file_name", state.file_name},
{"file_size", static_cast<double>(state.file_size)},
{"total", static_cast<double>(state.file_size)},
{"offset", static_cast<double>(offset)},
{"chunk_size", static_cast<double>(chunk.size())},
{"chunk_bytes", Base64Encode(chunk.data(), chunk.size())},
{"content_hash", state.content_hash},
{"sequence", static_cast<double>(sequence)},
{"status", state.sent >= state.file_size ? "last_chunk" : "transferring"},
});
};
if (action == "start") {
if (file_id.empty()) {
PublishFileDownloadBlocked(transfer_id, file_id, "invalid file_id");
return;
}
if (downloads_.find(transfer_id) != downloads_.end()) {
PublishFileDownloadBlocked(transfer_id, file_id, "duplicate transfer_id");
return;
}
auto candidate = std::find_if(download_candidates_.begin(), download_candidates_.end(), [&](const auto& item) {
return item.second.file_id == file_id && item.second.available_published;
});
if (candidate == download_candidates_.end()) {
PublishFileDownloadBlocked(transfer_id, file_id, "unknown file_id");
return;
}
FileDownloadState state;
state.transfer_id = transfer_id;
state.file_id = candidate->second.file_id;
state.file_name = candidate->second.file_name;
state.path = candidate->second.path;
state.modified_at = candidate->second.modified_at;
state.file_size = candidate->second.file_size;
state.content_hash = candidate->second.content_hash;
state.active = true;
downloads_[transfer_id] = state;
PublishEvent("session_file_download_progress", "", common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", transfer_id},
{"file_id", state.file_id},
{"file_name", state.file_name},
{"file_size", static_cast<double>(state.file_size)},
{"total", static_cast<double>(state.file_size)},
{"received", 0},
{"content_hash", state.content_hash},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
{"status", "started"},
});
send_next_chunk(downloads_[transfer_id]);
return;
}
if (action != "ack") {
PublishFileDownloadBlocked(transfer_id, file_id, "invalid download action");
return;
}
auto active = downloads_.find(transfer_id);
if (active == downloads_.end()) {
PublishFileDownloadBlocked(transfer_id, file_id, "unknown transfer_id");
return;
}
const auto acknowledged_offset = static_cast<std::int64_t>(common::GetNumber(payload, "offset").value_or(-1));
if (acknowledged_offset >= 0 && acknowledged_offset != active->second.sent) {
PublishFileDownloadBlocked(transfer_id, active->second.file_id, "invalid ack offset");
return;
}
if (active->second.sent >= active->second.file_size) {
PublishEvent("session_file_download_completed", "", common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", active->second.transfer_id},
{"file_id", active->second.file_id},
{"file_name", active->second.file_name},
{"file_size", static_cast<double>(active->second.file_size)},
{"received", static_cast<double>(active->second.file_size)},
{"total", static_cast<double>(active->second.file_size)},
{"content_hash", active->second.content_hash},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
{"status", "completed"},
});
downloads_.erase(active);
return;
}
send_next_chunk(active->second);
}
void SessionRuntime::PublishFileDownloadBlocked(const std::string& transfer_id, const std::string& file_id, const std::string& reason) {
PublishEvent("session_file_download_blocked", reason, common::JsonObject{
{"direction", "server_to_client"},
{"transfer_id", transfer_id},
{"file_id", file_id},
{"reason", reason},
{"sequence", static_cast<double>(++file_download_event_sequence_)},
{"status", "blocked"},
});
}
std::filesystem::path SessionRuntime::PrepareVisibleTransferDirectory(const std::string& session_id) {
if (!IsSafePathSegment(session_id)) {
logger_->Warn("refusing unsafe session transfer directory segment session_id=" + session_id);
return {};
}
const auto session_root = SessionTransferRoot(session_id);
const auto visible_dir = VisibleTransferDirectory(session_id);
const auto outbound_dir = OutboundDownloadDirectory(session_id);
std::error_code ec;
std::filesystem::create_directories(visible_dir, ec);
if (ec) {
logger_->Warn("failed to create visible transfer directory path=" + visible_dir.string() + " error=" + ec.message());
return {};
}
std::filesystem::create_directories(outbound_dir, ec);
if (ec) {
logger_->Warn("failed to create outbound download directory path=" + outbound_dir.string() + " error=" + ec.message());
return {};
}
if (IsSymlinkPath(session_root) || IsSymlinkPath(visible_dir) || IsSymlinkPath(outbound_dir)) {
logger_->Warn("refusing symlinked visible transfer directory path=" + visible_dir.string());
return {};
}
const auto readme_path = visible_dir / "README.txt";
if (!PathExistsOrIsSymlink(readme_path)) {
std::ofstream readme(readme_path, std::ios::binary | std::ios::trunc);
if (readme.good()) {
readme << "RAP_Transfers\n\n"
<< "Files uploaded from the Access Client appear in this drive.\n"
<< "To send a file back to the Access Client, copy it into the ToClient folder.\n"
<< "Only regular files up to 25 MiB are accepted in this MVP stage.\n";
}
}
logger_->Info("visible transfer directory ready session=" + session_id +
" drive=RAP_Transfers path=" + visible_dir.string() +
" outbound_path=" + outbound_dir.string());
return visible_dir;
}
void SessionRuntime::CleanupVisibleTransferDirectory(const std::string& session_id) {
if (!IsSafePathSegment(session_id)) {
return;
}
const auto visible_dir = VisibleTransferDirectory(session_id);
std::error_code ec;
std::filesystem::remove_all(visible_dir, ec);
if (ec) {
logger_->Warn("failed to cleanup visible transfer directory path=" + visible_dir.string() + " error=" + ec.message());
} else {
logger_->Info("visible transfer directory cleaned up session=" + session_id +
" path=" + visible_dir.string());
}
}
void SessionRuntime::CleanupSessionTransferDirectory(const std::string& session_id) {
if (!IsSafePathSegment(session_id)) {
return;
}
const auto session_root = SessionTransferRoot(session_id);
std::error_code ec;
std::filesystem::remove_all(session_root, ec);
if (ec) {
logger_->Warn("failed to cleanup session transfer directory path=" + session_root.string() + " error=" + ec.message());
} else {
logger_->Info("session transfer directory cleaned up session=" + session_id +
" path=" + session_root.string());
}
}
void SessionRuntime::PublishEvent(const std::string& type, const std::string& reason) {
PublishEvent(type, reason, {});
}
void SessionRuntime::PublishEvent(const std::string& type, const std::string& reason, common::JsonObject payload) {
Assignment snapshot;
{
std::lock_guard<std::mutex> lock(mutex_);
snapshot = assignment_;
}
WorkerEvent event;
event.type = type;
event.session_id = snapshot.session_id;
event.worker_id = snapshot.worker_id;
event.reason = reason;
event.width = rdp_adapter_.DesktopWidth();
event.height = rdp_adapter_.DesktopHeight();
event.payload = std::move(payload);
control_plane_->PublishEvent(event);
DispatchDirectEvent(event);
}
void SessionRuntime::DispatchDirectEvent(const WorkerEvent& event) {
std::vector<std::shared_ptr<DirectEventSink>> sinks;
std::string current_attachment_id;
const std::string takeover_of = common::GetString(event.payload, "takeover_of").value_or("");
{
std::lock_guard<std::mutex> lock(mutex_);
current_attachment_id = assignment_.attachment_id;
for (auto iterator = direct_event_sinks_.begin(); iterator != direct_event_sinks_.end();) {
if (auto sink = iterator->lock()) {
const bool deliver_takeover = IsTakeoverDirectEvent(event) && sink->AttachmentId() == takeover_of;
const bool deliver_current = !IsTakeoverDirectEvent(event) && sink->AttachmentId() == current_attachment_id;
if (deliver_takeover || deliver_current) {
sinks.push_back(std::move(sink));
}
++iterator;
} else {
iterator = direct_event_sinks_.erase(iterator);
}
}
}
for (const auto& sink : sinks) {
sink->EnqueueEvent(event);
}
}
bool SessionRuntime::HasCurrentDirectEventSink() {
std::lock_guard<std::mutex> lock(mutex_);
for (auto iterator = direct_event_sinks_.begin(); iterator != direct_event_sinks_.end();) {
if (auto sink = iterator->lock()) {
if (sink->AttachmentId() == assignment_.attachment_id) {
return true;
}
++iterator;
} else {
iterator = direct_event_sinks_.erase(iterator);
}
}
return false;
}
} // namespace rdp_worker::runtime