From d62fcf76695fae67d597d8e6a43407bff5addd5b Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Thu, 11 Jan 2024 15:06:05 -0500 Subject: [PATCH] migrate to artifact instead of task, make interface more flexible --- include/actsim_agent/task_interface.hpp | 19 ++++++++++++------- include/actsim_agent/uploader.hpp | 2 +- include/actsim_agent/worker.hpp | 2 +- src/actsim_agent/monitor.cpp | 3 +-- src/actsim_agent/task_interface.cpp | 14 +++++++------- src/actsim_agent/uploader.cpp | 2 +- src/actsim_agent/worker.cpp | 14 ++++++++------ 7 files changed, 31 insertions(+), 25 deletions(-) diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index 0d6821d..0e2c95b 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -31,10 +31,15 @@ #include #include #include -#include +#include + +// if you want to use this interface for different types, you only need to change it here +using InputType = pl::SimConfigArtifact; +using OutputType = pl::SimOutputArtifact; class TaskInterface { + public: TaskInterface(size_t buffer_size); @@ -48,10 +53,10 @@ class TaskInterface { void notify_download_program_halt(); void notify_workers_program_halt(); - void push_fresh(std::unique_ptr task); - std::unique_ptr pop_fresh(bool& empty); - void push_finished(std::unique_ptr task); - std::unique_ptr pop_finished(bool& empty); + void push_fresh(std::unique_ptr task); + std::unique_ptr pop_fresh(bool& empty); + void push_finished(std::unique_ptr task); + std::unique_ptr pop_finished(bool& empty); size_t get_buffer_space(); bool search_and_increment(db::uuid_t id, std::string& design); @@ -70,8 +75,8 @@ class TaskInterface { size_t buffer_size; - std::queue> fresh_queue; - std::queue> finished_queue; + std::queue> fresh_queue; + std::queue> finished_queue; volatile std::atomic_bool running_ = std::atomic_bool(true); volatile std::atomic_bool immediate_stop; diff --git a/include/actsim_agent/uploader.hpp b/include/actsim_agent/uploader.hpp index 7d93522..ceef2c0 100644 --- a/include/actsim_agent/uploader.hpp +++ b/include/actsim_agent/uploader.hpp @@ -43,7 +43,7 @@ class Uploader { private: void thread_run(); - bool upload_task(std::unique_ptr task); + bool upload_task(std::unique_ptr task); std::unique_ptr uploader_thread; std::unique_ptr conn; diff --git a/include/actsim_agent/worker.hpp b/include/actsim_agent/worker.hpp index cf1db4e..0cb4cb8 100644 --- a/include/actsim_agent/worker.hpp +++ b/include/actsim_agent/worker.hpp @@ -46,7 +46,7 @@ class Worker { private: void thread_run(); - bool perform_task(std::unique_ptr& task); + std::unique_ptr perform_task(std::unique_ptr task, bool& finished); std::unique_ptr worker_thread; std::atomic current_task; diff --git a/src/actsim_agent/monitor.cpp b/src/actsim_agent/monitor.cpp index 6028165..8acfce5 100644 --- a/src/actsim_agent/monitor.cpp +++ b/src/actsim_agent/monitor.cpp @@ -25,7 +25,6 @@ #include #include -#include #include #include "util.h" #include "monitor.hpp" @@ -81,7 +80,7 @@ void Monitor::thread_run() { if ((task = worker->get_current_task()) != 0) { // if the job they belong to was halted, cancel the task execution - if (this->conn->get_task_status(task) == JobStatusType::HALTED) { + if (this->conn->get_task_status(task) == db::JobStatusType::HALTED) { DEBUG_PRINT("Task " + db::to_string(task) + " was halted, cancelling execution."); worker->cancel_current(); } diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 99dd5f4..a7e421f 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -29,7 +29,7 @@ TaskInterface::TaskInterface(size_t buffer_size) { this->buffer_size = buffer_size; } -void TaskInterface::push_fresh(std::unique_ptr task) { +void TaskInterface::push_fresh(std::unique_ptr task) { // lock the queue and insert into it std::lock_guard lock(this->fresh_queue_mutex); @@ -49,20 +49,20 @@ bool TaskInterface::finished_queue_empty() { return this->finished_queue.empty(); } -std::unique_ptr TaskInterface::pop_fresh(bool& empty) { +std::unique_ptr TaskInterface::pop_fresh(bool& empty) { // we first need exclusive access to the queue std::lock_guard lock (this->fresh_queue_mutex); // we have to make sure the queue wasn't emptied since the last empty call was made // or at least inform the calling thread that the queue is currently empty - std::unique_ptr task; + std::unique_ptr task; if (!(empty = this->fresh_queue.empty())) { task = std::move(this->fresh_queue.front()); // if the task needs to be simulated by multiple workers, don't remove it from the queue - if (task->task_type != TaskTypeType::MULTI_COV) { + if (!task->parallelizable()) { this->fresh_queue.pop(); this->fresh_queue_full_condition.notify_one(); } @@ -92,7 +92,7 @@ void TaskInterface::wait_for_buffer_consume() { this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); }); } -void TaskInterface::push_finished(std::unique_ptr task) { +void TaskInterface::push_finished(std::unique_ptr task) { std::lock_guard lock(this->finished_queue_mutex); this->finished_queue.push(std::move(task)); @@ -100,14 +100,14 @@ void TaskInterface::push_finished(std::unique_ptr task) { this->finished_queue_empty_condition.notify_one(); } -std::unique_ptr TaskInterface::pop_finished(bool& empty) { +std::unique_ptr TaskInterface::pop_finished(bool& empty) { // we first need exclusive access to the queue std::lock_guard lock (this->finished_queue_mutex); // we have to make sure the queue wasn't emptied since the last empty call was made // or at least inform the calling thread that the queue is currently empty - std::unique_ptr task; + std::unique_ptr task; if (!(empty = this->finished_queue.empty())) { auto task = std::move(this->finished_queue.front()); diff --git a/src/actsim_agent/uploader.cpp b/src/actsim_agent/uploader.cpp index 233d4f6..7366ddb 100644 --- a/src/actsim_agent/uploader.cpp +++ b/src/actsim_agent/uploader.cpp @@ -99,7 +99,7 @@ void Uploader::thread_run() { } } -bool Uploader::upload_task([[maybe_unused]]std::unique_ptr task) { +bool Uploader::upload_task([[maybe_unused]]std::unique_ptr task) { // make sure any task that is uploaded isn't halted in the database diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index fb57e78..6738590 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -58,27 +58,29 @@ void Worker::thread_run() { // we're still good to go! get a task from the fresh queue bool queue_empty; auto task = this->interface.pop_fresh(queue_empty); - this->current_task.store(task->uuid, std::memory_order_relaxed); + this->current_task.store(task->id, std::memory_order_relaxed); // we need to make sure the queue wasn't emptied between waiting and getting new data if (queue_empty) continue; // everything is good, perform the given task - bool complete = this->perform_task(task); + bool complete; this->current_task.store(db::uuid_t(), std::memory_order_relaxed); + auto output = this->perform_task(std::move(task), complete); // if the task was finished, push the task to be uploaded // we need this since the task might have been interrupted // half way though - if (complete) this->interface.push_finished(std::move(task)); + if (complete) this->interface.push_finished(std::move(output)); } } -bool Worker::perform_task(std::unique_ptr& task) { +std::unique_ptr Worker::perform_task(std::unique_ptr task, bool& finished) { usleep(1000000); std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; - task->uuid; - return true; + task->id; + auto output = std::make_unique(); + return std::move(output); // wait for either the executing process to finish or // for the condition variable indicating the process should be stopped