diff --git a/include/actsim_agent/actsim_agent.hpp b/include/actsim_agent/actsim_agent.hpp index e919477..4257822 100644 --- a/include/actsim_agent/actsim_agent.hpp +++ b/include/actsim_agent/actsim_agent.hpp @@ -30,8 +30,13 @@ #define DATABASE_VERSION 2 +#define DOWNLOAD_BUFFER 5 + int start_agent(db::db_credentials_t db_cred, size_t worker_processes); +// private headers void sigint_handler(int signal); +void run_upload(db::db_credentials_t db_cred); +bool task_halted(db::uuid_t task); #endif diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp new file mode 100644 index 0000000..8cd2573 --- /dev/null +++ b/include/actsim_agent/task_interface.hpp @@ -0,0 +1,93 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __TASK_INTERFACE__ +#define __TASK_INTERFACE__ + +#include +#include +#include +#include +#include +#include "task.hpp" + +class TaskInterface { + + public: + + TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size); + + void wait_for_fresh(); + void wait_for_finished(); + void wait_for_cleanup_ready(); + + void notify_cleanup_ready(); + void notify_program_halt(); + + void push_fresh(std::unique_ptr task); + std::unique_ptr pop_fresh(bool& empty); + void push_finished(std::unique_ptr task); + std::unique_ptr pop_finished(bool& empty); + bool fresh_buffer_full(); + + bool running() { return !this->stop_flag.load(std::memory_order_relaxed); }; + + private: + + bool fresh_queue_empty(); + bool finished_queue_empty(); + + size_t buffer_size; + + std::queue> fresh_queue; + std::queue> finished_queue; + + volatile std::atomic_bool& stop_flag; + + std::unordered_map> designs; + + ////// Mutexes ////// + + // access to task queues + std::mutex fresh_queue_mutex, finished_queue_mutex; + + // design map access + std::mutex designs_mutex; + + // notify upload thread that the finished queue is ready for cleanup + std::atomic_bool cleanup_ready; + std::mutex cleanup_ready_mutex; + std::condition_variable cleanup_ready_condition; + + // inform the worker threads that there is data in the fresh task queue + std::mutex fresh_queue_empty_mutex; + std::condition_variable fresh_queue_empty_condition; + + // inform the upload thread that there is data in the fresh task queue + std::mutex finished_queue_empty_mutex; + std::condition_variable finished_queue_empty_condition; +}; + +#endif diff --git a/include/actsim_agent/worker.hpp b/include/actsim_agent/worker.hpp index 6c20864..693b589 100644 --- a/include/actsim_agent/worker.hpp +++ b/include/actsim_agent/worker.hpp @@ -29,12 +29,13 @@ #include #include #include +#include "task_interface.hpp" class Worker { public: - Worker(volatile std::atomic_bool& stop_flag); + Worker(TaskInterface& interface); void start(); void cancel_current(); @@ -45,10 +46,12 @@ class Worker { private: void thread_run(); + void perform_task(std::unique_ptr& task, bool& success); std::unique_ptr worker_thread; std::atomic current_task; - volatile std::atomic_bool& stop_flag; + + TaskInterface& interface; }; #endif \ No newline at end of file diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index 45790af..823cd52 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -28,6 +28,7 @@ #include #include #include +#include "task_interface.hpp" #include "worker.hpp" #include "util.h" #include "actsim_agent.hpp" @@ -40,9 +41,6 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads."); - // this will hold the ID of the task a given thread is currently working on - db::uuid_t worker_tasks[worker_processes]; - // inform the database about which database version we are implementing db_cred.version = DATABASE_VERSION; @@ -56,22 +54,29 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { // register the abort signal to stop the program std::signal(SIGINT, sigint_handler); + // create the thread interface + auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER); + // now we create the worker threads which will host our worker processes std::vector> workers; DEBUG_PRINT("Starting workers..."); for (size_t i = 0; i < worker_processes; i++) { - auto worker = std::make_unique(stop_requested); + auto worker = std::make_unique(interface); worker->start(); workers.emplace_back(std::move(worker)); } // Spawn the upload thread - auto upload_thread = std::make_unique(run_upload); + auto upload_thread = std::make_unique(run_upload, db_cred); // Run loop feeding the threads - while (!stop_requested.load(std::memory_order_relaxed)) { + while (true) { + + // if the program has been stopped, break the loop + if (stop_requested.load(std::memory_order_relaxed)) break; + //fill_queue(); // make sure the tasks the workers are performing haven't been halted @@ -88,6 +93,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { } } + interface.notify_program_halt(); + DEBUG_PRINT("Control thread is finished, waiting for workers to stop..."); // wait for the workers to finish @@ -117,10 +124,10 @@ void sigint_handler(int signal) { } } -bool task_halted(db::uuid_t task) { +bool task_halted([[maybe_unused]]db::uuid_t task) { return false; } -void run_upload() { +void run_upload([[maybe_unused]]db::db_credentials_t db_cred) { } diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp new file mode 100644 index 0000000..4a00ca7 --- /dev/null +++ b/src/actsim_agent/task_interface.cpp @@ -0,0 +1,132 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#include "task_interface.hpp" + +TaskInterface::TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size) : stop_flag(stop_flag) { + this->buffer_size = buffer_size; +} + +void TaskInterface::push_fresh(std::unique_ptr task) { + + // lock the queue and insert into it + std::lock_guard lock(this->fresh_queue_mutex); + this->fresh_queue.push(std::move(task)); + + // we put one task in there so we notify one worker thread + this->fresh_queue_empty_condition.notify_one(); +} + +bool TaskInterface::fresh_queue_empty() { + std::lock_guard lock(this->fresh_queue_mutex); + return this->fresh_queue.empty(); +} + +bool TaskInterface::finished_queue_empty() { + std::lock_guard lock(this->finished_queue_mutex); + return this->finished_queue.empty(); +} + +std::unique_ptr TaskInterface::pop_fresh(bool& empty) { + + // we first need exclusive access to the queue + std::lock_guard lock (this->fresh_queue_mutex); + + // we have to make sure the queue wasn't emptied since the last empty call was made + // or at least inform the calling thread that the queue is currently empty + std::unique_ptr task; + + if (!(empty = this->fresh_queue.empty())) { + task = std::move(this->fresh_queue.front()); + + // if the task needs to be simulated by multiple workers, don't remove it from the queue + if (task->get_task_type() != TaskTypeType::MULTI_COV) { + this->fresh_queue.pop(); + } + } + + return task; +} + +void TaskInterface::wait_for_fresh() { + std::unique_lock lock (this->fresh_queue_empty_mutex); + + // we will be notified either when there is new data or the program has been stopped + this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); }); +} + +void TaskInterface::wait_for_finished() { + std::unique_lock lock (this->finished_queue_empty_mutex); + + // we will be notified either when there is new data or the program has been stopped + this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); }); +} + +void TaskInterface::push_finished(std::unique_ptr task) { + std::lock_guard lock(this->finished_queue_mutex); + this->finished_queue.push(std::move(task)); + + // there is new data, inform the upload thread + this->finished_queue_empty_condition.notify_one(); +} + +std::unique_ptr TaskInterface::pop_finished(bool& empty) { + + // we first need exclusive access to the queue + std::lock_guard lock (this->finished_queue_mutex); + + // we have to make sure the queue wasn't emptied since the last empty call was made + // or at least inform the calling thread that the queue is currently empty + std::unique_ptr task; + + if (!(empty = this->finished_queue.empty())) { + auto task = std::move(this->finished_queue.front()); + this->finished_queue.pop(); + } + + return task; +} + +bool TaskInterface::fresh_buffer_full() { + std::lock_guard lock(this->fresh_queue_mutex); + if (this->fresh_queue.size() < this->buffer_size) return false; + return true; +} + +void TaskInterface::notify_cleanup_ready() { + this->cleanup_ready.store(true, std::memory_order_seq_cst); + this->cleanup_ready_condition.notify_all(); +} + +void TaskInterface::wait_for_cleanup_ready() { + // lock the thread and wait to be notified + std::unique_lock lock(this->cleanup_ready_mutex); + this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); }); +} + +void TaskInterface::notify_program_halt() { + this->finished_queue_empty_condition.notify_all(); + this->fresh_queue_empty_condition.notify_all(); +} \ No newline at end of file diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index e0d2229..98eb934 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -27,7 +27,7 @@ #include #include "worker.hpp" -Worker::Worker(volatile std::atomic_bool& stop_flag) : stop_flag(stop_flag) {} +Worker::Worker(TaskInterface& interface) : interface(interface) {} void Worker::start() { std::cout << "Worker started" << std::endl; @@ -43,12 +43,33 @@ void Worker::join() { } void Worker::thread_run() { - while (true) { - std::cout << "Worker ping" << std::endl; - if (this->stop_flag.load(std::memory_order_relaxed)) { - break; - } + while (this->interface.running()) { - usleep(100000); + // this blocks until either a new task is available or the program was closed + this->interface.wait_for_fresh(); + + // so first we check if we should still be running + if (!this->interface.running()) break; + + // we're still good to go! get a task from the fresh queue + bool queue_empty; + auto task = this->interface.pop_fresh(queue_empty); + + // we need to make sure the queue wasn't emptied between waiting and getting new data + if (queue_empty) continue; + + // everything is good, perform the given task + bool success; + this->perform_task(task, success); + + // if everything worked, push the task to be uploaded + this->interface.push_finished(std::move(task)); } } + +void Worker::perform_task(std::unique_ptr& task, bool& success) { + std::cout << "Worker ping" << std::endl; + usleep(100000); + success = true; + task->get_uuid(); +}