From 25b468466915e6dad890f8261dea309dbaf40065 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Tue, 28 Jan 2025 10:58:30 +0100 Subject: [PATCH] optimize dbase interaction --- include/actsim_agent/actsim_agent.hpp | 2 +- include/actsim_agent/task_interface.hpp | 2 + src/actsim_agent/actsim_agent.cpp | 1 - src/actsim_agent/downloader.cpp | 335 ++++++++++++------------ src/actsim_agent/task_interface.cpp | 18 ++ src/actsim_agent/uploader.cpp | 1 + 6 files changed, 189 insertions(+), 170 deletions(-) diff --git a/include/actsim_agent/actsim_agent.hpp b/include/actsim_agent/actsim_agent.hpp index d723349..276cd7e 100644 --- a/include/actsim_agent/actsim_agent.hpp +++ b/include/actsim_agent/actsim_agent.hpp @@ -30,7 +30,7 @@ #define DATABASE_VERSION 3 -#define DOWNLOAD_BUFFER 5 +#define DOWNLOAD_BUFFER 200 int start_agent(db::db_credentials_t db_cred, size_t worker_processes); diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index ef52e9e..31296a0 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include "agent_artifact.hpp" /** @@ -58,6 +59,7 @@ class TaskInterface { void notify_download_halt(); void push_fresh(std::unique_ptr task); + void push_fresh(std::vector>& tasks); std::unique_ptr pop_fresh(bool& empty); void push_finished(std::unique_ptr task); std::unique_ptr pop_finished(bool& empty); diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index f2869bb..58496da 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -24,7 +24,6 @@ */ #include -#include #include #include #include diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 252fb4a..94f13ad 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -25,6 +25,8 @@ #include #include +#include +#include #include #include #include @@ -32,7 +34,12 @@ #include #include #include +#include #include +#include "agent_artifact.hpp" +#include "cluster/db_types.hpp" +#include "pqxx/prepared_statement.hxx" +#include "task_interface.hpp" #include "util.h" #include "downloader.hpp" @@ -101,167 +108,155 @@ void Downloader::thread_run() { bool Downloader::fetch_tasks(size_t n) { DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks..."); - - for (size_t i = 0; i < n; ++i) { - - // fetch a new task from the database - auto fetch_task_lambda = [n]( - pqxx::work *txn, - bool *task_avail, - pl::testcase_t *testcase, - db::uuid_t *target_artifact, - db::uuid_t *source_pass, - db::uuid_t *design, - db::uuid_t *reference, - db::uuid_t *source_config, - db::uuid_t *id - ) { - // Fetch a new task by a couple rules: - // 1.) It has not been started yet or the output row doesn't exist - // 2.) Passes that are already in progress are preferred - // 3.) New passes are started in the order they were added to the database - // 4.) Passes are only started if all their dependencies are fulfilled - auto res = txn->exec_params( - "SELECT " - " ap.design_file AS design, " - " ap.top_proc, " - " ap.outputs AS target_artifact, " - " ap.id AS source_pass, " - " sc.id AS source_config, " - " sc.sim_commands, " - " sc.has_reference AS reference, " - " so.id AS id " - "FROM " - " actsim_passes ap " - "JOIN " - " sim_configs sc ON ap.sim_configs = sc.artifact " - "LEFT JOIN " - " sim_outputs so ON sc.id = so.sim_config " - "JOIN " - " jobs j ON ap.job = j.id " - "WHERE " - " (so.id IS NULL OR so.part_status = 'not_started') " - " AND ap.id IN ( " - " SELECT ap_dep.id " - " FROM actsim_passes ap_dep " - " JOIN passes p ON ap_dep.id = p.id " - " WHERE NOT EXISTS ( " - " SELECT 1 " - " FROM passes dep " - " WHERE dep.id = ANY(p.depends_on) " - " AND dep.pass_status != 'finished' " - " ) " - " ) " - " AND (sc.has_reference IS NULL OR " - " EXISTS (" - " SELECT 1 " - " FROM sim_outputs out " - " WHERE out.sim_config = sc.has_reference " - " AND out.part_status = 'finished' " - " ) " - " ) " - "ORDER BY " - " ap.pass_status = 'in_progress' DESC, " - " j.time_added ASC " - "LIMIT $1;", - n - ); + // fetch a new task from the database + auto fetch_task_lambda = [n]( + pqxx::work *txn, + bool *task_avail, + std::unordered_set *references, + std::vector> *tasks + ) { - // seems like there is nothing to do right now - if (res.size() < 1) { - *task_avail = false; - return; - } + // Fetch a new task by a couple rules: + // 1.) It has not been started yet or the output row doesn't exist + // 2.) Passes that are already in progress are preferred + // 3.) New passes are started in the order they were added to the database + // 4.) Passes are only started if all their dependencies are fulfilled + auto res = txn->exec( + "SELECT " + " ap.design_file AS design, " + " ap.top_proc, " + " ap.outputs AS target_artifact, " + " ap.id AS source_pass, " + " sc.id AS source_config, " + " sc.sim_commands, " + " sc.has_reference AS reference, " + " so.id AS id " + "FROM " + " actsim_passes ap " + "JOIN " + " sim_configs sc ON ap.sim_configs = sc.artifact " + "LEFT JOIN " + " sim_outputs so ON sc.id = so.sim_config " + "JOIN " + " jobs j ON ap.job = j.id " + "WHERE " + " (so.id IS NULL OR so.part_status = 'not_started') " + " AND ap.id IN ( " + " SELECT ap_dep.id " + " FROM actsim_passes ap_dep " + " JOIN passes p ON ap_dep.id = p.id " + " WHERE NOT EXISTS ( " + " SELECT 1 " + " FROM passes dep " + " WHERE dep.id = ANY(p.depends_on) " + " AND dep.pass_status != 'finished' " + " ) " + " ) " + " AND (sc.has_reference IS NULL OR " + " EXISTS (" + " SELECT 1 " + " FROM sim_outputs out " + " WHERE out.sim_config = sc.has_reference " + " AND out.part_status = 'finished' " + " ) " + " ) " + // "ORDER BY " + // " ap.pass_status = 'in_progress' DESC, " + // " j.time_added ASC " + "LIMIT $1;", + n + ); - auto row = res[0]; + // seems like there is nothing to do right now + if (res.size() < 1) { + *task_avail = false; + return; + } + + *task_avail = true; + + // generate the output rows + std::string sql = R"( + INSERT INTO sim_outputs (artifact, source_pass, sim_config) + VALUES ($1, $2, $3) + ON CONFLICT (sim_config) + DO UPDATE SET part_status = 'in_progress' + RETURNING id; + )"; + + txn->conn().prepare("insert_output_rows", sql); + + for (auto row : res) { + auto target_artifact = row["target_artifact"].as(); + auto source_pass = row["source_pass"].as(); + auto design = row["design"].as(); + + db::uuid_t reference; - // we got something back, sort the data - *task_avail = true; - *target_artifact = row["target_artifact"].as(); - *source_pass = row["source_pass"].as(); - *design = row["design"].as(); if (!row["reference"].is_null()) { - *reference = row["reference"].as(); - } else { - *reference = db::uuid_t(); + reference = row["reference"].as(); + (*references).emplace(reference); } - *source_config = row["source_config"].as(); + + auto source_config = row["source_config"].as(); auto com_arr = row["sim_commands"].as_sql_array(); std::vector commands(com_arr.cbegin(), com_arr.cend()); - *testcase = { + pl::testcase_t testcase = { commands, row["top_proc"].as(), false }; - if (row["id"].is_null()) { - // create a new sim output row in the database - pqxx::row new_row = txn->exec_params1( - "INSERT INTO sim_outputs (artifact, source_pass, sim_config) " - " VALUES ($1, $2, $3) " - " RETURNING id;", - *target_artifact, - *source_pass, - *source_config - ); + // get the id + auto id = txn->exec(pqxx::prepped{"insert_output_rows"}, {target_artifact, source_pass, source_config})[0]["id"].as(); - // and get the auto-generated ID - *id = new_row["id"].as(); - } else { - // get the ID - *id = row["id"].as(); + auto task = std::make_unique(id, source_pass, target_artifact, design, reference, source_config); + task->add_testcase(testcase); - // and since the sim output row already exists, update its status - txn->exec_params0( - "UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;", - *id - ); - } - - }; - - std::function fetch_task_func = fetch_task_lambda; - - bool task_avail; - pl::testcase_t testcase; - db::uuid_t target_artifact; - db::uuid_t source_pass; - db::uuid_t design; - db::uuid_t reference; - db::uuid_t source_config; - db::uuid_t id; - - if (!this->conn->send_request( - &fetch_task_func, - &task_avail, - &testcase, - &target_artifact, - &source_pass, - &design, - &reference, - &source_config, - &id - )) { - // if we lost connection, there's nothing else we can really do - std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl; - this->interface.stop(); - this->interface.stop_immediately(); + (*tasks).emplace_back(std::move(task)); } - // seems like there are no tasks to do right now; tell the calling function - // that there is still space in the buffer and we should just wait a while - if (!task_avail) { - DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!"); - return false; - } + txn->conn().unprepare("insert_output_rows"); - DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact)); - DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config)); - auto task = std::make_unique(id, source_pass, target_artifact, design, reference, source_config); - task->add_testcase(testcase); + }; + + std::function *references, + std::vector> *tasks + )> fetch_task_func = fetch_task_lambda; + + + std::unordered_set references; + bool task_avail; + std::vector> tasks; + + if (!this->conn->send_request( + &fetch_task_func, + &task_avail, + &references, + &tasks + )) { + // if we lost connection, there's nothing else we can really do + std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl; + this->interface.stop(); + this->interface.stop_immediately(); + } + + // seems like there are no tasks to do right now; tell the calling function + // that there is still space in the buffer and we should just wait a while + if (!task_avail) { + DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!"); + return false; + } + + for (auto&& task : tasks) { + + auto design = task->design; // see if we already have the design locally; if not, load it if (!this->interface.increment_design(design)) { @@ -278,32 +273,31 @@ bool Downloader::fetch_tasks(size_t n) { this->interface.store_design(design, design_path); } - - // if we have a reference for this run, we have to see if it is loaded - if (reference != 0) { - - // see if we already have the reference run locally; if not, load it - if (!this->interface.increment_reference(reference)) { - - DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference)); - std::shared_ptr reference_run; - - // if we could not load the reference run, reopen the task in the database - if ((reference_run = this->fetch_reference_run(reference)) == nullptr) { - std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl; - this->reopen_task(task->id, true); - continue; - } - - this->interface.store_reference(reference, reference_run); - } - } - - // push the task to the list of open tasks - this->interface.push_fresh(std::move(task)); - } + // if we have a reference for this run, we have to see if it is loaded + for (auto&& reference : references) { + + // see if we already have the reference run locally; if not, load it + if (!this->interface.increment_reference(reference)) { + + DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference)); + std::shared_ptr reference_run; + + // if we could not load the reference run, reopen the task in the database + if ((reference_run = this->fetch_reference_run(reference)) == nullptr) { + std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl; + // this->reopen_task(task->id, true); + continue; + } + + this->interface.store_reference(reference, reference_run); + } + } + + // push the task to the list of open tasks + this->interface.push_fresh(tasks); + return true; } @@ -313,7 +307,7 @@ bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) { auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) { try { - auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id); + auto res = txn->exec("SELECT content FROM act_files WHERE artifact = $1;", *design_id)[0]; // load design into string variable *design = res["content"].as(); *found = true; @@ -381,7 +375,7 @@ std::shared_ptr Downloader::fetch_reference_run(const db: bool *found ) { try { - auto res = txn->exec_params1("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id); + auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0]; // load the output token timings auto arr_ott = res["output_token_timings"].as_sql_array(); @@ -423,6 +417,11 @@ std::shared_ptr Downloader::fetch_reference_run(const db: return nullptr; } + std::cout << "Output tokens:" << std::endl; + for (auto&& token: output_token_timings) { + std::cout << token << std::endl; + } + auto reference = std::make_shared(); reference->set_output_token_timings(output_token_timings); reference->set_output_tokens(output_tokens); @@ -445,7 +444,7 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) { // open up the status of this partial output in the database again auto task_reopen_lambda = [id, status](pqxx::work *txn) { - txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", id, status); + txn->exec("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status}); }; std::function task_reopen_func = task_reopen_lambda; diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 2dcb84c..99bd5cc 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -25,6 +25,8 @@ #include "util.h" #include +#include +#include #include "task_interface.hpp" TaskInterface::TaskInterface(size_t buffer_size) { @@ -52,6 +54,22 @@ void TaskInterface::push_fresh(std::unique_ptr task) { this->fresh_queue_empty_condition.notify_one(); } +void TaskInterface::push_fresh(std::vector>& tasks) { + + DEBUG_PRINT("New task in the queue!"); + + // lock the queue and insert into it + std::lock_guard lock(this->fresh_queue_mutex); + + while (!tasks.empty()) { + this->fresh_queue.push(std::move(tasks.front())); + tasks.erase(tasks.begin()); + } + + // we put one task in there so we notify one worker thread + this->fresh_queue_empty_condition.notify_one(); +} + bool TaskInterface::fresh_queue_empty() { std::lock_guard lock(this->fresh_queue_mutex); return this->fresh_queue.empty(); diff --git a/src/actsim_agent/uploader.cpp b/src/actsim_agent/uploader.cpp index 07dce0b..eb63aba 100644 --- a/src/actsim_agent/uploader.cpp +++ b/src/actsim_agent/uploader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include #include #include #include