diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 2c0a906..2b09102 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -24,6 +24,11 @@ */ #include +#include +#include +#include +#include +#include #include "util.h" #include "downloader.hpp" @@ -61,7 +66,10 @@ void Downloader::thread_run() { if (!this->interface.running()) break; // if the download buffer is not full, fetch some more tasks - this->fetch_tasks(this->interface.get_buffer_space()); + if (!this->fetch_tasks(this->interface.get_buffer_space())) { + // we can sleep for a certain amount of time, nothing to do + // TODO implement + } } @@ -83,29 +91,138 @@ void Downloader::thread_run() { bool Downloader::fetch_tasks(size_t n) { DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks..."); - std::cout << "[DOWNLOAGER] Downloading " << n << " tasks, implement me!" << std::endl; - - auto id = db::uuid_t(); - id = 1; - auto job_id = "deadbeef"; - - for (size_t i = 0; i < n; ++i) { - auto task = std::make_unique(id, id); - // see if we already have the desing locally; if not, load it - if (!this->interface.increment_design(task->design)) { + for (size_t i = 0; i < n; ++i) { + + // fetch a new task from the database + auto fetch_task_lambda = []( + pqxx::work *txn, + bool *task_avail, + pl::testcase_t *testcase, + db::uuid_t *target_artifact, + db::uuid_t *source_pass, + db::uuid_t *design, + db::uuid_t *source_config, + db::uuid_t *id + ) { - DEBUG_PRINT("Fetching new design with ID " + db::to_string(task->design)); - std::string design; + // Fetch a new task by a couple rules: + // 1.) It has not been started yet or the output row doesn't exist + // 2.) Passes that are already in progress are preferred + // 3.) New passes are started in the order they were added to the database + pqxx::row res = txn->exec1( + "SELECT " + " ap.design_file AS design, " + " ap.outputs AS target_artifact, " + " ap.id AS source_pass, " + " sc.id AS source_config, " + " sc.sim_commands, " + " so.id AS id " + "FROM " + " actsim_passes ap " + "JOIN " + " sim_configs sc ON ap.sim_configs = sc.artifact " + "LEFT JOIN " + " sim_outputs so ON sc.id = so.sim_config " + "JOIN " + " jobs j ON ap.job = j.id " + "WHERE " + " so.id IS NULL OR so.part_status = 'not_started' " + "ORDER BY " + " ap.pass_status = 'in_progress' DESC, " + " j.time_added ASC " + "LIMIT 1;" + ); + + // seems like there is nothing to do right now + if (res.empty()) { + *task_avail = false; + return; + } + + // we got something back, sort the data + *task_avail = true; + *target_artifact = res["target_artifact"].as(); + *source_pass = res["source_pass"].as(); + *design = res["design"].as(); + *source_config = res["source_config"].as(); + + if (res["id"].is_null()) { + // create a new sim output row in the database + pqxx::row new_row = txn->exec_params1( + "INSERT INTO sim_outputs (artifact, source_pass, sim_config) " + " VALUES ($1, $2, $3) " + " RETURNING id;", + *target_artifact, + *source_pass, + *source_config + ); + + // and get the auto-generated ID + *id = new_row["id"].as(); + } else { + // get the ID + *id = res["id"].as(); + + // and since the sim output row already exists, update its status + txn->exec_params0( + "UPDATE sim_outputs SET part_status = 'in_progress WHERE id = $1;", + *id + ); + } + + }; + + std::function fetch_task_func = fetch_task_lambda; + + bool task_avail; + pl::testcase_t testcase; + db::uuid_t target_artifact; + db::uuid_t source_pass; + db::uuid_t design; + db::uuid_t source_config; + db::uuid_t id; + + if (!this->conn->send_request( + &fetch_task_func, + &task_avail, + &testcase, + &target_artifact, + &source_pass, + &design, + &source_config, + &id + )) { + // if we lost connection, there's nothing else we can really do + std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl; + this->interface.stop(); + this->interface.stop_immediately(); + } + + // seems like there are no tasks to do right now; tell the calling function + // that there is still space in the buffer and we should just wait a while + if (!task_avail) { + DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!"); + return false; + } + + auto task = std::make_unique(id, source_pass, target_artifact, design, source_config); + task->add_testcase(testcase); + + // see if we already have the desing locally; if not, load it + if (!this->interface.increment_design(design)) { + + DEBUG_PRINT("Fetching new design with ID " + db::to_string(design)); + std::string design_path; // if we could not load the design, reopen it in the database - if (!this->fetch_design(task->design, design)) { + if (!this->fetch_design(design, design_path)) { std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl; this->reopen_task(task->id); continue; } - this->interface.store_design(task->design, design); + this->interface.store_design(design, design_path); } // push the task to the list of open tasks @@ -116,11 +233,71 @@ bool Downloader::fetch_tasks(size_t n) { return true; } -bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) { +bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& design) { design = "test design"; + + DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database."); + + auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design) { + pqxx::row res = txn->exec_params1("SELECT content FROM act_files WHERE id = $1;", *design_id); + + // load design into string variable + *design = res["content"].as(); + }; + + std::function fetch_design_func = fetch_design_lambda; + + std::string design_content; + + if (!this->conn->send_request(&fetch_design_func, &id, &design_content)) { + // if we lost connection, there's nothing else we can really do + std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl; + this->interface.stop(); + this->interface.stop_immediately(); + return false; + } + + // now we have to store the design content in a temporary file on the disk + // not the most elegant way, but actsim expects to read a file in its current state + + // Since we want to create a temporary file on a UNIX system, this uses some non-portable C + // stuff. Not the most elegant, but we have to create a file for now. + + std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act"; + char *temp_name = new char[temp_template.length() + 1]; + + // create a new temporary file + int fd = mkstemps(temp_name, 4); + + DEBUG_PRINT("Writing design to file " + temp_name); + + // write the design contents into it + FILE* fp = fdopen(fd, "w"); + fprintf(fp, "%s", design_content.c_str()); + + fclose(fp); + + // and return the name of the file we just created + design = std::string(temp_name); + delete temp_name; + return true; } void Downloader::reopen_task(const db::uuid_t& id) { - std::cout << "[DOWNLOADER] Reopening task!" << std::endl; + DEBUG_PRINT("Reopening task with ID " + db::to_string(id)); + + // open up the status of this partial output in the database again + auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task) { + txn->exec_params0("UPDATE sim_outputs SET part_status = 'not_started' WHERE id = $1 AND status = 'in_progress';", *task); + }; + + std::function task_reopen_func = task_reopen_lambda; + + if (!this->conn->send_request(&task_reopen_func, &id)) { + // if we lost connection, there's nothing else we can really do + std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl; + this->interface.stop(); + this->interface.stop_immediately(); + } }