add pulling token info and fix non-working downloader
This commit is contained in:
parent
182be82ac2
commit
7f8efe0e73
1 changed files with 40 additions and 26 deletions
|
|
@ -23,6 +23,7 @@
|
||||||
**************************************************************************
|
**************************************************************************
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <string>
|
||||||
#include <cluster/artifact.hpp>
|
#include <cluster/artifact.hpp>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
|
|
@ -72,7 +73,7 @@ void Downloader::thread_run() {
|
||||||
// if the download buffer is not full, fetch some more tasks
|
// if the download buffer is not full, fetch some more tasks
|
||||||
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
|
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
|
||||||
// we can sleep for a certain amount of time, nothing to do
|
// we can sleep for a certain amount of time, nothing to do
|
||||||
DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
|
// DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
|
||||||
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
|
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -102,7 +103,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
for (size_t i = 0; i < n; ++i) {
|
for (size_t i = 0; i < n; ++i) {
|
||||||
|
|
||||||
// fetch a new task from the database
|
// fetch a new task from the database
|
||||||
auto fetch_task_lambda = [](
|
auto fetch_task_lambda = [n](
|
||||||
pqxx::work *txn,
|
pqxx::work *txn,
|
||||||
bool *task_avail,
|
bool *task_avail,
|
||||||
pl::testcase_t *testcase,
|
pl::testcase_t *testcase,
|
||||||
|
|
@ -119,7 +120,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
// 2.) Passes that are already in progress are preferred
|
// 2.) Passes that are already in progress are preferred
|
||||||
// 3.) New passes are started in the order they were added to the database
|
// 3.) New passes are started in the order they were added to the database
|
||||||
// 4.) Passes are only started if all their dependencies are fulfilled
|
// 4.) Passes are only started if all their dependencies are fulfilled
|
||||||
auto res = txn->exec(
|
auto res = txn->exec_params(
|
||||||
"SELECT "
|
"SELECT "
|
||||||
" ap.design_file AS design, "
|
" ap.design_file AS design, "
|
||||||
" ap.top_proc, "
|
" ap.top_proc, "
|
||||||
|
|
@ -127,6 +128,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
" ap.id AS source_pass, "
|
" ap.id AS source_pass, "
|
||||||
" sc.id AS source_config, "
|
" sc.id AS source_config, "
|
||||||
" sc.sim_commands, "
|
" sc.sim_commands, "
|
||||||
|
" sc.has_reference AS reference, "
|
||||||
" so.id AS id "
|
" so.id AS id "
|
||||||
"FROM "
|
"FROM "
|
||||||
" actsim_passes ap "
|
" actsim_passes ap "
|
||||||
|
|
@ -150,7 +152,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
" ) "
|
" ) "
|
||||||
" ) "
|
" ) "
|
||||||
" AND (sc.has_reference IS NULL OR "
|
" AND (sc.has_reference IS NULL OR "
|
||||||
" WHERE EXISTS ("
|
" EXISTS ("
|
||||||
" SELECT 1 "
|
" SELECT 1 "
|
||||||
" FROM sim_outputs out "
|
" FROM sim_outputs out "
|
||||||
" WHERE out.sim_config = sc.has_reference "
|
" WHERE out.sim_config = sc.has_reference "
|
||||||
|
|
@ -160,7 +162,8 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
"ORDER BY "
|
"ORDER BY "
|
||||||
" ap.pass_status = 'in_progress' DESC, "
|
" ap.pass_status = 'in_progress' DESC, "
|
||||||
" j.time_added ASC "
|
" j.time_added ASC "
|
||||||
"LIMIT 1;"
|
"LIMIT $1;",
|
||||||
|
n
|
||||||
);
|
);
|
||||||
|
|
||||||
// seems like there is nothing to do right now
|
// seems like there is nothing to do right now
|
||||||
|
|
@ -291,19 +294,19 @@ bool Downloader::fetch_tasks(size_t n) {
|
||||||
std::shared_ptr<pl::SimOutputArtifact> reference_run;
|
std::shared_ptr<pl::SimOutputArtifact> reference_run;
|
||||||
|
|
||||||
// if we could not load the reference run, reopen the task in the database
|
// if we could not load the reference run, reopen the task in the database
|
||||||
if ((reference_run = this->fetch_reference_run(design)) == nullptr) {
|
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
|
||||||
std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl;
|
std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl;
|
||||||
this->reopen_task(task->id, true);
|
this->reopen_task(task->id, true);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
this->interface.store_reference(design, reference_run);
|
this->interface.store_reference(reference, reference_run);
|
||||||
}
|
}
|
||||||
|
|
||||||
// push the task to the list of open tasks
|
|
||||||
this->interface.push_fresh(std::move(task));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// push the task to the list of open tasks
|
||||||
|
this->interface.push_fresh(std::move(task));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -375,9 +378,14 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
|
||||||
|
|
||||||
DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database.");
|
DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database.");
|
||||||
|
|
||||||
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *ref_run_id, std::vector<std::string> *sim_log, std::vector<std::string> *error_log, bool *found) {
|
auto fetch_design_lambda = [id](
|
||||||
|
pqxx::work *txn, std::vector<std::string> *sim_log,
|
||||||
|
std::vector<std::string> *error_log,
|
||||||
|
long *output_tokens,
|
||||||
|
bool *found
|
||||||
|
) {
|
||||||
try {
|
try {
|
||||||
auto res = txn->exec_params1("SELECT sim_log, error_log FROM sim_outputs WHERE sim_config = $1;", *ref_run_id);
|
auto res = txn->exec_params1("SELECT sim_log, error_log, output_tokens FROM sim_outputs WHERE sim_config = $1;", id);
|
||||||
|
|
||||||
// load sim_log into string vector
|
// load sim_log into string vector
|
||||||
*sim_log = std::vector<std::string>();
|
*sim_log = std::vector<std::string>();
|
||||||
|
|
@ -400,27 +408,30 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
|
||||||
}
|
}
|
||||||
} while (elem.first != pqxx::array_parser::juncture::done);
|
} while (elem.first != pqxx::array_parser::juncture::done);
|
||||||
|
|
||||||
|
*output_tokens = res["output_tokens"].as<long>();
|
||||||
|
|
||||||
*found = true;
|
*found = true;
|
||||||
|
|
||||||
} catch (pqxx::unexpected_rows& e) {
|
} catch (pqxx::unexpected_rows& e) {
|
||||||
std::cerr << "Error: Failed to fetch design " << *ref_run_id << std::endl;
|
std::cerr << "Error: Failed to fetch reference run " << id << ": " << e.what() << std::endl;
|
||||||
*found = false;
|
*found = false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::function<void(
|
std::function<void(
|
||||||
pqxx::work *txn,
|
pqxx::work *txn,
|
||||||
const db::uuid_t *ref_run_id,
|
|
||||||
std::vector<std::string> *sim_log,
|
std::vector<std::string> *sim_log,
|
||||||
std::vector<std::string> *error_log,
|
std::vector<std::string> *error_log,
|
||||||
|
long *output_tokens,
|
||||||
bool *found
|
bool *found
|
||||||
)> fetch_design_func = fetch_design_lambda;
|
)> fetch_design_func = fetch_design_lambda;
|
||||||
|
|
||||||
std::vector<std::string> sim_log;
|
std::vector<std::string> sim_log;
|
||||||
std::vector<std::string> error_log;
|
std::vector<std::string> error_log;
|
||||||
|
long output_tokens;
|
||||||
bool ref_run_found;
|
bool ref_run_found;
|
||||||
|
|
||||||
if (!this->conn->send_request(&fetch_design_func, &id, &sim_log, &error_log, &ref_run_found)) {
|
if (!this->conn->send_request(&fetch_design_func, &sim_log, &error_log, &output_tokens, &ref_run_found)) {
|
||||||
// if we lost connection, there's nothing else we can really do
|
// if we lost connection, there's nothing else we can really do
|
||||||
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
||||||
this->interface.stop();
|
this->interface.stop();
|
||||||
|
|
@ -432,19 +443,15 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::make_shared<pl::SimOutputArtifact>(sim_log, error_log);
|
auto reference = std::make_shared<pl::SimOutputArtifact>(sim_log, error_log);
|
||||||
|
reference->set_output_tokens(output_tokens);
|
||||||
|
|
||||||
|
return reference;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
||||||
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
||||||
|
|
||||||
// open up the status of this partial output in the database again
|
|
||||||
auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) {
|
|
||||||
txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status);
|
|
||||||
};
|
|
||||||
|
|
||||||
std::function<void(pqxx::work*, const db::uuid_t*, db::JobStatusType*)> task_reopen_func = task_reopen_lambda;
|
|
||||||
|
|
||||||
db::JobStatusType status;
|
db::JobStatusType status;
|
||||||
|
|
||||||
if (halt) {
|
if (halt) {
|
||||||
|
|
@ -454,7 +461,14 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
||||||
status = db::JobStatusType::NOT_STARTED;
|
status = db::JobStatusType::NOT_STARTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this->conn->send_request(&task_reopen_func, &id, &status)) {
|
// open up the status of this partial output in the database again
|
||||||
|
auto task_reopen_lambda = [id, status](pqxx::work *txn) {
|
||||||
|
txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", id, status);
|
||||||
|
};
|
||||||
|
|
||||||
|
std::function<void(pqxx::work*)> task_reopen_func = task_reopen_lambda;
|
||||||
|
|
||||||
|
if (!this->conn->send_request(&task_reopen_func)) {
|
||||||
// if we lost connection, there's nothing else we can really do
|
// if we lost connection, there's nothing else we can really do
|
||||||
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
|
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
|
||||||
this->interface.stop();
|
this->interface.stop();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue