From 7f8efe0e730b60da3e424680ff859789667e1ed2 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Fri, 3 Jan 2025 12:53:59 +0100 Subject: [PATCH] add pulling token info and fix non-working downloader --- src/actsim_agent/downloader.cpp | 66 ++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 33faa95..2377e19 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include #include #include #include @@ -72,7 +73,7 @@ void Downloader::thread_run() { // if the download buffer is not full, fetch some more tasks if (!this->fetch_tasks(this->interface.get_buffer_space())) { // we can sleep for a certain amount of time, nothing to do - DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit..."); + // DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit..."); std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME); } @@ -102,7 +103,7 @@ bool Downloader::fetch_tasks(size_t n) { for (size_t i = 0; i < n; ++i) { // fetch a new task from the database - auto fetch_task_lambda = []( + auto fetch_task_lambda = [n]( pqxx::work *txn, bool *task_avail, pl::testcase_t *testcase, @@ -119,7 +120,7 @@ bool Downloader::fetch_tasks(size_t n) { // 2.) Passes that are already in progress are preferred // 3.) New passes are started in the order they were added to the database // 4.) Passes are only started if all their dependencies are fulfilled - auto res = txn->exec( + auto res = txn->exec_params( "SELECT " " ap.design_file AS design, " " ap.top_proc, " @@ -127,6 +128,7 @@ bool Downloader::fetch_tasks(size_t n) { " ap.id AS source_pass, " " sc.id AS source_config, " " sc.sim_commands, " + " sc.has_reference AS reference, " " so.id AS id " "FROM " " actsim_passes ap " @@ -150,7 +152,7 @@ bool Downloader::fetch_tasks(size_t n) { " ) " " ) " " AND (sc.has_reference IS NULL OR " - " WHERE EXISTS (" + " EXISTS (" " SELECT 1 " " FROM sim_outputs out " " WHERE out.sim_config = sc.has_reference " @@ -160,7 +162,8 @@ bool Downloader::fetch_tasks(size_t n) { "ORDER BY " " ap.pass_status = 'in_progress' DESC, " " j.time_added ASC " - "LIMIT 1;" + "LIMIT $1;", + n ); // seems like there is nothing to do right now @@ -291,19 +294,19 @@ bool Downloader::fetch_tasks(size_t n) { std::shared_ptr reference_run; // if we could not load the reference run, reopen the task in the database - if ((reference_run = this->fetch_reference_run(design)) == nullptr) { + if ((reference_run = this->fetch_reference_run(reference)) == nullptr) { std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl; this->reopen_task(task->id, true); continue; } - this->interface.store_reference(design, reference_run); + this->interface.store_reference(reference, reference_run); } - - // push the task to the list of open tasks - this->interface.push_fresh(std::move(task)); - } + + // push the task to the list of open tasks + this->interface.push_fresh(std::move(task)); + } return true; @@ -375,9 +378,14 @@ std::shared_ptr Downloader::fetch_reference_run(const db: DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database."); - auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *ref_run_id, std::vector *sim_log, std::vector *error_log, bool *found) { + auto fetch_design_lambda = [id]( + pqxx::work *txn, std::vector *sim_log, + std::vector *error_log, + long *output_tokens, + bool *found + ) { try { - auto res = txn->exec_params1("SELECT sim_log, error_log FROM sim_outputs WHERE sim_config = $1;", *ref_run_id); + auto res = txn->exec_params1("SELECT sim_log, error_log, output_tokens FROM sim_outputs WHERE sim_config = $1;", id); // load sim_log into string vector *sim_log = std::vector(); @@ -400,27 +408,30 @@ std::shared_ptr Downloader::fetch_reference_run(const db: } } while (elem.first != pqxx::array_parser::juncture::done); + *output_tokens = res["output_tokens"].as(); + *found = true; } catch (pqxx::unexpected_rows& e) { - std::cerr << "Error: Failed to fetch design " << *ref_run_id << std::endl; + std::cerr << "Error: Failed to fetch reference run " << id << ": " << e.what() << std::endl; *found = false; } }; std::function *sim_log, - std::vector *error_log, + std::vector *error_log, + long *output_tokens, bool *found )> fetch_design_func = fetch_design_lambda; std::vector sim_log; std::vector error_log; + long output_tokens; bool ref_run_found; - if (!this->conn->send_request(&fetch_design_func, &id, &sim_log, &error_log, &ref_run_found)) { + if (!this->conn->send_request(&fetch_design_func, &sim_log, &error_log, &output_tokens, &ref_run_found)) { // if we lost connection, there's nothing else we can really do std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl; this->interface.stop(); @@ -432,19 +443,15 @@ std::shared_ptr Downloader::fetch_reference_run(const db: return nullptr; } - return std::make_shared(sim_log, error_log); + auto reference = std::make_shared(sim_log, error_log); + reference->set_output_tokens(output_tokens); + + return reference; } void Downloader::reopen_task(const db::uuid_t& id, bool halt) { DEBUG_PRINT("Reopening task with ID " + db::to_string(id)); - // open up the status of this partial output in the database again - auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) { - txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status); - }; - - std::function task_reopen_func = task_reopen_lambda; - db::JobStatusType status; if (halt) { @@ -454,7 +461,14 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) { status = db::JobStatusType::NOT_STARTED; } - if (!this->conn->send_request(&task_reopen_func, &id, &status)) { + // open up the status of this partial output in the database again + auto task_reopen_lambda = [id, status](pqxx::work *txn) { + txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", id, status); + }; + + std::function task_reopen_func = task_reopen_lambda; + + if (!this->conn->send_request(&task_reopen_func)) { // if we lost connection, there's nothing else we can really do std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl; this->interface.stop();