implement reference run handling in threading; actual output parsing is missing as of now
This commit is contained in:
parent
9ddcd8ea71
commit
711808eafb
7 changed files with 142 additions and 17 deletions
|
|
@ -38,6 +38,7 @@ class DBSimArtifact : public db::DBArtifact {
|
|||
const db::uuid_t& source_pass,
|
||||
const db::uuid_t& target_artifact,
|
||||
const db::uuid_t& design,
|
||||
const db::uuid_t& reference,
|
||||
const db::uuid_t& source_config
|
||||
);
|
||||
|
||||
|
|
@ -45,6 +46,11 @@ class DBSimArtifact : public db::DBArtifact {
|
|||
* @brief The UUID of the design this simulation uses
|
||||
*/
|
||||
const db::uuid_t& design = design_;
|
||||
|
||||
/**
|
||||
* @brief The UUID of the reference run this simulation uses
|
||||
*/
|
||||
const db::uuid_t& reference = reference_;
|
||||
|
||||
/**
|
||||
* @brief The UUID of the simulator configuration
|
||||
|
|
@ -53,6 +59,7 @@ class DBSimArtifact : public db::DBArtifact {
|
|||
|
||||
private:
|
||||
|
||||
db::uuid_t reference_;
|
||||
db::uuid_t design_;
|
||||
db::uuid_t source_config_;
|
||||
};
|
||||
|
|
@ -65,8 +72,9 @@ class DBSimConfigArtifact : public DBSimArtifact, public pl::SimConfigArtifact {
|
|||
const db::uuid_t& source_pass,
|
||||
const db::uuid_t& target_artifact,
|
||||
const db::uuid_t& design,
|
||||
const db::uuid_t& reference,
|
||||
const db::uuid_t& source_config
|
||||
) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {};
|
||||
) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {};
|
||||
};
|
||||
class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact {
|
||||
public:
|
||||
|
|
@ -76,8 +84,9 @@ class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact {
|
|||
const db::uuid_t& source_pass,
|
||||
const db::uuid_t& target_artifact,
|
||||
const db::uuid_t& design,
|
||||
const db::uuid_t& reference,
|
||||
const db::uuid_t& source_config
|
||||
) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {};
|
||||
) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {};
|
||||
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ class Downloader {
|
|||
void thread_run();
|
||||
bool fetch_tasks(size_t n);
|
||||
bool fetch_design(const db::uuid_t& id, std::string& design);
|
||||
std::shared_ptr<pl::SimOutputArtifact> fetch_reference_run(const db::uuid_t& id);
|
||||
void reopen_task(const db::uuid_t& id, bool halt);
|
||||
|
||||
std::unique_ptr<std::thread> downloader_thread;
|
||||
|
|
|
|||
|
|
@ -76,8 +76,8 @@ class TaskInterface {
|
|||
*/
|
||||
bool increment_reference(const db::uuid_t& id);
|
||||
void decrement_reference(const db::uuid_t& id);
|
||||
pl::SimOutputArtifact get_reference(const db::uuid_t& id);
|
||||
void store_reference(const db::uuid_t&, pl::SimOutputArtifact);
|
||||
std::shared_ptr<pl::SimOutputArtifact> get_reference(const db::uuid_t& id);
|
||||
void store_reference(const db::uuid_t&, std::shared_ptr<pl::SimOutputArtifact> reference_run);
|
||||
|
||||
bool running() { return this->running_.load(std::memory_order_relaxed); };
|
||||
bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); };
|
||||
|
|
@ -98,7 +98,7 @@ class TaskInterface {
|
|||
volatile std::atomic_bool immediate_stop;
|
||||
|
||||
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;
|
||||
std::unordered_map<db::uuid_t, std::pair<size_t, pl::SimOutputArtifact>> references;
|
||||
std::unordered_map<db::uuid_t, std::pair<size_t, std::shared_ptr<pl::SimOutputArtifact>>> references;
|
||||
|
||||
////// Mutexes //////
|
||||
|
||||
|
|
|
|||
|
|
@ -30,8 +30,10 @@ DBSimArtifact::DBSimArtifact(
|
|||
const db::uuid_t& source_pass,
|
||||
const db::uuid_t& target_artifact,
|
||||
const db::uuid_t& design,
|
||||
const db::uuid_t& reference,
|
||||
const db::uuid_t& source_config
|
||||
) : DBArtifact(id, source_pass, target_artifact) {
|
||||
this->design_ = design;
|
||||
this->reference_ = reference;
|
||||
this->source_config_ = source_config;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,6 +109,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
db::uuid_t *target_artifact,
|
||||
db::uuid_t *source_pass,
|
||||
db::uuid_t *design,
|
||||
db::uuid_t *reference,
|
||||
db::uuid_t *source_config,
|
||||
db::uuid_t *id
|
||||
) {
|
||||
|
|
@ -148,6 +149,14 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
" AND dep.pass_status != 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
" AND (sc.has_reference IS NULL OR "
|
||||
" WHERE EXISTS ("
|
||||
" SELECT 1 "
|
||||
" FROM sim_outputs out "
|
||||
" WHERE out.sim_config = sc.has_reference "
|
||||
" AND out.part_status = 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
"ORDER BY "
|
||||
" ap.pass_status = 'in_progress' DESC, "
|
||||
" j.time_added ASC "
|
||||
|
|
@ -167,6 +176,11 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
*target_artifact = row["target_artifact"].as<db::uuid_t>();
|
||||
*source_pass = row["source_pass"].as<db::uuid_t>();
|
||||
*design = row["design"].as<db::uuid_t>();
|
||||
if (!row["reference"].is_null()) {
|
||||
*reference = row["reference"].as<db::uuid_t>();
|
||||
} else {
|
||||
*reference = db::uuid_t();
|
||||
}
|
||||
*source_config = row["source_config"].as<db::uuid_t>();
|
||||
|
||||
std::vector<std::string> commands;
|
||||
|
|
@ -211,13 +225,14 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
|
||||
};
|
||||
|
||||
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
|
||||
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
|
||||
|
||||
bool task_avail;
|
||||
pl::testcase_t testcase;
|
||||
db::uuid_t target_artifact;
|
||||
db::uuid_t source_pass;
|
||||
db::uuid_t design;
|
||||
db::uuid_t reference;
|
||||
db::uuid_t source_config;
|
||||
db::uuid_t id;
|
||||
|
||||
|
|
@ -228,6 +243,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
&target_artifact,
|
||||
&source_pass,
|
||||
&design,
|
||||
&reference,
|
||||
&source_config,
|
||||
&id
|
||||
)) {
|
||||
|
|
@ -246,7 +262,7 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
|
||||
DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact));
|
||||
DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config));
|
||||
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, source_config);
|
||||
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, reference, source_config);
|
||||
task->add_testcase(testcase);
|
||||
|
||||
// see if we already have the design locally; if not, load it
|
||||
|
|
@ -265,16 +281,35 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
this->interface.store_design(design, design_path);
|
||||
}
|
||||
|
||||
// push the task to the list of open tasks
|
||||
this->interface.push_fresh(std::move(task));
|
||||
|
||||
// if we have a reference for this run, we have to see if it is loaded
|
||||
if (reference != 0) {
|
||||
|
||||
// see if we already have the reference run locally; if not, load it
|
||||
if (!this->interface.increment_reference(reference)) {
|
||||
|
||||
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
|
||||
std::shared_ptr<pl::SimOutputArtifact> reference_run;
|
||||
|
||||
// if we could not load the reference run, reopen the task in the database
|
||||
if ((reference_run = this->fetch_reference_run(design)) == nullptr) {
|
||||
std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl;
|
||||
this->reopen_task(task->id, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
this->interface.store_reference(design, reference_run);
|
||||
}
|
||||
|
||||
// push the task to the list of open tasks
|
||||
this->interface.push_fresh(std::move(task));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
|
||||
design = "test design";
|
||||
|
||||
DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");
|
||||
|
||||
|
|
@ -336,6 +371,70 @@ bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
|
|||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db::uuid_t& id) {
|
||||
|
||||
DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database.");
|
||||
|
||||
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *ref_run_id, std::vector<std::string> *sim_log, std::vector<std::string> *error_log, bool *found) {
|
||||
try {
|
||||
auto res = txn->exec_params1("SELECT sim_log, error_log FROM sim_outputs WHERE sim_config = $1;", *ref_run_id);
|
||||
|
||||
// load sim_log into string vector
|
||||
*sim_log = std::vector<std::string>();
|
||||
auto arr_l = res["sim_log"].as_array();
|
||||
std::pair<pqxx::array_parser::juncture, std::string> elem;
|
||||
do {
|
||||
elem = arr_l.get_next();
|
||||
if (elem.first == pqxx::array_parser::juncture::string_value) {
|
||||
(*sim_log).push_back(elem.second);
|
||||
}
|
||||
} while (elem.first != pqxx::array_parser::juncture::done);
|
||||
|
||||
// parse the error log into string vector
|
||||
*error_log = std::vector<std::string>();
|
||||
auto arr_e = res["error_log"].as_array();
|
||||
do {
|
||||
elem = arr_e.get_next();
|
||||
if (elem.first == pqxx::array_parser::juncture::string_value) {
|
||||
(*error_log).push_back(elem.second);
|
||||
}
|
||||
} while (elem.first != pqxx::array_parser::juncture::done);
|
||||
|
||||
*found = true;
|
||||
|
||||
} catch (pqxx::unexpected_rows& e) {
|
||||
std::cerr << "Error: Failed to fetch design " << *ref_run_id << std::endl;
|
||||
*found = false;
|
||||
}
|
||||
};
|
||||
|
||||
std::function<void(
|
||||
pqxx::work *txn,
|
||||
const db::uuid_t *ref_run_id,
|
||||
std::vector<std::string> *sim_log,
|
||||
std::vector<std::string> *error_log,
|
||||
bool *found
|
||||
)> fetch_design_func = fetch_design_lambda;
|
||||
|
||||
std::vector<std::string> sim_log;
|
||||
std::vector<std::string> error_log;
|
||||
bool ref_run_found;
|
||||
|
||||
if (!this->conn->send_request(&fetch_design_func, &id, &sim_log, &error_log, &ref_run_found)) {
|
||||
// if we lost connection, there's nothing else we can really do
|
||||
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
|
||||
this->interface.stop();
|
||||
this->interface.stop_immediately();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!ref_run_found) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return std::make_shared<pl::SimOutputArtifact>(sim_log, error_log);
|
||||
}
|
||||
|
||||
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
||||
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
|
||||
|
||||
|
|
|
|||
|
|
@ -216,7 +216,7 @@ bool TaskInterface::increment_reference(const db::uuid_t& id) {
|
|||
return false;
|
||||
}
|
||||
|
||||
std::pair<size_t, pl::SimOutputArtifact>& reference_run = references[id];
|
||||
auto& reference_run = references[id];
|
||||
|
||||
// if so, increment its reference counter
|
||||
++reference_run.first;
|
||||
|
|
@ -235,7 +235,7 @@ void TaskInterface::decrement_reference(const db::uuid_t& id) {
|
|||
return;
|
||||
}
|
||||
|
||||
std::pair<size_t, pl::SimOutputArtifact>& reference_run = references[id];
|
||||
auto& reference_run = references[id];
|
||||
|
||||
// if so, decrement its reference counters
|
||||
--reference_run.first;
|
||||
|
|
@ -250,19 +250,19 @@ void TaskInterface::decrement_reference(const db::uuid_t& id) {
|
|||
}
|
||||
}
|
||||
|
||||
pl::SimOutputArtifact TaskInterface::get_reference(const db::uuid_t& id) {
|
||||
std::shared_ptr<pl::SimOutputArtifact> TaskInterface::get_reference(const db::uuid_t& id) {
|
||||
std::lock_guard<std::mutex> lock (this->references_mutex);
|
||||
|
||||
// make sure the requested reference run is in the list of available reference runs
|
||||
if (this->references.find(id) == this->references.end()) {
|
||||
std::cerr << "Error: Reference run was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl;
|
||||
return pl::SimOutputArtifact();
|
||||
return std::make_shared<pl::SimOutputArtifact>();
|
||||
}
|
||||
|
||||
return this->references[id].second;
|
||||
}
|
||||
|
||||
void TaskInterface::store_reference(const db::uuid_t& id, pl::SimOutputArtifact reference) {
|
||||
void TaskInterface::store_reference(const db::uuid_t& id, std::shared_ptr<pl::SimOutputArtifact> reference) {
|
||||
std::lock_guard<std::mutex> lock (this->references_mutex);
|
||||
|
||||
DEBUG_PRINT("Storing new design with ID " + db::to_string(id));
|
||||
|
|
|
|||
|
|
@ -73,6 +73,9 @@ void Worker::thread_run() {
|
|||
// get the design this task uses; we'll need that later
|
||||
auto design = task->design;
|
||||
|
||||
// get the reference as well; here it's not yet important if the test actually has one
|
||||
auto reference = task->reference;
|
||||
|
||||
// everything is good, perform the given task
|
||||
bool complete;
|
||||
auto output = this->perform_task(task, complete);
|
||||
|
|
@ -89,6 +92,11 @@ void Worker::thread_run() {
|
|||
// if this succeeded, we can decrease the number of
|
||||
// tasks that require the design we needed for this task
|
||||
this->interface.decrement_design(design);
|
||||
|
||||
// in case this run was compared to a reference, handle that ref counter too
|
||||
if (reference != 0) {
|
||||
this->interface.decrement_reference(reference);
|
||||
}
|
||||
} else {
|
||||
|
||||
// there are two possible reasons the task was not finished
|
||||
|
|
@ -98,6 +106,11 @@ void Worker::thread_run() {
|
|||
// we got interrupted since, the current task was halted; in this case
|
||||
// we only wanna decrease our reference counter
|
||||
this->interface.decrement_design(task->design);
|
||||
|
||||
if (reference != 0) {
|
||||
this->interface.decrement_reference(reference);
|
||||
}
|
||||
|
||||
this->task_interrupted.store(false, std::memory_order_relaxed);
|
||||
} else {
|
||||
DEBUG_PRINT("Something went wrong during task execution");
|
||||
|
|
@ -121,7 +134,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
if (task->get_content().size() != 1) {
|
||||
std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl;
|
||||
finished = false;
|
||||
return std::make_unique<OutputType>(task->id, task->source_pass, task->target_artifact, task->design, task->source_config);
|
||||
return std::make_unique<OutputType>(task->id, task->source_pass, task->target_artifact, task->design, task->reference, task->source_config);
|
||||
}
|
||||
|
||||
auto testcase = task->get_content().front();
|
||||
|
|
@ -311,6 +324,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
task->source_pass,
|
||||
task->target_artifact,
|
||||
task->design,
|
||||
task->reference,
|
||||
task->source_config
|
||||
);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue