fixed design fetch pulling right ID and not crashing on missing design, downloader sleeps if no tasks available, doesn't crash if no tasks available
This commit is contained in:
parent
5f013f9efa
commit
f444f5e026
1 changed files with 55 additions and 28 deletions
|
|
@ -29,6 +29,7 @@
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <pqxx/pqxx>
|
#include <pqxx/pqxx>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <chrono>
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "downloader.hpp"
|
#include "downloader.hpp"
|
||||||
|
|
||||||
|
|
@ -50,6 +51,8 @@ void Downloader::join() {
|
||||||
|
|
||||||
void Downloader::thread_run() {
|
void Downloader::thread_run() {
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
// connect to the database
|
// connect to the database
|
||||||
if (!this->conn->connect()) {
|
if (!this->conn->connect()) {
|
||||||
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
|
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
|
||||||
|
|
@ -68,7 +71,8 @@ void Downloader::thread_run() {
|
||||||
// if the download buffer is not full, fetch some more tasks
|
// if the download buffer is not full, fetch some more tasks
|
||||||
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
|
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
|
||||||
// we can sleep for a certain amount of time, nothing to do
|
// we can sleep for a certain amount of time, nothing to do
|
||||||
// TODO implement
|
DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
|
||||||
|
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -84,7 +88,7 @@ void Downloader::thread_run() {
|
||||||
while (!empty) {
|
while (!empty) {
|
||||||
auto task = this->interface.pop_fresh(empty);
|
auto task = this->interface.pop_fresh(empty);
|
||||||
if (empty) continue;
|
if (empty) continue;
|
||||||
this->reopen_task(task->id);
|
this->reopen_task(task->id, false);
|
||||||
this->interface.decrement_design(task->design);
|
this->interface.decrement_design(task->design);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -111,7 +115,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
// 2.) Passes that are already in progress are preferred
|
// 2.) Passes that are already in progress are preferred
|
||||||
// 3.) New passes are started in the order they were added to the database
|
// 3.) New passes are started in the order they were added to the database
|
||||||
// 4.) Passes are only started if all their dependencies are fulfilled
|
// 4.) Passes are only started if all their dependencies are fulfilled
|
||||||
pqxx::row res = txn->exec1(
|
auto res = txn->exec(
|
||||||
"SELECT "
|
"SELECT "
|
||||||
" ap.design_file AS design, "
|
" ap.design_file AS design, "
|
||||||
" ap.top_proc, "
|
" ap.top_proc, "
|
||||||
|
|
@ -148,20 +152,22 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
);
|
);
|
||||||
|
|
||||||
// seems like there is nothing to do right now
|
// seems like there is nothing to do right now
|
||||||
if (res.size() > 0) {
|
if (res.size() < 1) {
|
||||||
*task_avail = false;
|
*task_avail = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto row = res[0];
|
||||||
|
|
||||||
// we got something back, sort the data
|
// we got something back, sort the data
|
||||||
*task_avail = true;
|
*task_avail = true;
|
||||||
*target_artifact = res["target_artifact"].as<db::uuid_t>();
|
*target_artifact = row["target_artifact"].as<db::uuid_t>();
|
||||||
*source_pass = res["source_pass"].as<db::uuid_t>();
|
*source_pass = row["source_pass"].as<db::uuid_t>();
|
||||||
*design = res["design"].as<db::uuid_t>();
|
*design = row["design"].as<db::uuid_t>();
|
||||||
*source_config = res["source_config"].as<db::uuid_t>();
|
*source_config = row["source_config"].as<db::uuid_t>();
|
||||||
|
|
||||||
std::vector<std::string> commands;
|
std::vector<std::string> commands;
|
||||||
auto arr = res["sim_commands"].as_array();
|
auto arr = row["sim_commands"].as_array();
|
||||||
std::pair<pqxx::array_parser::juncture, std::string> elem;
|
std::pair<pqxx::array_parser::juncture, std::string> elem;
|
||||||
do {
|
do {
|
||||||
elem = arr.get_next();
|
elem = arr.get_next();
|
||||||
|
|
@ -172,12 +178,12 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
|
|
||||||
*testcase = {
|
*testcase = {
|
||||||
commands,
|
commands,
|
||||||
res["top_proc"].as<std::string>(),
|
row["top_proc"].as<std::string>(),
|
||||||
false,
|
false,
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
if (res["id"].is_null()) {
|
if (row["id"].is_null()) {
|
||||||
// create a new sim output row in the database
|
// create a new sim output row in the database
|
||||||
pqxx::row new_row = txn->exec_params1(
|
pqxx::row new_row = txn->exec_params1(
|
||||||
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
|
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
|
||||||
|
|
@ -192,11 +198,11 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
*id = new_row["id"].as<db::uuid_t>();
|
*id = new_row["id"].as<db::uuid_t>();
|
||||||
} else {
|
} else {
|
||||||
// get the ID
|
// get the ID
|
||||||
*id = res["id"].as<db::uuid_t>();
|
*id = row["id"].as<db::uuid_t>();
|
||||||
|
|
||||||
// and since the sim output row already exists, update its status
|
// and since the sim output row already exists, update its status
|
||||||
txn->exec_params0(
|
txn->exec_params0(
|
||||||
"UPDATE sim_outputs SET part_status = 'in_progress WHERE id = $1;",
|
"UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;",
|
||||||
*id
|
*id
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -250,7 +256,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
// if we could not load the design, reopen it in the database
|
// if we could not load the design, reopen it in the database
|
||||||
if (!this->fetch_design(design, design_path)) {
|
if (!this->fetch_design(design, design_path)) {
|
||||||
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
|
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
|
||||||
this->reopen_task(task->id);
|
this->reopen_task(task->id, true);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -265,23 +271,30 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& design) {
|
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
|
||||||
design = "test design";
|
design = "test design";
|
||||||
|
|
||||||
DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");
|
DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");
|
||||||
|
|
||||||
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design) {
|
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) {
|
||||||
pqxx::row res = txn->exec_params1("SELECT content FROM act_files WHERE id = $1;", *design_id);
|
try {
|
||||||
|
auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id);
|
||||||
// load design into string variable
|
// load design into string variable
|
||||||
*design = res["content"].as<std::string>();
|
*design = res["content"].as<std::string>();
|
||||||
|
*found = true;
|
||||||
|
} catch (pqxx::unexpected_rows& e) {
|
||||||
|
std::cerr << "Error: Failed to fetch design " << *design_id << std::endl;
|
||||||
|
*found = false;
|
||||||
|
*design = "";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::function<void(pqxx::work*, const db::uuid_t*, std::string*)> fetch_design_func = fetch_design_lambda;
|
std::function<void(pqxx::work*, const db::uuid_t*, std::string*, bool*)> fetch_design_func = fetch_design_lambda;
|
||||||
|
|
||||||
std::string design_content;
|
std::string design_content;
|
||||||
|
bool design_found;
|
||||||
|
|
||||||
if (!this->conn->send_request(&fetch_design_func, &id, &design_content)) {
|
if (!this->conn->send_request(&fetch_design_func, &id, &design_content, &design_found)) {
|
||||||
// if we lost connection, there's nothing else we can really do
|
// if we lost connection, there's nothing else we can really do
|
||||||
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
||||||
this->interface.stop();
|
this->interface.stop();
|
||||||
|
|
@ -289,6 +302,10 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string&
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!design_found) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// now we have to store the design content in a temporary file on the disk
|
// now we have to store the design content in a temporary file on the disk
|
||||||
// not the most elegant way, but actsim expects to read a file in its current state
|
// not the most elegant way, but actsim expects to read a file in its current state
|
||||||
|
|
||||||
|
|
@ -297,11 +314,12 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string&
|
||||||
|
|
||||||
std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act";
|
std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act";
|
||||||
char *temp_name = new char[temp_template.length() + 1];
|
char *temp_name = new char[temp_template.length() + 1];
|
||||||
|
std::strcpy(temp_name, temp_template.c_str());
|
||||||
|
|
||||||
// create a new temporary file
|
// create a new temporary file
|
||||||
int fd = mkstemps(temp_name, 4);
|
int fd = mkstemps(temp_name, 4);
|
||||||
|
|
||||||
DEBUG_PRINT("Writing design to file " + temp_name);
|
DEBUG_PRINT("Writing design to file " << temp_name);
|
||||||
|
|
||||||
// write the design contents into it
|
// write the design contents into it
|
||||||
FILE* fp = fdopen(fd, "w");
|
FILE* fp = fdopen(fd, "w");
|
||||||
|
|
@ -316,17 +334,26 @@ bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string&
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Downloader::reopen_task(const db::uuid_t& id) {
|
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
||||||
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
||||||
|
|
||||||
// open up the status of this partial output in the database again
|
// open up the status of this partial output in the database again
|
||||||
auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task) {
|
auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) {
|
||||||
txn->exec_params0("UPDATE sim_outputs SET part_status = 'not_started' WHERE id = $1 AND status = 'in_progress';", *task);
|
txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status);
|
||||||
};
|
};
|
||||||
|
|
||||||
std::function<void(pqxx::work*, const db::uuid_t*)> task_reopen_func = task_reopen_lambda;
|
std::function<void(pqxx::work*, const db::uuid_t*, db::JobStatusType*)> task_reopen_func = task_reopen_lambda;
|
||||||
|
|
||||||
if (!this->conn->send_request(&task_reopen_func, &id)) {
|
db::JobStatusType status;
|
||||||
|
|
||||||
|
if (halt) {
|
||||||
|
status = db::JobStatusType::HALTED;
|
||||||
|
DEBUG_PRINT("Halting the task.");
|
||||||
|
} else {
|
||||||
|
status = db::JobStatusType::NOT_STARTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this->conn->send_request(&task_reopen_func, &id, &status)) {
|
||||||
// if we lost connection, there's nothing else we can really do
|
// if we lost connection, there's nothing else we can really do
|
||||||
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
|
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
|
||||||
this->interface.stop();
|
this->interface.stop();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue