implement actual database download, wait needs to be implemented
This commit is contained in:
parent
f1cca2506b
commit
8855d2f191
1 changed files with 194 additions and 17 deletions
|
|
@ -24,6 +24,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <cluster/artifact.hpp>
|
#include <cluster/artifact.hpp>
|
||||||
|
#include <filesystem>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <pqxx/pqxx>
|
||||||
|
#include <functional>
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "downloader.hpp"
|
#include "downloader.hpp"
|
||||||
|
|
||||||
|
|
@ -61,7 +66,10 @@ void Downloader::thread_run() {
|
||||||
if (!this->interface.running()) break;
|
if (!this->interface.running()) break;
|
||||||
|
|
||||||
// if the download buffer is not full, fetch some more tasks
|
// if the download buffer is not full, fetch some more tasks
|
||||||
this->fetch_tasks(this->interface.get_buffer_space());
|
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
|
||||||
|
// we can sleep for a certain amount of time, nothing to do
|
||||||
|
// TODO implement
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -83,29 +91,138 @@ void Downloader::thread_run() {
|
||||||
|
|
||||||
bool Downloader::fetch_tasks(size_t n) {
|
bool Downloader::fetch_tasks(size_t n) {
|
||||||
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
|
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
|
||||||
std::cout << "[DOWNLOAGER] Downloading " << n << " tasks, implement me!" << std::endl;
|
|
||||||
|
|
||||||
auto id = db::uuid_t();
|
|
||||||
id = 1;
|
|
||||||
auto job_id = "deadbeef";
|
|
||||||
|
|
||||||
for (size_t i = 0; i < n; ++i) {
|
|
||||||
auto task = std::make_unique<pl::SimConfigArtifact>(id, id);
|
|
||||||
|
|
||||||
// see if we already have the desing locally; if not, load it
|
for (size_t i = 0; i < n; ++i) {
|
||||||
if (!this->interface.increment_design(task->design)) {
|
|
||||||
|
// fetch a new task from the database
|
||||||
|
auto fetch_task_lambda = [](
|
||||||
|
pqxx::work *txn,
|
||||||
|
bool *task_avail,
|
||||||
|
pl::testcase_t *testcase,
|
||||||
|
db::uuid_t *target_artifact,
|
||||||
|
db::uuid_t *source_pass,
|
||||||
|
db::uuid_t *design,
|
||||||
|
db::uuid_t *source_config,
|
||||||
|
db::uuid_t *id
|
||||||
|
) {
|
||||||
|
|
||||||
DEBUG_PRINT("Fetching new design with ID " + db::to_string(task->design));
|
// Fetch a new task by a couple rules:
|
||||||
std::string design;
|
// 1.) It has not been started yet or the output row doesn't exist
|
||||||
|
// 2.) Passes that are already in progress are preferred
|
||||||
|
// 3.) New passes are started in the order they were added to the database
|
||||||
|
pqxx::row res = txn->exec1(
|
||||||
|
"SELECT "
|
||||||
|
" ap.design_file AS design, "
|
||||||
|
" ap.outputs AS target_artifact, "
|
||||||
|
" ap.id AS source_pass, "
|
||||||
|
" sc.id AS source_config, "
|
||||||
|
" sc.sim_commands, "
|
||||||
|
" so.id AS id "
|
||||||
|
"FROM "
|
||||||
|
" actsim_passes ap "
|
||||||
|
"JOIN "
|
||||||
|
" sim_configs sc ON ap.sim_configs = sc.artifact "
|
||||||
|
"LEFT JOIN "
|
||||||
|
" sim_outputs so ON sc.id = so.sim_config "
|
||||||
|
"JOIN "
|
||||||
|
" jobs j ON ap.job = j.id "
|
||||||
|
"WHERE "
|
||||||
|
" so.id IS NULL OR so.part_status = 'not_started' "
|
||||||
|
"ORDER BY "
|
||||||
|
" ap.pass_status = 'in_progress' DESC, "
|
||||||
|
" j.time_added ASC "
|
||||||
|
"LIMIT 1;"
|
||||||
|
);
|
||||||
|
|
||||||
|
// seems like there is nothing to do right now
|
||||||
|
if (res.empty()) {
|
||||||
|
*task_avail = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we got something back, sort the data
|
||||||
|
*task_avail = true;
|
||||||
|
*target_artifact = res["target_artifact"].as<db::uuid_t>();
|
||||||
|
*source_pass = res["source_pass"].as<db::uuid_t>();
|
||||||
|
*design = res["design"].as<db::uuid_t>();
|
||||||
|
*source_config = res["source_config"].as<db::uuid_t>();
|
||||||
|
|
||||||
|
if (res["id"].is_null()) {
|
||||||
|
// create a new sim output row in the database
|
||||||
|
pqxx::row new_row = txn->exec_params1(
|
||||||
|
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
|
||||||
|
" VALUES ($1, $2, $3) "
|
||||||
|
" RETURNING id;",
|
||||||
|
*target_artifact,
|
||||||
|
*source_pass,
|
||||||
|
*source_config
|
||||||
|
);
|
||||||
|
|
||||||
|
// and get the auto-generated ID
|
||||||
|
*id = new_row["id"].as<db::uuid_t>();
|
||||||
|
} else {
|
||||||
|
// get the ID
|
||||||
|
*id = res["id"].as<db::uuid_t>();
|
||||||
|
|
||||||
|
// and since the sim output row already exists, update its status
|
||||||
|
txn->exec_params0(
|
||||||
|
"UPDATE sim_outputs SET part_status = 'in_progress WHERE id = $1;",
|
||||||
|
*id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
|
||||||
|
|
||||||
|
bool task_avail;
|
||||||
|
pl::testcase_t testcase;
|
||||||
|
db::uuid_t target_artifact;
|
||||||
|
db::uuid_t source_pass;
|
||||||
|
db::uuid_t design;
|
||||||
|
db::uuid_t source_config;
|
||||||
|
db::uuid_t id;
|
||||||
|
|
||||||
|
if (!this->conn->send_request(
|
||||||
|
&fetch_task_func,
|
||||||
|
&task_avail,
|
||||||
|
&testcase,
|
||||||
|
&target_artifact,
|
||||||
|
&source_pass,
|
||||||
|
&design,
|
||||||
|
&source_config,
|
||||||
|
&id
|
||||||
|
)) {
|
||||||
|
// if we lost connection, there's nothing else we can really do
|
||||||
|
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
|
||||||
|
this->interface.stop();
|
||||||
|
this->interface.stop_immediately();
|
||||||
|
}
|
||||||
|
|
||||||
|
// seems like there are no tasks to do right now; tell the calling function
|
||||||
|
// that there is still space in the buffer and we should just wait a while
|
||||||
|
if (!task_avail) {
|
||||||
|
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, source_config);
|
||||||
|
task->add_testcase(testcase);
|
||||||
|
|
||||||
|
// see if we already have the desing locally; if not, load it
|
||||||
|
if (!this->interface.increment_design(design)) {
|
||||||
|
|
||||||
|
DEBUG_PRINT("Fetching new design with ID " + db::to_string(design));
|
||||||
|
std::string design_path;
|
||||||
|
|
||||||
// if we could not load the design, reopen it in the database
|
// if we could not load the design, reopen it in the database
|
||||||
if (!this->fetch_design(task->design, design)) {
|
if (!this->fetch_design(design, design_path)) {
|
||||||
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
|
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
|
||||||
this->reopen_task(task->id);
|
this->reopen_task(task->id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
this->interface.store_design(task->design, design);
|
this->interface.store_design(design, design_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
// push the task to the list of open tasks
|
// push the task to the list of open tasks
|
||||||
|
|
@ -116,11 +233,71 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
|
bool Downloader::fetch_design([[maybe_unused]]const db::uuid_t& id, std::string& design) {
|
||||||
design = "test design";
|
design = "test design";
|
||||||
|
|
||||||
|
DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");
|
||||||
|
|
||||||
|
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design) {
|
||||||
|
pqxx::row res = txn->exec_params1("SELECT content FROM act_files WHERE id = $1;", *design_id);
|
||||||
|
|
||||||
|
// load design into string variable
|
||||||
|
*design = res["content"].as<std::string>();
|
||||||
|
};
|
||||||
|
|
||||||
|
std::function<void(pqxx::work*, const db::uuid_t*, std::string*)> fetch_design_func = fetch_design_lambda;
|
||||||
|
|
||||||
|
std::string design_content;
|
||||||
|
|
||||||
|
if (!this->conn->send_request(&fetch_design_func, &id, &design_content)) {
|
||||||
|
// if we lost connection, there's nothing else we can really do
|
||||||
|
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
||||||
|
this->interface.stop();
|
||||||
|
this->interface.stop_immediately();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we have to store the design content in a temporary file on the disk
|
||||||
|
// not the most elegant way, but actsim expects to read a file in its current state
|
||||||
|
|
||||||
|
// Since we want to create a temporary file on a UNIX system, this uses some non-portable C
|
||||||
|
// stuff. Not the most elegant, but we have to create a file for now.
|
||||||
|
|
||||||
|
std::string temp_template = std::string(std::filesystem::temp_directory_path()) + "/XXXXXX.act";
|
||||||
|
char *temp_name = new char[temp_template.length() + 1];
|
||||||
|
|
||||||
|
// create a new temporary file
|
||||||
|
int fd = mkstemps(temp_name, 4);
|
||||||
|
|
||||||
|
DEBUG_PRINT("Writing design to file " + temp_name);
|
||||||
|
|
||||||
|
// write the design contents into it
|
||||||
|
FILE* fp = fdopen(fd, "w");
|
||||||
|
fprintf(fp, "%s", design_content.c_str());
|
||||||
|
|
||||||
|
fclose(fp);
|
||||||
|
|
||||||
|
// and return the name of the file we just created
|
||||||
|
design = std::string(temp_name);
|
||||||
|
delete temp_name;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Downloader::reopen_task(const db::uuid_t& id) {
|
void Downloader::reopen_task(const db::uuid_t& id) {
|
||||||
std::cout << "[DOWNLOADER] Reopening task!" << std::endl;
|
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
||||||
|
|
||||||
|
// open up the status of this partial output in the database again
|
||||||
|
auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task) {
|
||||||
|
txn->exec_params0("UPDATE sim_outputs SET part_status = 'not_started' WHERE id = $1 AND status = 'in_progress';", *task);
|
||||||
|
};
|
||||||
|
|
||||||
|
std::function<void(pqxx::work*, const db::uuid_t*)> task_reopen_func = task_reopen_lambda;
|
||||||
|
|
||||||
|
if (!this->conn->send_request(&task_reopen_func, &id)) {
|
||||||
|
// if we lost connection, there's nothing else we can really do
|
||||||
|
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
|
||||||
|
this->interface.stop();
|
||||||
|
this->interface.stop_immediately();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue