diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 8f5fcca..f5476e7 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "util.h" #include "downloader.hpp" @@ -50,6 +51,8 @@ void Downloader::join() { void Downloader::thread_run() { + using namespace std::chrono_literals; + // connect to the database if (!this->conn->connect()) { std::cerr << "Error: Upload thread could not connect to the database!" << std::endl; @@ -68,7 +71,8 @@ void Downloader::thread_run() { // if the download buffer is not full, fetch some more tasks if (!this->fetch_tasks(this->interface.get_buffer_space())) { // we can sleep for a certain amount of time, nothing to do - // TODO implement + DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit..."); + std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME); } } @@ -84,7 +88,7 @@ void Downloader::thread_run() { while (!empty) { auto task = this->interface.pop_fresh(empty); if (empty) continue; - this->reopen_task(task->id); + this->reopen_task(task->id, false); this->interface.decrement_design(task->design); } } @@ -111,7 +115,7 @@ bool Downloader::fetch_tasks(size_t n) { // 2.) Passes that are already in progress are preferred // 3.) New passes are started in the order they were added to the database // 4.) Passes are only started if all their dependencies are fulfilled - pqxx::row res = txn->exec1( + auto res = txn->exec( "SELECT " " ap.design_file AS design, " " ap.top_proc, " @@ -148,20 +152,22 @@ bool Downloader::fetch_tasks(size_t n) { ); // seems like there is nothing to do right now - if (res.size() > 0) { + if (res.size() < 1) { *task_avail = false; return; } + auto row = res[0]; + // we got something back, sort the data *task_avail = true; - *target_artifact = res["target_artifact"].as(); - *source_pass = res["source_pass"].as(); - *design = res["design"].as(); - *source_config = res["source_config"].as(); + *target_artifact = row["target_artifact"].as(); + *source_pass = row["source_pass"].as(); + *design = row["design"].as(); + *source_config = row["source_config"].as(); std::vector commands; - auto arr = res["sim_commands"].as_array(); + auto arr = row["sim_commands"].as_array(); std::pair elem; do { elem = arr.get_next(); @@ -172,12 +178,12 @@ bool Downloader::fetch_tasks(size_t n) { *testcase = { commands, - res["top_proc"].as(), + row["top_proc"].as(), false, 0 }; - if (res["id"].is_null()) { + if (row["id"].is_null()) { // create a new sim output row in the database pqxx::row new_row = txn->exec_params1( "INSERT INTO sim_outputs (artifact, source_pass, sim_config) " @@ -192,11 +198,11 @@ bool Downloader::fetch_tasks(size_t n) { *id = new_row["id"].as(); } else { // get the ID - *id = res["id"].as(); + *id = row["id"].as(); // and since the sim output row already exists, update its status txn->exec_params0( - "UPDATE sim_outputs SET part_status = 'in_progress WHERE id = $1;", + "UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;", *id ); } @@ -250,7 +256,7 @@ bool Downloader::fetch_tasks(size_t n) { // if we could not load the design, reopen it in the database if (!this->fetch_design(design, design_path)) { std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl; - this->reopen_task(task->id); + this->reopen_task(task->id, true); continue; } @@ -265,23 +271,30 @@ bool Downloader::fetch_tasks(size_t n) { return true; } -bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& design) { +bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) { design = "test design"; DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database."); - auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design) { - pqxx::row res = txn->exec_params1("SELECT content FROM act_files WHERE id = $1;", *design_id); - - // load design into string variable - *design = res["content"].as(); + auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) { + try { + auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id); + // load design into string variable + *design = res["content"].as(); + *found = true; + } catch (pqxx::unexpected_rows& e) { + std::cerr << "Error: Failed to fetch design " << *design_id << std::endl; + *found = false; + *design = ""; + } }; - std::function fetch_design_func = fetch_design_lambda; + std::function fetch_design_func = fetch_design_lambda; std::string design_content; + bool design_found; - if (!this->conn->send_request(&fetch_design_func, &id, &design_content)) { + if (!this->conn->send_request(&fetch_design_func, &id, &design_content, &design_found)) { // if we lost connection, there's nothing else we can really do std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl; this->interface.stop(); @@ -289,6 +302,10 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& return false; } + if (!design_found) { + return false; + } + // now we have to store the design content in a temporary file on the disk // not the most elegant way, but actsim expects to read a file in its current state @@ -297,11 +314,12 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act"; char *temp_name = new char[temp_template.length() + 1]; + std::strcpy(temp_name, temp_template.c_str()); // create a new temporary file int fd = mkstemps(temp_name, 4); - DEBUG_PRINT("Writing design to file " + temp_name); + DEBUG_PRINT("Writing design to file " << temp_name); // write the design contents into it FILE* fp = fdopen(fd, "w"); @@ -316,17 +334,26 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& return true; } -void Downloader::reopen_task(const db::uuid_t& id) { +void Downloader::reopen_task(const db::uuid_t& id, bool halt) { DEBUG_PRINT("Reopening task with ID " + db::to_string(id)); // open up the status of this partial output in the database again - auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task) { - txn->exec_params0("UPDATE sim_outputs SET part_status = 'not_started' WHERE id = $1 AND status = 'in_progress';", *task); + auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) { + txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status); }; - std::function task_reopen_func = task_reopen_lambda; + std::function task_reopen_func = task_reopen_lambda; - if (!this->conn->send_request(&task_reopen_func, &id)) { + db::JobStatusType status; + + if (halt) { + status = db::JobStatusType::HALTED; + DEBUG_PRINT("Halting the task."); + } else { + status = db::JobStatusType::NOT_STARTED; + } + + if (!this->conn->send_request(&task_reopen_func, &id, &status)) { // if we lost connection, there's nothing else we can really do std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl; this->interface.stop();