From 7dcadc1ca2a39c326945caaacd1bd5820a2f6ced Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Sun, 2 Feb 2025 16:33:52 +0100 Subject: [PATCH] final performant version --- include/actsim_agent/downloader.hpp | 2 - include/actsim_agent/task_interface.hpp | 14 +- include/actsim_agent/uploader.hpp | 5 +- include/cluster/db_client.hpp | 26 +++ src/actsim_agent/CMakeLists.txt | 1 - src/actsim_agent/downloader.cpp | 228 +++++++----------------- src/actsim_agent/task_interface.cpp | 51 ++++-- src/actsim_agent/uploader.cpp | 84 +++------ src/cluster/db_client.cpp | 2 +- 9 files changed, 176 insertions(+), 237 deletions(-) diff --git a/include/actsim_agent/downloader.hpp b/include/actsim_agent/downloader.hpp index ba0273d..9337595 100644 --- a/include/actsim_agent/downloader.hpp +++ b/include/actsim_agent/downloader.hpp @@ -31,8 +31,6 @@ #include #include "task_interface.hpp" -#define NOTHING_AVAILABLE_SLEEP_TIME 500ms - class Downloader { public: diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index 31296a0..fac5b18 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -26,6 +26,7 @@ #ifndef __TASK_INTERFACE__ #define __TASK_INTERFACE__ +#include #include #include #include @@ -33,6 +34,7 @@ #include #include #include "agent_artifact.hpp" +#include /** * If you want to use this interface for different types, you only need to change it here. @@ -41,6 +43,8 @@ using InputType = DBSimConfigArtifact; using OutputType = DBSimOutputArtifact; +#define NOTHING_AVAILABLE_SLEEP_TIME 500ms + class TaskInterface { public: @@ -49,10 +53,11 @@ class TaskInterface { ~TaskInterface(); void wait_for_fresh(); - void wait_for_finished(); + void wait_for_finished(size_t min_size); void wait_for_buffer_consume(); void wait_for_cleanup_ready(); void wait_for_download_halt(); + void wait_for_available(); void notify_cleanup_ready(); void notify_workers_program_halt(); @@ -62,7 +67,7 @@ class TaskInterface { void push_fresh(std::vector>& tasks); std::unique_ptr pop_fresh(bool& empty); void push_finished(std::unique_ptr task); - std::unique_ptr pop_finished(bool& empty); + std::vector> pop_finished(bool& empty); size_t get_buffer_space(); /* @@ -87,7 +92,7 @@ class TaskInterface { void stop_immediately() { this->immediate_stop.store(true, std::memory_order_relaxed); }; bool fresh_queue_empty(); - bool finished_queue_empty(); + bool finished_queue_empty(size_t min_size); private: @@ -102,6 +107,9 @@ class TaskInterface { std::unordered_map> designs; std::unordered_map>> references; + bool worker_waiting = false; + bool downloader_waiting = false; + ////// Mutexes ////// // access to task queues diff --git a/include/actsim_agent/uploader.hpp b/include/actsim_agent/uploader.hpp index 387fa1c..7dfeb9e 100644 --- a/include/actsim_agent/uploader.hpp +++ b/include/actsim_agent/uploader.hpp @@ -32,6 +32,8 @@ #include #include "task_interface.hpp" +std::string build_fault_flags(const std::unique_ptr& task); + class Uploader { public: @@ -44,8 +46,7 @@ class Uploader { private: void thread_run(); - bool upload_task(std::unique_ptr task); - std::string build_fault_flags(const std::unique_ptr& task); + bool upload_task(std::vector>& tasks); std::unique_ptr uploader_thread; std::unique_ptr conn; diff --git a/include/cluster/db_client.hpp b/include/cluster/db_client.hpp index 02f05de..577e70d 100644 --- a/include/cluster/db_client.hpp +++ b/include/cluster/db_client.hpp @@ -26,6 +26,7 @@ #define __DB_CLIENT_H__ +#include #include #include #include @@ -69,6 +70,31 @@ class Connection { bool prepare_statement(std::string name, std::string statement); bool unprepare_statement(std::string name); + pqxx::work open_transaction(bool& success) { + + retry_send: + + if (!ensure_connected()) { + cerr << "Could contact database. Broken database connection." << endl; + success = false; + } + + try { + + return pqxx::work(*c); + + } catch (const pqxx::broken_connection& e) { + cerr << "Pipe broke during database operation... Retrying..." << endl; + cerr << e.what() << endl; + + // using a goto here since it'll gracefully retry connection + goto retry_send; + } + + success = true; + + } + // Send request takes an arbitrary request method to the database and wraps error handling // around it. It takes a function with an arbitrary set of arguments and the arguments to give // that function and returns whatever the function returns. diff --git a/src/actsim_agent/CMakeLists.txt b/src/actsim_agent/CMakeLists.txt index b13245e..3652aed 100644 --- a/src/actsim_agent/CMakeLists.txt +++ b/src/actsim_agent/CMakeLists.txt @@ -1,5 +1,4 @@ include_directories(../../include) -include_directories(../../include/db_lib) include_directories(../../include/actsim_agent) include_directories($ENV{ACT_HOME}/include) include_directories(../../deps/libpqxx/include) diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 3817764..3202503 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include "../../include/cluster/db_client.hpp" #include #include #include @@ -34,11 +35,11 @@ #include #include #include -#include #include #include "actsim_agent.hpp" #include "agent_artifact.hpp" #include "cluster/db_types.hpp" +#include "pqxx/internal/statement_parameters.hxx" #include "pqxx/prepared_statement.hxx" #include "task_interface.hpp" #include "util.h" @@ -62,8 +63,6 @@ void Downloader::join() { void Downloader::thread_run() { - using namespace std::chrono_literals; - // connect to the database if (!this->conn->connect()) { std::cerr << "Error: Upload thread could not connect to the database!" << std::endl; @@ -72,6 +71,10 @@ void Downloader::thread_run() { return; } + this->conn->prepare_statement("mark_tasks", + "UPDATE sim_configs SET part_status = 'finished' WHERE id = $1;" + ); + while (this->interface.running()) { // wait until there is more space in the buffer to fill @@ -86,7 +89,7 @@ void Downloader::thread_run() { if (!this->fetch_tasks(this->interface.get_buffer_space())) { // we can sleep for a certain amount of time, nothing to do // DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit..."); - std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME); + this->interface.wait_for_available(); } } @@ -112,155 +115,46 @@ void Downloader::thread_run() { bool Downloader::fetch_tasks(size_t n) { DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks..."); - // fetch a new task from the database - auto fetch_task_lambda = [n]( - pqxx::work *txn, - bool *task_avail, - std::unordered_set *references, - std::vector> *tasks - ) { + bool txn_opened; + auto txn = this->conn->open_transaction(txn_opened); + pqxx::result res; - // Fetch a new task by a couple rules: - // 1.) It has not been started yet or the output row doesn't exist - // 2.) Passes that are already in progress are preferred - // 3.) New passes are started in the order they were added to the database - // 4.) Passes are only started if all their dependencies are fulfilled - auto res = txn->exec( - "SELECT " - " ap.design_file AS design, " - " ap.top_proc, " - " ap.outputs AS target_artifact, " - " ap.id AS source_pass, " - " sc.id AS source_config, " - " sc.sim_commands, " - " sc.has_reference AS reference, " - " so.id AS id " - "FROM " - " sim_configs sc " - "JOIN " - " actsim_passes ap ON ap.sim_configs = sc.artifact " - "LEFT JOIN " - " sim_outputs so ON sc.has_reference = so.sim_config " - "JOIN " - " jobs j ON ap.job = j.id " - "WHERE " - " (so.part_status IS NULL OR so.part_status = 'finished') " - " AND ap.id IN ( " - " SELECT ap_dep.id " - " FROM actsim_passes ap_dep " - " JOIN passes p ON ap_dep.id = p.id " - " WHERE NOT EXISTS ( " - " SELECT 1 " - " FROM passes dep " - " WHERE dep.id = ANY(p.depends_on) " - " AND dep.pass_status != 'finished' " - " ) " - " ) " - // " AND (sc.has_reference IS NULL OR " - // " EXISTS (" - // " SELECT 1 " - // " FROM sim_outputs out " - // " WHERE out.sim_config = sc.has_reference " - // " AND out.part_status = 'finished' " - // " ) " - // " ) " - // "ORDER BY " - // " ap.pass_status = 'in_progress' DESC, " - // " j.time_added ASC " + try { + res = txn.exec( + "SELECT * " + "FROM open_tasks " "LIMIT $1;", n ); - // seems like there is nothing to do right now - if (res.size() < 1) { - *task_avail = false; - return; + for (auto&& row : res) { + txn.exec(pqxx::prepped("mark_tasks"), {row["source_config"].as()}); } - *task_avail = true; + txn.commit(); + } catch (const std::exception& e) { - // generate the output rows - std::string sql = R"( - INSERT INTO sim_outputs (artifact, source_pass, sim_config) - VALUES ($1, $2, $3) - ON CONFLICT (sim_config) - DO UPDATE SET part_status = 'in_progress' - RETURNING id; - )"; + // if something happened during the transaction, we roll it back + txn.abort(); - txn->conn().prepare("insert_output_rows", sql); - - for (auto row : res) { - auto target_artifact = row["target_artifact"].as(); - auto source_pass = row["source_pass"].as(); - auto design = row["design"].as(); - - db::uuid_t reference; - reference = 0; - - if (!row["reference"].is_null()) { - reference = row["reference"].as(); - (*references).emplace(reference); - } - - auto source_config = row["source_config"].as(); - - auto com_arr = row["sim_commands"].as_sql_array(); - std::vector commands(com_arr.cbegin(), com_arr.cend()); - - pl::testcase_t testcase = { - commands, - row["top_proc"].as(), - false - }; - - // get the id - auto id = txn->exec(pqxx::prepped{"insert_output_rows"}, {target_artifact, source_pass, source_config})[0]["id"].as(); - - auto task = std::make_unique(id, source_pass, target_artifact, design, reference, source_config); - task->add_testcase(testcase); - - (*tasks).emplace_back(std::move(task)); - } - - txn->conn().unprepare("insert_output_rows"); - - }; - - std::function *references, - std::vector> *tasks - )> fetch_task_func = fetch_task_lambda; - - - std::unordered_set references; - bool task_avail; - std::vector> tasks; - - if (!this->conn->send_request( - &fetch_task_func, - &task_avail, - &references, - &tasks - )) { - // if we lost connection, there's nothing else we can really do std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl; + std::cerr << e.what() << std::endl; this->interface.stop(); this->interface.stop_immediately(); - } - - // seems like there are no tasks to do right now; tell the calling function - // that there is still space in the buffer and we should just wait a while - if (!task_avail) { - DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!"); return false; } - for (auto&& task : tasks) { + // seems like there is nothing to do right now + if (res.size() < 1) { + return false; + } - auto design = task->design; + for (auto row : res) { + auto target_artifact = row["target_artifact"].as(); + auto source_pass = row["source_pass"].as(); + auto design = row["design"].as(); + + auto source_config = row["source_config"].as(); // see if we already have the design locally; if not, load it if (!this->interface.increment_design(design)) { @@ -270,37 +164,51 @@ bool Downloader::fetch_tasks(size_t n) { // if we could not load the design, reopen it in the database if (!this->fetch_design(design, design_path)) { - std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl; - this->reopen_task(task->id, true); + std::cerr << "Error: Could not load design for task " << source_config << ", reopening it." << std::endl; + this->reopen_task(source_config, true); continue; } this->interface.store_design(design, design_path); } - } + + db::uuid_t reference; + reference = 0; - // if we have a reference for this run, we have to see if it is loaded - for (auto&& reference : references) { + if (!row["reference"].is_null()) { + reference = row["reference"].as(); + + // see if we already have the reference run locally; if not, load it + if (!this->interface.increment_reference(reference)) { - // see if we already have the reference run locally; if not, load it - if (!this->interface.increment_reference(reference)) { + DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference)); + std::shared_ptr reference_run; + + // if we could not load the reference run, reopen the task in the database + if ((reference_run = this->fetch_reference_run(reference)) == nullptr) { + std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl; + // this->reopen_task(task->id, true); + continue; + } - DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference)); - std::shared_ptr reference_run; - - // if we could not load the reference run, reopen the task in the database - if ((reference_run = this->fetch_reference_run(reference)) == nullptr) { - std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl; - // this->reopen_task(task->id, true); - continue; + this->interface.store_reference(reference, reference_run); } - - this->interface.store_reference(reference, reference_run); } - } - // push the task to the list of open tasks - this->interface.push_fresh(tasks); + auto com_arr = row["sim_commands"].as_sql_array(); + std::vector commands(com_arr.cbegin(), com_arr.cend()); + + pl::testcase_t testcase = { + commands, + row["top_proc"].as(), + false + }; + + auto task = std::make_unique(db::uuid_t(), source_pass, target_artifact, design, reference, source_config); + task->add_testcase(testcase); + + this->interface.push_fresh(std::move(task)); + } return true; } @@ -379,14 +287,14 @@ std::shared_ptr Downloader::fetch_reference_run(const db: bool *found ) { try { - auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0]; + auto res = txn->exec("SELECT output_tokens, output_token_timings FROM sim_outputs WHERE sim_config = $1;", id)[0]; // load the output token timings auto arr_ott = res["output_token_timings"].as_sql_array(); *output_token_timings = std::vector(arr_ott.cbegin(), arr_ott.cend()); *output_tokens = res["output_tokens"].as(); - *log_size = res["log_size"].as(); + *log_size = 0; *found = true; @@ -443,7 +351,7 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) { // open up the status of this partial output in the database again auto task_reopen_lambda = [id, status](pqxx::work *txn) { - txn->exec("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status}); + txn->exec("UPDATE sim_configs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status}); }; std::function task_reopen_func = task_reopen_lambda; diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 99bd5cc..eccf6ac 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -24,7 +24,10 @@ */ #include "util.h" +#include +#include #include +#include #include #include #include "task_interface.hpp" @@ -75,9 +78,9 @@ bool TaskInterface::fresh_queue_empty() { return this->fresh_queue.empty(); } -bool TaskInterface::finished_queue_empty() { +bool TaskInterface::finished_queue_empty(size_t min_size) { std::lock_guard lock(this->finished_queue_mutex); - return this->finished_queue.empty(); + return this->finished_queue.size() < min_size; } std::unique_ptr TaskInterface::pop_fresh(bool& empty) { @@ -105,17 +108,21 @@ std::unique_ptr TaskInterface::pop_fresh(bool& empty) { void TaskInterface::wait_for_fresh() { std::unique_lock lock (this->fresh_queue_empty_mutex); + worker_waiting = true; + finished_queue_empty_condition.notify_all(); + // we will be notified either when there is new data or the program has been stopped this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); }); DEBUG_PRINT("Worker released from block"); + worker_waiting = false; } -void TaskInterface::wait_for_finished() { +void TaskInterface::wait_for_finished(size_t min_size) { std::unique_lock lock (this->finished_queue_empty_mutex); // we will be notified either when there is new data or the program has been stopped - this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); }); + this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty(min_size) || !running() || (worker_waiting && downloader_waiting);}); } void TaskInterface::wait_for_buffer_consume() { @@ -125,6 +132,18 @@ void TaskInterface::wait_for_buffer_consume() { this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); }); } +void TaskInterface::wait_for_available() { + + downloader_waiting = true; + finished_queue_empty_condition.notify_all(); + + using namespace std::chrono_literals; + + std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME); + + downloader_waiting = false; +} + void TaskInterface::push_finished(std::unique_ptr task) { std::lock_guard lock(this->finished_queue_mutex); this->finished_queue.push(std::move(task)); @@ -133,21 +152,27 @@ void TaskInterface::push_finished(std::unique_ptr task) { this->finished_queue_empty_condition.notify_one(); } -std::unique_ptr TaskInterface::pop_finished(bool& empty) { +std::vector> TaskInterface::pop_finished(bool& empty) { + + std::vector> tasks; // we first need exclusive access to the queue std::lock_guard lock (this->finished_queue_mutex); - // we have to make sure the queue wasn't emptied since the last empty call was made - // or at least inform the calling thread that the queue is currently empty - std::unique_ptr task; - - if (!(empty = this->finished_queue.empty())) { - task = std::move(this->finished_queue.front()); - this->finished_queue.pop(); + // check if there is a minimum amount of tasks in the queue + if (this->finished_queue.empty()) { + empty = true; + return tasks; } - return task; + empty = false; + + while (!this->finished_queue.empty()) { + tasks.emplace_back(std::move(this->finished_queue.front())); + this->finished_queue.pop(); + } + + return tasks; } bool TaskInterface::increment_design(const db::uuid_t& id) { diff --git a/src/actsim_agent/uploader.cpp b/src/actsim_agent/uploader.cpp index 7b05929..9ec3f8a 100644 --- a/src/actsim_agent/uploader.cpp +++ b/src/actsim_agent/uploader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include #include #include #include @@ -57,12 +58,15 @@ void Uploader::thread_run() { this->interface.stop(); return; } + + size_t min_size = 1; while (this->interface.running()) { + // this blocks until either a new task is available for upload or the // program was halted - this->interface.wait_for_finished(); + this->interface.wait_for_finished(min_size); DEBUG_PRINT("Uploader was woken up"); @@ -71,7 +75,7 @@ void Uploader::thread_run() { // we're still good to go! get a task from the fresh queue bool queue_empty; - auto task = this->interface.pop_finished(queue_empty); + auto tasks = this->interface.pop_finished(queue_empty); // we need to make sure the queue wasn't emptied between waiting and getting new data if (queue_empty) continue; @@ -79,7 +83,7 @@ void Uploader::thread_run() { DEBUG_PRINT("Uploader dequeued new task"); // everything is good, upload the given task - bool success = this->upload_task(std::move(task)); + bool success = this->upload_task(tasks); // Uh oh, seems like we lost database connection! Close the program. if (!success) { @@ -89,6 +93,7 @@ void Uploader::thread_run() { } DEBUG_PRINT("Task successfully uploaded"); + min_size = 200; } DEBUG_PRINT("Uploader is starting the shutdown procedure, waiting for cleanup ready"); @@ -100,75 +105,44 @@ void Uploader::thread_run() { DEBUG_PRINT("Uploader has received go for cleanup"); // upload all the remaining tasks - while (!this->interface.finished_queue_empty()) { - bool queue_empty; - auto task = this->interface.pop_finished(queue_empty); + bool queue_empty; + auto tasks = this->interface.pop_finished(queue_empty); - // in case there are ever multiple upload threads, - // the same issues apply as before - if (!queue_empty) { - DEBUG_PRINT("Uploading finished task"); - if (!this->upload_task(std::move(task))) { - std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl; - } + // in case there are ever multiple upload threads, + // the same issues apply as before + if (!queue_empty) { + DEBUG_PRINT("Uploading finished task"); + if (!this->upload_task(tasks)) { + std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl; } - } + DEBUG_PRINT("Uploader is done"); } -bool Uploader::upload_task(std::unique_ptr task) { +bool Uploader::upload_task(std::vector>& tasks) { - auto&& task_id = task->id; - auto&& sim_log = task->get_content().first; - auto&& sim_error = task->get_content().second; - auto&& output_token_timings = task->get_output_token_timings(); - auto&& output_tokens = task->output_tokens; - auto log_size = sim_log.size() + sim_error.size(); + auto upload_results_lambda = [] (pqxx::work* txn, std::vector>* tasks) { - const auto&& fault_flags = build_fault_flags(task); + pqxx::stream_to stream = pqxx::stream_to::table(*txn, {"sim_outputs"}, {"artifact","source_pass", "sim_config", "output_tokens", "output_token_timings", "fault_flags", "part_status"}); - // make sure any task that is uploaded isn't halted in the database - auto task_upload_lambda = [ task_id, - sim_log, - sim_error, - output_tokens, - output_token_timings, - fault_flags, - log_size - ]( - pqxx::work *txn - ) { - txn->exec( - "UPDATE sim_outputs SET " - " sim_log = $1, " - " error_log = $2, " - " output_tokens = $3, " - " output_token_timings = $4, " - " fault_flags = $5, " - " log_size = $6, " - " part_status = 'finished' " - "WHERE id = $7 AND part_status != 'halted';", - {sim_log, - sim_error, - output_tokens, - output_token_timings, - fault_flags, - log_size, - task_id} - ); + for (auto&& task : (*tasks)) { + stream.write_values(task->target_artifact, task->source_pass, task->source_config, task->output_tokens, task->get_output_token_timings(), build_fault_flags(task), "finished"); + } + + stream.complete(); }; - std::function task_upload_func = task_upload_lambda; + std::function>*)> task_upload_func = upload_results_lambda; - DEBUG_PRINT("Updating task " + db::to_string(task->id)); + DEBUG_PRINT("Updating tasks"); - return this->conn->send_request(&task_upload_func); + return this->conn->send_request(&task_upload_func, &tasks); } -std::string Uploader::build_fault_flags(const std::unique_ptr& task) { +std::string build_fault_flags(const std::unique_ptr& task) { // bit mask for faults is // 0: timing diff --git a/src/cluster/db_client.cpp b/src/cluster/db_client.cpp index 0ab83d1..cb1c124 100644 --- a/src/cluster/db_client.cpp +++ b/src/cluster/db_client.cpp @@ -125,7 +125,7 @@ bool Connection::prepare_statement(std::string name, std::string statement) { try { this->c->prepare(name, statement); } catch (const exception &e) { - cerr << "Error: Could not prepare statements!" << endl; + cerr << "Error: Could not prepare statements! " << e.what() << endl; return false; }