diff --git a/include/actsim_agent/agent_artifact.hpp b/include/actsim_agent/agent_artifact.hpp index f5fadfd..da3ee8d 100644 --- a/include/actsim_agent/agent_artifact.hpp +++ b/include/actsim_agent/agent_artifact.hpp @@ -38,6 +38,7 @@ class DBSimArtifact : public db::DBArtifact { const db::uuid_t& source_pass, const db::uuid_t& target_artifact, const db::uuid_t& design, + const db::uuid_t& reference, const db::uuid_t& source_config ); @@ -45,6 +46,11 @@ class DBSimArtifact : public db::DBArtifact { * @brief The UUID of the design this simulation uses */ const db::uuid_t& design = design_; + + /** + * @brief The UUID of the reference run this simulation uses + */ + const db::uuid_t& reference = reference_; /** * @brief The UUID of the simulator configuration @@ -53,6 +59,7 @@ class DBSimArtifact : public db::DBArtifact { private: + db::uuid_t reference_; db::uuid_t design_; db::uuid_t source_config_; }; @@ -65,8 +72,9 @@ class DBSimConfigArtifact : public DBSimArtifact, public pl::SimConfigArtifact { const db::uuid_t& source_pass, const db::uuid_t& target_artifact, const db::uuid_t& design, + const db::uuid_t& reference, const db::uuid_t& source_config - ) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {}; + ) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {}; }; class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact { public: @@ -76,8 +84,9 @@ class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact { const db::uuid_t& source_pass, const db::uuid_t& target_artifact, const db::uuid_t& design, + const db::uuid_t& reference, const db::uuid_t& source_config - ) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {}; + ) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {}; }; diff --git a/include/actsim_agent/downloader.hpp b/include/actsim_agent/downloader.hpp index b2c2084..ba0273d 100644 --- a/include/actsim_agent/downloader.hpp +++ b/include/actsim_agent/downloader.hpp @@ -47,6 +47,7 @@ class Downloader { void thread_run(); bool fetch_tasks(size_t n); bool fetch_design(const db::uuid_t& id, std::string& design); + std::shared_ptr fetch_reference_run(const db::uuid_t& id); void reopen_task(const db::uuid_t& id, bool halt); std::unique_ptr downloader_thread; diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index 4381264..ef52e9e 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -76,8 +76,8 @@ class TaskInterface { */ bool increment_reference(const db::uuid_t& id); void decrement_reference(const db::uuid_t& id); - pl::SimOutputArtifact get_reference(const db::uuid_t& id); - void store_reference(const db::uuid_t&, pl::SimOutputArtifact); + std::shared_ptr get_reference(const db::uuid_t& id); + void store_reference(const db::uuid_t&, std::shared_ptr reference_run); bool running() { return this->running_.load(std::memory_order_relaxed); }; bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); }; @@ -98,7 +98,7 @@ class TaskInterface { volatile std::atomic_bool immediate_stop; std::unordered_map> designs; - std::unordered_map> references; + std::unordered_map>> references; ////// Mutexes ////// diff --git a/src/actsim_agent/agent_artifact.cpp b/src/actsim_agent/agent_artifact.cpp index 1fed4fd..dcf040b 100644 --- a/src/actsim_agent/agent_artifact.cpp +++ b/src/actsim_agent/agent_artifact.cpp @@ -30,8 +30,10 @@ DBSimArtifact::DBSimArtifact( const db::uuid_t& source_pass, const db::uuid_t& target_artifact, const db::uuid_t& design, + const db::uuid_t& reference, const db::uuid_t& source_config ) : DBArtifact(id, source_pass, target_artifact) { this->design_ = design; + this->reference_ = reference; this->source_config_ = source_config; } diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 7e8717a..33faa95 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -109,6 +109,7 @@ bool Downloader::fetch_tasks(size_t n) { db::uuid_t *target_artifact, db::uuid_t *source_pass, db::uuid_t *design, + db::uuid_t *reference, db::uuid_t *source_config, db::uuid_t *id ) { @@ -148,6 +149,14 @@ bool Downloader::fetch_tasks(size_t n) { " AND dep.pass_status != 'finished' " " ) " " ) " + " AND (sc.has_reference IS NULL OR " + " WHERE EXISTS (" + " SELECT 1 " + " FROM sim_outputs out " + " WHERE out.sim_config = sc.has_reference " + " AND out.part_status = 'finished' " + " ) " + " ) " "ORDER BY " " ap.pass_status = 'in_progress' DESC, " " j.time_added ASC " @@ -167,6 +176,11 @@ bool Downloader::fetch_tasks(size_t n) { *target_artifact = row["target_artifact"].as(); *source_pass = row["source_pass"].as(); *design = row["design"].as(); + if (!row["reference"].is_null()) { + *reference = row["reference"].as(); + } else { + *reference = db::uuid_t(); + } *source_config = row["source_config"].as(); std::vector commands; @@ -211,13 +225,14 @@ bool Downloader::fetch_tasks(size_t n) { }; - std::function fetch_task_func = fetch_task_lambda; + std::function fetch_task_func = fetch_task_lambda; bool task_avail; pl::testcase_t testcase; db::uuid_t target_artifact; db::uuid_t source_pass; db::uuid_t design; + db::uuid_t reference; db::uuid_t source_config; db::uuid_t id; @@ -228,6 +243,7 @@ bool Downloader::fetch_tasks(size_t n) { &target_artifact, &source_pass, &design, + &reference, &source_config, &id )) { @@ -246,7 +262,7 @@ bool Downloader::fetch_tasks(size_t n) { DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact)); DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config)); - auto task = std::make_unique(id, source_pass, target_artifact, design, source_config); + auto task = std::make_unique(id, source_pass, target_artifact, design, reference, source_config); task->add_testcase(testcase); // see if we already have the design locally; if not, load it @@ -265,16 +281,35 @@ bool Downloader::fetch_tasks(size_t n) { this->interface.store_design(design, design_path); } - // push the task to the list of open tasks - this->interface.push_fresh(std::move(task)); - + // if we have a reference for this run, we have to see if it is loaded + if (reference != 0) { + + // see if we already have the reference run locally; if not, load it + if (!this->interface.increment_reference(reference)) { + + DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference)); + std::shared_ptr reference_run; + + // if we could not load the reference run, reopen the task in the database + if ((reference_run = this->fetch_reference_run(design)) == nullptr) { + std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl; + this->reopen_task(task->id, true); + continue; + } + + this->interface.store_reference(design, reference_run); + } + + // push the task to the list of open tasks + this->interface.push_fresh(std::move(task)); + + } } return true; } bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) { - design = "test design"; DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database."); @@ -336,6 +371,70 @@ bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) { return true; } +std::shared_ptr Downloader::fetch_reference_run(const db::uuid_t& id) { + + DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database."); + + auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *ref_run_id, std::vector *sim_log, std::vector *error_log, bool *found) { + try { + auto res = txn->exec_params1("SELECT sim_log, error_log FROM sim_outputs WHERE sim_config = $1;", *ref_run_id); + + // load sim_log into string vector + *sim_log = std::vector(); + auto arr_l = res["sim_log"].as_array(); + std::pair elem; + do { + elem = arr_l.get_next(); + if (elem.first == pqxx::array_parser::juncture::string_value) { + (*sim_log).push_back(elem.second); + } + } while (elem.first != pqxx::array_parser::juncture::done); + + // parse the error log into string vector + *error_log = std::vector(); + auto arr_e = res["error_log"].as_array(); + do { + elem = arr_e.get_next(); + if (elem.first == pqxx::array_parser::juncture::string_value) { + (*error_log).push_back(elem.second); + } + } while (elem.first != pqxx::array_parser::juncture::done); + + *found = true; + + } catch (pqxx::unexpected_rows& e) { + std::cerr << "Error: Failed to fetch design " << *ref_run_id << std::endl; + *found = false; + } + }; + + std::function *sim_log, + std::vector *error_log, + bool *found + )> fetch_design_func = fetch_design_lambda; + + std::vector sim_log; + std::vector error_log; + bool ref_run_found; + + if (!this->conn->send_request(&fetch_design_func, &id, &sim_log, &error_log, &ref_run_found)) { + // if we lost connection, there's nothing else we can really do + std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl; + this->interface.stop(); + this->interface.stop_immediately(); + return nullptr; + } + + if (!ref_run_found) { + return nullptr; + } + + return std::make_shared(sim_log, error_log); +} + void Downloader::reopen_task(const db::uuid_t& id, bool halt) { DEBUG_PRINT("Reopening task with ID " + db::to_string(id)); diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index d695e97..2ac7a7b 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -216,7 +216,7 @@ bool TaskInterface::increment_reference(const db::uuid_t& id) { return false; } - std::pair& reference_run = references[id]; + auto& reference_run = references[id]; // if so, increment its reference counter ++reference_run.first; @@ -235,7 +235,7 @@ void TaskInterface::decrement_reference(const db::uuid_t& id) { return; } - std::pair& reference_run = references[id]; + auto& reference_run = references[id]; // if so, decrement its reference counters --reference_run.first; @@ -250,19 +250,19 @@ void TaskInterface::decrement_reference(const db::uuid_t& id) { } } -pl::SimOutputArtifact TaskInterface::get_reference(const db::uuid_t& id) { +std::shared_ptr TaskInterface::get_reference(const db::uuid_t& id) { std::lock_guard lock (this->references_mutex); // make sure the requested reference run is in the list of available reference runs if (this->references.find(id) == this->references.end()) { std::cerr << "Error: Reference run was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl; - return pl::SimOutputArtifact(); + return std::make_shared(); } return this->references[id].second; } -void TaskInterface::store_reference(const db::uuid_t& id, pl::SimOutputArtifact reference) { +void TaskInterface::store_reference(const db::uuid_t& id, std::shared_ptr reference) { std::lock_guard lock (this->references_mutex); DEBUG_PRINT("Storing new design with ID " + db::to_string(id)); diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index 24e57f4..0ab9dbc 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -73,6 +73,9 @@ void Worker::thread_run() { // get the design this task uses; we'll need that later auto design = task->design; + // get the reference as well; here it's not yet important if the test actually has one + auto reference = task->reference; + // everything is good, perform the given task bool complete; auto output = this->perform_task(task, complete); @@ -89,6 +92,11 @@ void Worker::thread_run() { // if this succeeded, we can decrease the number of // tasks that require the design we needed for this task this->interface.decrement_design(design); + + // in case this run was compared to a reference, handle that ref counter too + if (reference != 0) { + this->interface.decrement_reference(reference); + } } else { // there are two possible reasons the task was not finished @@ -98,6 +106,11 @@ void Worker::thread_run() { // we got interrupted since, the current task was halted; in this case // we only wanna decrease our reference counter this->interface.decrement_design(task->design); + + if (reference != 0) { + this->interface.decrement_reference(reference); + } + this->task_interrupted.store(false, std::memory_order_relaxed); } else { DEBUG_PRINT("Something went wrong during task execution"); @@ -121,7 +134,7 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas if (task->get_content().size() != 1) { std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl; finished = false; - return std::make_unique(task->id, task->source_pass, task->target_artifact, task->design, task->source_config); + return std::make_unique(task->id, task->source_pass, task->target_artifact, task->design, task->reference, task->source_config); } auto testcase = task->get_content().front(); @@ -311,6 +324,7 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas task->source_pass, task->target_artifact, task->design, + task->reference, task->source_config );