actsim-cluster-agent/src/actsim_agent/downloader.cpp

365 lines
13 KiB
C++

/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <filesystem>
#include <functional>
#include <cluster/artifact.hpp>
#include <pqxx/pqxx>
#include "util.h"
#include "downloader.hpp"
/**
 * Build a downloader that talks to the given task interface.
 *
 * Only the database connection object is created here; the connection itself
 * is opened later, from inside thread_run().
 *
 * @param db_cred   Credentials handed to the database connection object
 * @param interface Task interface this downloader reports to and feeds
 */
Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) :
    interface(interface)
{
    auto connection = std::make_unique<db::Connection>(db_cred);
    this->conn = std::move(connection);
}
void Downloader::start() {
DEBUG_PRINT("Starting downloader thread...");
this->downloader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
/**
 * Block until the downloader thread has finished.
 *
 * Does nothing if no thread object has been created yet.
 */
void Downloader::join() {

    if (this->downloader_thread != nullptr) {
        this->downloader_thread->join();
    }
}
/**
 * Body of the downloader thread.
 *
 * Connects to the database, then keeps refilling the task buffer for as long
 * as the task interface reports that it is running. Once the interface stops,
 * the control thread is notified, the thread waits until cleanup may begin and
 * finally hands every task that is still sitting in the fresh buffer back to
 * the database.
 *
 * If the initial connect fails, the interface is stopped and the download
 * halt is signalled right away.
 */
void Downloader::thread_run() {

    using namespace std::chrono_literals;

    // connect to the database
    if (!this->conn->connect()) {
        std::cerr << "Error: Download thread could not connect to the database!" << std::endl;
        this->interface.stop();
        this->interface.notify_download_halt();
        return;
    }

    while (this->interface.running()) {

        // wait until there is more space in the buffer to fill
        this->interface.wait_for_buffer_consume();

        // make sure we weren't woken up because the program closed
        if (!this->interface.running()) break;

        // if the download buffer is not full, fetch some more tasks
        if (!this->fetch_tasks(this->interface.get_buffer_space())) {
            // we can sleep for a certain amount of time, nothing to do
            DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
            std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
        }
    }

    DEBUG_PRINT("Shutting down downloader");

    // notify the control thread that download has halted
    this->interface.notify_download_halt();

    // wait for the workers to finish their thing
    this->interface.wait_for_cleanup_ready();

    // then reopen all tasks that were still left to do
    bool empty = false;

    while (!empty) {

        // pop_fresh reports through `empty` when nothing was left to pop
        auto task = this->interface.pop_fresh(empty);
        if (empty) continue;

        // put the task back to "not started" and release its design reference
        this->reopen_task(task->id, false);
        this->interface.decrement_design(task->design);
    }
}
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
for (size_t i = 0; i < n; ++i) {
// fetch a new task from the database
auto fetch_task_lambda = [](
pqxx::work *txn,
bool *task_avail,
pl::testcase_t *testcase,
db::uuid_t *target_artifact,
db::uuid_t *source_pass,
db::uuid_t *design,
db::uuid_t *source_config,
db::uuid_t *id
) {
// Fetch a new task by a couple rules:
// 1.) It has not been started yet or the output row doesn't exist
// 2.) Passes that are already in progress are preferred
// 3.) New passes are started in the order they were added to the database
// 4.) Passes are only started if all their dependencies are fulfilled
auto res = txn->exec(
"SELECT "
" ap.design_file AS design, "
" ap.top_proc, "
" ap.outputs AS target_artifact, "
" ap.id AS source_pass, "
" sc.id AS source_config, "
" sc.sim_commands, "
" so.id AS id "
"FROM "
" actsim_passes ap "
"JOIN "
" sim_configs sc ON ap.sim_configs = sc.artifact "
"LEFT JOIN "
" sim_outputs so ON sc.id = so.sim_config "
"JOIN "
" jobs j ON ap.job = j.id "
"WHERE "
" (so.id IS NULL OR so.part_status = 'not_started') "
" AND ap.id IN ( "
" SELECT ap_dep.id "
" FROM actsim_passes ap_dep "
" JOIN passes p ON ap_dep.id = p.id "
" WHERE NOT EXISTS ( "
" SELECT 1 "
" FROM passes dep "
" WHERE dep.id = ANY(p.depends_on) "
" AND dep.pass_status != 'finished' "
" ) "
" ) "
"ORDER BY "
" ap.pass_status = 'in_progress' DESC, "
" j.time_added ASC "
"LIMIT 1;"
);
// seems like there is nothing to do right now
if (res.size() < 1) {
*task_avail = false;
return;
}
auto row = res[0];
// we got something back, sort the data
*task_avail = true;
*target_artifact = row["target_artifact"].as<db::uuid_t>();
*source_pass = row["source_pass"].as<db::uuid_t>();
*design = row["design"].as<db::uuid_t>();
*source_config = row["source_config"].as<db::uuid_t>();
std::vector<std::string> commands;
auto arr = row["sim_commands"].as_array();
std::pair<pqxx::array_parser::juncture, std::string> elem;
do {
elem = arr.get_next();
if (elem.first == pqxx::array_parser::juncture::string_value) {
commands.push_back(elem.second);
}
} while (elem.first != pqxx::array_parser::juncture::done);
*testcase = {
commands,
row["top_proc"].as<std::string>(),
false,
0
};
if (row["id"].is_null()) {
// create a new sim output row in the database
pqxx::row new_row = txn->exec_params1(
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
" VALUES ($1, $2, $3) "
" RETURNING id;",
*target_artifact,
*source_pass,
*source_config
);
// and get the auto-generated ID
*id = new_row["id"].as<db::uuid_t>();
} else {
// get the ID
*id = row["id"].as<db::uuid_t>();
// and since the sim output row already exists, update its status
txn->exec_params0(
"UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;",
*id
);
}
};
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
bool task_avail;
pl::testcase_t testcase;
db::uuid_t target_artifact;
db::uuid_t source_pass;
db::uuid_t design;
db::uuid_t source_config;
db::uuid_t id;
if (!this->conn->send_request(
&fetch_task_func,
&task_avail,
&testcase,
&target_artifact,
&source_pass,
&design,
&source_config,
&id
)) {
// if we lost connection, there's nothing else we can really do
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
this->interface.stop();
this->interface.stop_immediately();
}
// seems like there are no tasks to do right now; tell the calling function
// that there is still space in the buffer and we should just wait a while
if (!task_avail) {
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
return false;
}
DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact));
DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config));
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, source_config);
task->add_testcase(testcase);
// see if we already have the desing locally; if not, load it
if (!this->interface.increment_design(design)) {
DEBUG_PRINT("Fetching new design with ID " + db::to_string(design));
std::string design_path;
// if we could not load the design, reopen it in the database
if (!this->fetch_design(design, design_path)) {
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
this->reopen_task(task->id, true);
continue;
}
this->interface.store_design(design, design_path);
}
// push the task to the list of open tasks
this->interface.push_fresh(std::move(task));
}
return true;
}
/**
 * Load a design from the database and store it in a temporary file.
 *
 * The design content is read from the act_files table and written to a freshly
 * created file named "XXXXXX.act" (with the X's replaced by mkstemps) inside
 * the system's temporary directory.
 *
 * @param id     Artifact ID of the design to load
 * @param design Output: path of the temporary file on success; left empty on failure
 * @return true if the design was found and written to disk completely; false if
 *         the design does not exist, the connection was lost (the interface is
 *         stopped in that case) or the temporary file could not be created or
 *         written
 */
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {

    // make sure the caller never sees stale content if we bail out early
    design.clear();

    DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");

    auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *content, bool *found) {

        try {
            // exec_params1 throws unless exactly one row comes back
            auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id);

            // load design into string variable
            *content = res["content"].as<std::string>();
            *found = true;

        } catch (const pqxx::unexpected_rows&) {
            std::cerr << "Error: Failed to fetch design " << *design_id << std::endl;
            *found = false;
            *content = "";
        }
    };

    std::function<void(pqxx::work*, const db::uuid_t*, std::string*, bool*)> fetch_design_func = fetch_design_lambda;

    std::string design_content;
    bool design_found = false;

    if (!this->conn->send_request(&fetch_design_func, &id, &design_content, &design_found)) {
        // if we lost connection, there's nothing else we can really do
        std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
        this->interface.stop();
        this->interface.stop_immediately();
        return false;
    }

    if (!design_found) {
        return false;
    }

    // now we have to store the design content in a temporary file on the disk
    // not the most elegant way, but actsim expects to read a file in its current state

    // Since we want to create a temporary file on a UNIX system, this uses some non-portable C
    // stuff. mkstemps rewrites the XXXXXX part of the template in place, so the string doubles
    // as the writable buffer.
    std::string temp_path = (std::filesystem::temp_directory_path() / "XXXXXX.act").string();

    // create a new temporary file; the 4 is the length of the ".act" suffix
    const int fd = mkstemps(temp_path.data(), 4);

    if (fd < 0) {
        std::cerr << "Error: Could not create temporary file for design " << id << std::endl;
        return false;
    }

    DEBUG_PRINT("Writing design to file " + temp_path);

    FILE* fp = fdopen(fd, "w");

    if (fp == nullptr) {
        std::cerr << "Error: Could not open temporary file " << temp_path << " for writing" << std::endl;
        ::close(fd);
        std::remove(temp_path.c_str());
        return false;
    }

    // write the design contents into it; fwrite also copes with embedded NUL bytes
    const size_t written = std::fwrite(design_content.data(), 1, design_content.size(), fp);
    const bool write_ok = (written == design_content.size());

    // fclose flushes the buffer, so a failure here means the file is incomplete as well
    const bool close_ok = (std::fclose(fp) == 0);

    if (!write_ok || !close_ok) {
        std::cerr << "Error: Could not write design " << id << " to file " << temp_path << std::endl;
        std::remove(temp_path.c_str());
        return false;
    }

    // and return the name of the file we just created
    design = std::move(temp_path);

    return true;
}
/**
 * Hand a task that is marked as in progress back to the database.
 *
 * Only rows whose part_status is still 'in_progress' are touched.
 *
 * @param id   ID of the sim_outputs row to update
 * @param halt If true the row is set to HALTED, otherwise to NOT_STARTED
 */
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {

    DEBUG_PRINT("Reopening task with ID " + db::to_string(id));

    // request that resets the status of this partial output in the database
    std::function<void(pqxx::work*, const db::uuid_t*, db::JobStatusType*)> task_reopen_func =
        [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) {
            txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status);
        };

    // pick the status the row should end up with
    db::JobStatusType status = halt ? db::JobStatusType::HALTED : db::JobStatusType::NOT_STARTED;

    if (halt) {
        DEBUG_PRINT("Halting the task.");
    }

    const bool request_sent = this->conn->send_request(&task_reopen_func, &id, &status);

    if (request_sent) {
        return;
    }

    // if we lost connection, there's nothing else we can really do
    std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
    this->interface.stop();
    this->interface.stop_immediately();
}