diff --git a/include/actsim_agent/downloader.hpp b/include/actsim_agent/downloader.hpp index 19c6dfd..4ba4d63 100644 --- a/include/actsim_agent/downloader.hpp +++ b/include/actsim_agent/downloader.hpp @@ -44,6 +44,8 @@ class Downloader { void thread_run(); bool fetch_tasks(size_t n); + bool fetch_design(const db::uuid_t& id); + void reopen_task(const db::uuid_t& id); std::unique_ptr downloader_thread; std::unique_ptr conn; diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index 0746f6c..4ddb68a 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -51,10 +51,12 @@ class TaskInterface { void wait_for_finished(); void wait_for_buffer_consume(); void wait_for_cleanup_ready(); + void wait_for_download_halt(); void notify_cleanup_ready(); void notify_download_program_halt(); void notify_workers_program_halt(); + void notify_download_halt(); void push_fresh(std::unique_ptr task); std::unique_ptr pop_fresh(bool& empty); @@ -62,14 +64,15 @@ class TaskInterface { std::unique_ptr pop_finished(bool& empty); size_t get_buffer_space(); - bool search_and_increment(db::uuid_t id, std::string& design); - void decrement(db::uuid_t id); - void store(db::uuid_t, std::string& design); + bool increment_design(const db::uuid_t& id); + void decrement_design(const db::uuid_t& id); + std::string get_design(const db::uuid_t& id); + void store_design(const db::uuid_t&, std::string& design); bool running() { return this->running_.load(std::memory_order_relaxed); }; - bool is_stop_immediate() { return !this->immediate_stop.load(std::memory_order_relaxed); }; + bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); }; void stop() { this->running_.store(false, std::memory_order_relaxed); }; - void stop_immediately() { this->immediate_stop.store(false, std::memory_order_relaxed); }; + void stop_immediately() { this->immediate_stop.store(true, std::memory_order_relaxed); }; bool fresh_queue_empty(); bool finished_queue_empty(); @@ -110,6 +113,10 @@ class TaskInterface { // inform the download thread that data was consumed std::mutex fresh_queue_full_mutex; std::condition_variable fresh_queue_full_condition; + + // inform the main thread that the program is ending + std::mutex download_halt_mutex; + std::condition_variable download_halt_condition; }; #endif diff --git a/include/actsim_agent/worker.hpp b/include/actsim_agent/worker.hpp index 0cb4cb8..0f7187c 100644 --- a/include/actsim_agent/worker.hpp +++ b/include/actsim_agent/worker.hpp @@ -46,11 +46,14 @@ class Worker { private: void thread_run(); - std::unique_ptr perform_task(std::unique_ptr task, bool& finished); + std::unique_ptr perform_task(std::unique_ptr& task, bool& finished); + void run_actsim(); std::unique_ptr worker_thread; std::atomic current_task; + std::atomic_bool task_interrupted; + TaskInterface& interface; }; diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index a68d676..2ff0c5c 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -91,7 +91,7 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { // this is where the threads do their thing // anything past here means that the program was asked to close - download_thread->join(); + interface.wait_for_download_halt(); // inform all threads about the program ending interface.notify_workers_program_halt(); @@ -114,7 +114,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { // now we can cleanup what was generated after stop was called interface.notify_cleanup_ready(); - // wait for the upload to finish + // wait for the net-threads to finish + download_thread->join(); upload_thread->join(); DEBUG_PRINT("Done."); diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index eafa970..63b18cc 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include #include "util.h" #include "downloader.hpp" @@ -63,6 +64,20 @@ void Downloader::thread_run() { this->fetch_tasks(this->interface.get_buffer_space()); } + + // notify the control thread that download has halted + this->interface.notify_download_halt(); + + // wait for the workers to finish their thing + this->interface.wait_for_cleanup_ready(); + + // then reopen all tasks that were still left to do + bool empty = false; + while (!empty) { + auto task = this->interface.pop_fresh(empty); + if (empty) continue; + this->reopen_task(task->id); + } } bool Downloader::fetch_tasks(size_t n) { @@ -73,10 +88,32 @@ bool Downloader::fetch_tasks(size_t n) { id = 1; auto job_id = "deadbeef"; - //for (size_t i = 0; i < n; ++i) { - // auto task = std::make_unique(id, job_id, TaskTypeType::SINGLE, false, db::uuid_t(), 2); - // this->interface.push_fresh(std::move(task)); - //} + for (size_t i = 0; i < n; ++i) { + auto task = std::make_unique(id, id); + + // see if we already have the desing locally; if not, load it + if (!this->interface.increment_design(task->design)) { + + // if we could not load the design, reopen it in the database + if (!this->fetch_design(task->design)) { + std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl; + this->reopen_task(task->id); + continue; + } + } + + // push the task to the list of open tasks + this->interface.push_fresh(std::move(task)); + + } return true; } + +bool Downloader::fetch_design(const db::uuid_t& id) { + return true; +} + +void Downloader::reopen_task(const db::uuid_t& id) { + std::cout << "[DOWNLOADER] Reopening task!" << std::endl; +} diff --git a/src/actsim_agent/monitor.cpp b/src/actsim_agent/monitor.cpp index 8acfce5..e66ae1e 100644 --- a/src/actsim_agent/monitor.cpp +++ b/src/actsim_agent/monitor.cpp @@ -89,6 +89,7 @@ void Monitor::thread_run() { } if (this->interface.is_stop_immediate()) { + DEBUG_PRINT("Immediate stop requested, cancelling all tasks!"); for (auto& worker : workers) worker->cancel_current(); } diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 24c2066..0165e71 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -27,6 +27,8 @@ TaskInterface::TaskInterface(size_t buffer_size) { this->buffer_size = buffer_size; + this->running_.store(true, std::memory_order_relaxed); + this->immediate_stop.store(false, std::memory_order_relaxed); } TaskInterface::~TaskInterface() { @@ -124,7 +126,7 @@ std::unique_ptr TaskInterface::pop_finished(bool& empty) { return task; } -bool TaskInterface::search_and_increment(db::uuid_t id, std::string& design) { +bool TaskInterface::increment_design(const db::uuid_t& id) { std::lock_guard lock (this->designs_mutex); // make sure the requested design is in the list of available designs @@ -135,12 +137,10 @@ bool TaskInterface::search_and_increment(db::uuid_t id, std::string& design) { // if so, increment its reference counters ++design_entry.first; - // set the design parameter to the actual design information - design = design_entry.second; return true; } -void TaskInterface::decrement(db::uuid_t id) { +void TaskInterface::decrement_design(const db::uuid_t& id) { std::lock_guard lock (this->designs_mutex); // make sure the requested design is in the list of available designs @@ -158,7 +158,19 @@ void TaskInterface::decrement(db::uuid_t id) { } } -void TaskInterface::store(db::uuid_t id, std::string& design) { +std::string TaskInterface::get_design(const db::uuid_t& id) { + std::lock_guard lock (this->designs_mutex); + + // make sure the requested design is in the list of available designs + if (this->designs.find(id) == this->designs.end()) { + std::cerr << "Error: Design was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl; + return ""; + } + + return this->designs[id].second; +} + +void TaskInterface::store_design(const db::uuid_t& id, std::string& design) { std::lock_guard lock (this->designs_mutex); // make sure the design isn't already in the list of design entries @@ -188,6 +200,11 @@ void TaskInterface::wait_for_cleanup_ready() { this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); }); } +void TaskInterface::wait_for_download_halt() { + std::unique_lock lock(this->download_halt_mutex); + this->download_halt_condition.wait(lock, [&] { return !this->running(); }); +} + void TaskInterface::notify_download_program_halt() { this->fresh_queue_full_condition.notify_all(); } @@ -195,4 +212,8 @@ void TaskInterface::notify_download_program_halt() { void TaskInterface::notify_workers_program_halt() { this->finished_queue_empty_condition.notify_all(); this->fresh_queue_empty_condition.notify_all(); -} \ No newline at end of file +} + +void TaskInterface::notify_download_halt() { + this->download_halt_condition.notify_all(); +} diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index 6738590..f81aa6a 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -32,11 +32,13 @@ Worker::Worker(TaskInterface& interface) : interface(interface) {} void Worker::start() { DEBUG_PRINT("Starting worker thread..."); + this->task_interrupted.store(false, std::memory_order_relaxed); this->worker_thread = std::make_unique([this] () { this->thread_run(); }); } void Worker::cancel_current() { std::cout << "[WORKER] Current simulation cancelled." << std::endl; + this->task_interrupted.store(true, std::memory_order_relaxed); // set a condition variable and notify // must not be blocking } @@ -63,25 +65,56 @@ void Worker::thread_run() { // we need to make sure the queue wasn't emptied between waiting and getting new data if (queue_empty) continue; + // get the design this task uses; we'll need that later + auto design = task->design; + // everything is good, perform the given task bool complete; - this->current_task.store(db::uuid_t(), std::memory_order_relaxed); - auto output = this->perform_task(std::move(task), complete); + auto output = this->perform_task(task, complete); + + // testing code + complete = !this->task_interrupted.load(std::memory_order_relaxed); + this->task_interrupted.store(false, std::memory_order_relaxed); // if the task was finished, push the task to be uploaded // we need this since the task might have been interrupted // half way though - if (complete) this->interface.push_finished(std::move(output)); + if (complete) { + this->interface.push_finished(std::move(output)); + + // if this succeeded, we can decrease the number of + // tasks that require the design we needed for this task + this->interface.decrement_design(design); + } else { + + // if the task was not completed and we have reached the end of execution + // the tasks have to be reopened in the database; the download thread + // will handle everything remaining in the buffer, so this is where our + // unfinished tasks goes back into + if (!this->interface.running()) { + this->interface.push_fresh(std::move(task)); + } else { + + // the only other reason this could have failed is that we got + // interrupted since the corresponding task was halted; in this case + // we only wanna decrease our reference counter + this->interface.decrement_design(task->design); + } + } } } -std::unique_ptr Worker::perform_task(std::unique_ptr task, bool& finished) { - usleep(1000000); +std::unique_ptr Worker::perform_task(std::unique_ptr& task, bool& finished) { + usleep(10000000); std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; task->id; - auto output = std::make_unique(); - return std::move(output); + finished = true; + return std::make_unique(); // wait for either the executing process to finish or // for the condition variable indicating the process should be stopped } + +void Worker::run_actsim() { + +}