/*************************************************************************
 *
 * This file is part of the ACT library
 *
 * Copyright (c) 2024 Fabian Posch
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 *
 **************************************************************************
 */

// NOTE(review): the header names of the system includes below, and several
// template argument lists further down (e.g. `std::make_unique(db_cred)`,
// `row[...].as()`), appear to be missing from this copy of the file.
// Confirm against version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "agent_artifact.hpp"
#include "cluster/db_types.hpp"
#include "pqxx/prepared_statement.hxx"
#include "task_interface.hpp"
#include "util.h"
#include "downloader.hpp"

/**
 * Construct a downloader.
 *
 * Stores a reference to the shared task interface and creates the database
 * connection object from the given credentials. No connection attempt is
 * made here; that happens in thread_run().
 *
 * @param db_cred   Credentials handed to the connection object
 * @param interface Task interface shared with the rest of the agent
 */
Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) : interface(interface) {
    this->conn = std::make_unique(db_cred);
}

/**
 * Launch the downloader thread, which executes thread_run().
 */
void Downloader::start() {
    DEBUG_PRINT("Starting downloader thread...");
    // the thread object captures `this`, so the Downloader must outlive it
    this->downloader_thread = std::make_unique([this]() { thread_run(); });
}

/**
 * Block until the downloader thread has finished.
 *
 * Does nothing if start() was never called (no thread object exists).
 */
void Downloader::join() {
    if (this->downloader_thread == nullptr) return;
    this->downloader_thread->join();
}

/**
 * Main loop of the downloader thread.
 *
 * Connects to the database, then repeatedly waits for buffer space and
 * fetches new tasks while the interface reports that the program is
 * running. On shutdown it signals the download halt, waits until cleanup
 * may begin, and reopens every task still sitting in the fresh queue.
 */
void Downloader::thread_run() {
    using namespace std::chrono_literals;

    // connect to the database
    if (!this->conn->connect()) {
        // NOTE(review): the message says "Upload thread" although this is the
        // downloader - looks like a copy/paste leftover; confirm and correct.
        std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
        // without a connection nothing can be downloaded: stop and report the halt
        this->interface.stop();
        this->interface.notify_download_halt();
        return;
    }

    while (this->interface.running()) {
        // wait until there is more space in the buffer to fill
        this->interface.wait_for_buffer_consume();

        // make sure we weren't woken up because the program closed
        if (!this->interface.running()) break;

        // if the download buffer is not full, fetch some more tasks
        if (!this->fetch_tasks(this->interface.get_buffer_space())) {
            // we can sleep for a certain amount of time, nothing to do
            // DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
            std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
        }
    }

    DEBUG_PRINT("Shutting down downloader");

    // notify the control thread that download has halted
    this->interface.notify_download_halt();

    // wait for the workers to finish their thing
    this->interface.wait_for_cleanup_ready();

    // then reopen all tasks that were still left to do
    // (presumably pop_fresh() sets `empty` once the queue is drained - confirm in TaskInterface)
    bool empty = false;
    while (!empty) {
        auto task = this->interface.pop_fresh(empty);
        if (empty) continue;
        // halt == false: the task goes back to NOT_STARTED (see reopen_task)
        this->reopen_task(task->id, false);
        this->interface.decrement_design(task->design);
    }
}

/**
 * Fetch up to `n` open simulation tasks from the database and queue them.
 *
 * For every selected row an output row is inserted (or an existing one is
 * switched to 'in_progress'), a task object is built, and any design or
 * reference run not yet known to the interface is loaded. All fetched
 * tasks are then pushed to the fresh queue.
 *
 * @param n Maximum number of tasks to fetch (used as the query LIMIT)
 * @return true if tasks were fetched and pushed; false if no task was
 *         reported as available
 */
bool Downloader::fetch_tasks(size_t n) {
    DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");

    // fetch a new task from the database
    auto fetch_task_lambda = [n]( pqxx::work *txn, bool *task_avail, std::unordered_set *references, std::vector> *tasks ) {
        // Fetch a new task by a couple rules:
        // 1.) It has not been started yet or the output row doesn't exist
        // 2.) Passes that are already in progress are preferred
        // 3.) New passes are started in the order they were added to the database
        // 4.) Passes are only started if all their dependencies are fulfilled
        //
        // NOTE(review): rules 2 and 3 correspond to the ORDER BY clause, which is
        // currently commented out below, so the query as written does not
        // enforce them. The query additionally requires that a config's
        // reference (has_reference), if set, has a finished output.
        auto res = txn->exec(
            "SELECT "
            " ap.design_file AS design, "
            " ap.top_proc, "
            " ap.outputs AS target_artifact, "
            " ap.id AS source_pass, "
            " sc.id AS source_config, "
            " sc.sim_commands, "
            " sc.has_reference AS reference, "
            " so.id AS id "
            "FROM "
            " sim_configs sc "
            "JOIN "
            " actsim_passes ap ON ap.sim_configs = sc.artifact "
            "LEFT JOIN "
            " sim_outputs so ON sc.id = so.sim_config "
            "JOIN "
            " jobs j ON ap.job = j.id "
            "WHERE "
            " (so.id IS NULL OR so.part_status = 'not_started') "
            " AND ap.id IN ( "
            " SELECT ap_dep.id "
            " FROM actsim_passes ap_dep "
            " JOIN passes p ON ap_dep.id = p.id "
            " WHERE NOT EXISTS ( "
            " SELECT 1 "
            " FROM passes dep "
            " WHERE dep.id = ANY(p.depends_on) "
            " AND dep.pass_status != 'finished' "
            " ) "
            " ) "
            " AND (sc.has_reference IS NULL OR "
            " EXISTS ("
            " SELECT 1 "
            " FROM sim_outputs out "
            " WHERE out.sim_config = sc.has_reference "
            " AND out.part_status = 'finished' "
            " ) "
            " ) "
            // "ORDER BY "
            // " ap.pass_status = 'in_progress' DESC, "
            // " j.time_added ASC "
            "LIMIT $1;",
            n
        );

        // seems like there is nothing to do right now
        if (res.size() < 1) {
            *task_avail = false;
            return;
        }

        *task_avail = true;

        // generate the output rows
        // an existing row for the same sim_config is switched to 'in_progress'
        // instead of being inserted a second time; either way the row id is returned
        std::string sql = R"( INSERT INTO sim_outputs (artifact, source_pass, sim_config) VALUES ($1, $2, $3) ON CONFLICT (sim_config) DO UPDATE SET part_status = 'in_progress' RETURNING id; )";

        txn->conn().prepare("insert_output_rows", sql);

        for (auto row : res) {
            auto target_artifact = row["target_artifact"].as();
            auto source_pass = row["source_pass"].as();
            auto design = row["design"].as();

            // 0 is used as the "no reference" value when the column is NULL
            db::uuid_t reference;
            reference = 0;

            if (!row["reference"].is_null()) {
                reference = row["reference"].as();
                // remember every distinct reference so it can be loaded after the transaction
                (*references).emplace(reference);
            }

            auto source_config = row["source_config"].as();

            auto com_arr = row["sim_commands"].as_sql_array();
            std::vector commands(com_arr.cbegin(), com_arr.cend());

            pl::testcase_t testcase = { commands, row["top_proc"].as(), false };

            // get the id
            auto id = txn->exec(pqxx::prepped{"insert_output_rows"}, {target_artifact, source_pass, source_config})[0]["id"].as();

            auto task = std::make_unique(id, source_pass, target_artifact, design, reference, source_config);
            task->add_testcase(testcase);
            (*tasks).emplace_back(std::move(task));
        }

        // the statement is prepared per call, so drop it again before leaving
        txn->conn().unprepare("insert_output_rows");
    };

    std::function *references, std::vector> *tasks )> fetch_task_func = fetch_task_lambda;

    std::unordered_set references;
    // NOTE(review): task_avail is not initialised here and is read below even
    // when send_request() fails (that branch does not return). If the lambda
    // does not run in that case, this reads an indeterminate value - confirm
    // and consider initialising to false / returning early.
    bool task_avail;
    std::vector> tasks;

    if (!this->conn->send_request( &fetch_task_func, &task_avail, &references, &tasks )) {
        // if we lost connection, there's nothing else we can really do
        std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
        this->interface.stop();
        this->interface.stop_immediately();
    }

    // seems like there are no tasks to do right now; tell the calling function
    // that there is still space in the buffer and we should just wait a while
    if (!task_avail) {
        DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
        return false;
    }

    for (auto&& task : tasks) {
        auto design = task->design;

        // see if we already have the design locally; if not, load it
        if (!this->interface.increment_design(design)) {
            DEBUG_PRINT("Fetching new design with ID " + db::to_string(design));

            std::string design_path;

            // if we could not load the design, reopen it in the database
            if (!this->fetch_design(design, design_path)) {
                std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
                this->reopen_task(task->id, true);
                // NOTE(review): `continue` only skips store_design(); the task
                // stays in `tasks` and is still handed to push_fresh() below
                // although it was just marked halted - confirm this is intended.
                continue;
            }

            this->interface.store_design(design, design_path);
        }
    }

    // if we have a reference for this run, we have to see if it is loaded
    for (auto&& reference : references) {
        // see if we already have the reference run locally; if not, load it
        if (!this->interface.increment_reference(reference)) {
            DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));

            std::shared_ptr reference_run;

            // if we could not load the reference run, reopen the task in the database
            // (the reopen call is currently disabled: no single task is in scope in this loop)
            if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
                std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl;
                // this->reopen_task(task->id, true);
                continue;
            }

            this->interface.store_reference(reference, reference_run);
        }
    }

    // push the task to the list of open tasks
    this->interface.push_fresh(tasks);

    return true;
}

/**
 * Load a design file from the database and write it to a temporary file.
 *
 * @param id     Artifact ID of the design to load
 * @param design Output: path of the temporary file holding the design
 *               content; only written when the function returns true
 * @return true if the design was found and written to disk; false if the
 *         request failed or no matching row was found
 */
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
    DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");

    auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) {
        try {
            auto res = txn->exec("SELECT content FROM act_files WHERE artifact = $1;", *design_id)[0];

            // load design into string variable
            *design = res["content"].as();
            *found = true;
        } catch (pqxx::unexpected_rows& e) {
            // report "not found" through the out-parameters instead of propagating
            std::cerr << "Error: Failed to fetch design " << *design_id << std::endl;
            *found = false;
            *design = "";
        }
    };

    std::function fetch_design_func = fetch_design_lambda;

    std::string design_content;
    bool design_found;

    if (!this->conn->send_request(&fetch_design_func, &id, &design_content, &design_found)) {
        // if we lost connection, there's nothing else we can really do
        std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
        this->interface.stop();
        this->interface.stop_immediately();
        return false;
    }

    if (!design_found) {
        return false;
    }

    // now we have to store the design content in a temporary file on the disk
    // not the most elegant way, but actsim expects to read a file in its current state

    // Since we want to create a temporary file on a UNIX system, this uses some non-portable C
    // stuff. Not the most elegant, but we have to create a file for now.
    std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act";
    // mkstemps() rewrites the template in place, so it needs a mutable copy
    char *temp_name = new char[temp_template.length() + 1];
    std::strcpy(temp_name, temp_template.c_str());

    // create a new temporary file
    // (the 4 is the length of the ".act" suffix that follows the XXXXXX placeholder)
    // NOTE(review): the results of mkstemps() and fdopen() are not checked;
    // if either fails, the fprintf()/fclose() calls below operate on an
    // invalid stream. Consider handling the failure and returning false.
    int fd = mkstemps(temp_name, 4);

    DEBUG_PRINT("Writing design to file " << temp_name);

    // write the design contents into it
    FILE* fp = fdopen(fd, "w");
    fprintf(fp, "%s", design_content.c_str());
    fclose(fp);

    // and return the name of the file we just created
    design = std::string(temp_name);
    delete[] temp_name;

    return true;
}

/**
 * Load the stored results of a reference simulation run from the database.
 *
 * Reads the output token count, the output token timings and the log size
 * of the sim_outputs row belonging to the given simulation config.
 *
 * @param id ID of the simulation config whose output serves as reference
 * @return the populated reference object, or nullptr if the request failed
 *         or no matching row was found
 */
std::shared_ptr Downloader::fetch_reference_run(const db::uuid_t& id) {
    DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database.");

    // (the lambda keeps the name `fetch_design_lambda`, but it fetches a reference run)
    auto fetch_design_lambda = [id]( pqxx::work *txn, std::vector *output_token_timings, long *output_tokens, long *log_size, bool *found ) {
        try {
            auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0];

            // load the output token timings
            auto arr_ott = res["output_token_timings"].as_sql_array();
            *output_token_timings = std::vector(arr_ott.cbegin(), arr_ott.cend());

            *output_tokens = res["output_tokens"].as();
            *log_size = res["log_size"].as();
            *found = true;
        } catch (pqxx::unexpected_rows& e) {
            // report "not found" through the out-parameter instead of propagating
            std::cerr << "Error: Failed to fetch reference run " << id << ": " << e.what() << std::endl;
            *found = false;
        }
    };

    std::function *output_token_timings, long *output_tokens, long *log_size, bool *found )> fetch_design_func = fetch_design_lambda;

    std::vector output_token_timings;
    long output_tokens;
    long log_size;
    bool ref_run_found;

    if (!this->conn->send_request(&fetch_design_func, &output_token_timings, &output_tokens, &log_size, &ref_run_found)) {
        // if we lost connection, there's nothing else we can really do
        // NOTE(review): the message below mentions "a design" although a
        // reference run is being fetched - likely a copy/paste leftover.
        std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
        this->interface.stop();
        this->interface.stop_immediately();
        return nullptr;
    }

    if (!ref_run_found) {
        return nullptr;
    }

    // copy the fetched values into the reference object
    auto reference = std::make_shared();
    reference->set_output_token_timings(output_token_timings);
    reference->set_output_tokens(output_tokens);
    reference->set_size(log_size);

    return reference;
}

/**
 * Release a task in the database that this agent had claimed.
 *
 * Only rows whose part_status is still 'in_progress' are changed.
 *
 * @param id   ID of the sim_outputs row belonging to the task
 * @param halt If true the row is set to HALTED, otherwise to NOT_STARTED
 */
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
    DEBUG_PRINT("Reopening task with ID " + db::to_string(id));

    db::JobStatusType status;

    if (halt) {
        status = db::JobStatusType::HALTED;
        DEBUG_PRINT("Halting the task.");
    } else {
        status = db::JobStatusType::NOT_STARTED;
    }

    // open up the status of this partial output in the database again
    auto task_reopen_lambda = [id, status](pqxx::work *txn) {
        txn->exec("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status});
    };

    std::function task_reopen_func = task_reopen_lambda;

    if (!this->conn->send_request(&task_reopen_func)) {
        // if we lost connection, there's nothing else we can really do
        std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
        this->interface.stop();
        this->interface.stop_immediately();
    }
}