From feb36c2eb6e1c9910bf656ccd04208acd40e4473 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Wed, 10 Jan 2024 17:07:38 -0500 Subject: [PATCH] threading works, using full interface as threading control now --- include/actsim_agent/monitor.hpp | 2 -- include/actsim_agent/task_interface.hpp | 14 +++++++++----- src/actsim_agent/actsim_agent.cpp | 21 ++++++++++----------- src/actsim_agent/downloader.cpp | 14 ++++++++++++-- src/actsim_agent/monitor.cpp | 11 ++++++----- src/actsim_agent/task_interface.cpp | 10 +++++++--- src/actsim_agent/uploader.cpp | 5 +++-- src/actsim_agent/worker.cpp | 9 +++++---- 8 files changed, 52 insertions(+), 34 deletions(-) diff --git a/include/actsim_agent/monitor.hpp b/include/actsim_agent/monitor.hpp index 9991004..5d70cfe 100644 --- a/include/actsim_agent/monitor.hpp +++ b/include/actsim_agent/monitor.hpp @@ -48,7 +48,6 @@ class Monitor { ); bool check_connection(); - void stop_all(); void start(); void join(); @@ -63,7 +62,6 @@ class Monitor { std::vector>& workers; volatile std::atomic_bool& all_workers_finished; - volatile std::atomic_bool stop_all_ = std::atomic_bool(false); }; diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index 71cddbd..86d2489 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -37,7 +37,7 @@ class TaskInterface { public: - TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size); + TaskInterface(size_t buffer_size); void wait_for_fresh(); void wait_for_finished(); @@ -45,7 +45,8 @@ class TaskInterface { void wait_for_cleanup_ready(); void notify_cleanup_ready(); - void notify_program_halt(); + void notify_download_program_halt(); + void notify_workers_program_halt(); void push_fresh(std::unique_ptr task); std::unique_ptr pop_fresh(bool& empty); @@ -53,8 +54,10 @@ class TaskInterface { std::unique_ptr pop_finished(bool& empty); size_t get_buffer_space(); - bool running() { return !this->stop_flag.load(std::memory_order_relaxed); }; - void stop() { this->stop_flag.store(false, std::memory_order_relaxed); }; + bool running() { return this->running_.load(std::memory_order_relaxed); }; + bool is_stop_immediate() { return !this->immediate_stop.load(std::memory_order_relaxed); }; + void stop() { this->running_.store(false, std::memory_order_relaxed); }; + void stop_immediately() { this->immediate_stop.store(false, std::memory_order_relaxed); }; bool fresh_queue_empty(); bool finished_queue_empty(); @@ -66,7 +69,8 @@ class TaskInterface { std::queue> fresh_queue; std::queue> finished_queue; - volatile std::atomic_bool& stop_flag; + volatile std::atomic_bool running_ = std::atomic_bool(true); + volatile std::atomic_bool immediate_stop; std::unordered_map> designs; diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index e3dc87c..7490e8f 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -37,9 +37,8 @@ #include "util.h" #include "actsim_agent.hpp" -// flag indicating program stop to the worker threads -static volatile std::atomic_bool stop_requested(false); -static std::unique_ptr monitor; +// threading safe interface +static TaskInterface interface(DOWNLOAD_BUFFER); int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { @@ -48,14 +47,11 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { // inform the database about which database version we are implementing db_cred.version = DATABASE_VERSION; - // create the thread interface - auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER); - // this will hold our worker threads once we spawn them std::vector> workers; volatile std::atomic_bool all_workers_finished(false); - monitor = std::make_unique(db_cred, workers, interface, all_workers_finished); + auto monitor = std::make_unique(db_cred, workers, interface, all_workers_finished); // First we check that we can talk to the database. No reason to spin up if there is no one to talk to. if (!monitor->check_connection()) { @@ -90,13 +86,15 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { auto download_thread = std::make_unique(db_cred, interface); download_thread->start(); + DEBUG_PRINT("Agent is running!"); + // this is where the threads do their thing // anything past here means that the program was asked to close download_thread->join(); // inform all threads about the program ending - interface.notify_program_halt(); + interface.notify_workers_program_halt(); DEBUG_PRINT("Program was closed, download thread stopped, waiting for workers to finish..."); @@ -128,12 +126,13 @@ void sigint_handler(int signal) { if (signal != SIGINT) return; - if (stop_requested) { + if (!interface.running()) { std::cout << "Stopping all workers immediately and closing open tasks." << std::endl; - monitor->stop_all(); + interface.stop_immediately(); } else { - stop_requested.store(true, std::memory_order_relaxed); + interface.stop(); + interface.notify_download_program_halt(); std::cout << "Finishing all running simulations, then closing the agent." << std::endl; std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl; diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp index 801511c..ae0081b 100644 --- a/src/actsim_agent/downloader.cpp +++ b/src/actsim_agent/downloader.cpp @@ -33,7 +33,7 @@ Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) : } void Downloader::start() { - std::cout << "REMOVE Download thread started" << std::endl; + DEBUG_PRINT("Starting downloader thread..."); this->downloader_thread = std::make_unique([this]() { thread_run(); }); } @@ -67,6 +67,16 @@ void Downloader::thread_run() { bool Downloader::fetch_tasks(size_t n) { DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks..."); - std::cout << "Downloading " << n << " tasks, implement me!" << std::endl; + std::cout << "[DOWNLOAGER] Downloading " << n << " tasks, implement me!" << std::endl; + + auto id = db::uuid_t(); + id = 1; + auto job_id = "deadbeef"; + + for (size_t i = 0; i < n; ++i) { + auto task = std::make_unique(id, job_id, TaskTypeType::SINGLE, false, db::uuid_t(), 2); + this->interface.push_fresh(std::move(task)); + } + return true; } diff --git a/src/actsim_agent/monitor.cpp b/src/actsim_agent/monitor.cpp index dbf7d8d..8a12e87 100644 --- a/src/actsim_agent/monitor.cpp +++ b/src/actsim_agent/monitor.cpp @@ -57,10 +57,6 @@ void Monitor::join() { this->monitor_thread->join(); } -void Monitor::stop_all() { - this->stop_all_.store(true, std::memory_order_relaxed); -} - void Monitor::thread_run() { using namespace std::chrono; @@ -74,6 +70,9 @@ void Monitor::thread_run() { } while (!all_workers_finished.load(std::memory_order_relaxed)) { + + DEBUG_PRINT("Monitor waking up to check on running tasks..."); + for (auto& worker : workers) { db::uuid_t task; @@ -90,10 +89,12 @@ void Monitor::thread_run() { } } - if (this->stop_all_.load(std::memory_order_relaxed)) { + if (this->interface.is_stop_immediate()) { for (auto& worker : workers) worker->cancel_current(); } sleep_for(milliseconds(STATUS_CHECK_INTERVAL_MS)); } + + DEBUG_PRINT("All workers are finished, monitor shutting down."); } diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 230df35..d26f8a7 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -25,7 +25,7 @@ #include "task_interface.hpp" -TaskInterface::TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size) : stop_flag(stop_flag) { +TaskInterface::TaskInterface(size_t buffer_size) { this->buffer_size = buffer_size; } @@ -64,6 +64,7 @@ std::unique_ptr TaskInterface::pop_fresh(bool& empty) { // if the task needs to be simulated by multiple workers, don't remove it from the queue if (task->task_type != TaskTypeType::MULTI_COV) { this->fresh_queue.pop(); + this->fresh_queue_full_condition.notify_one(); } } @@ -132,8 +133,11 @@ void TaskInterface::wait_for_cleanup_ready() { this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); }); } -void TaskInterface::notify_program_halt() { +void TaskInterface::notify_download_program_halt() { + this->fresh_queue_full_condition.notify_all(); +} + +void TaskInterface::notify_workers_program_halt() { this->finished_queue_empty_condition.notify_all(); this->fresh_queue_empty_condition.notify_all(); - this->fresh_queue_full_condition.notify_all(); } \ No newline at end of file diff --git a/src/actsim_agent/uploader.cpp b/src/actsim_agent/uploader.cpp index e5cfd47..233d4f6 100644 --- a/src/actsim_agent/uploader.cpp +++ b/src/actsim_agent/uploader.cpp @@ -23,6 +23,7 @@ ************************************************************************** */ +#include "util.h" #include "uploader.hpp" Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) : @@ -32,7 +33,7 @@ Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) : } void Uploader::start() { - std::cout << "REMOVE Upload thread started" << std::endl; + DEBUG_PRINT("Starting upload thread..."); this->uploader_thread = std::make_unique([this]() { thread_run(); }); } @@ -102,6 +103,6 @@ bool Uploader::upload_task([[maybe_unused]]std::unique_ptr task) { // make sure any task that is uploaded isn't halted in the database - std::cout << "Task uploaded!"; + std::cout << "[UPLOADER] Task uploaded!" << std::endl; return true; } diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index 76b7581..fb57e78 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -25,17 +25,18 @@ #include #include +#include "util.h" #include "worker.hpp" Worker::Worker(TaskInterface& interface) : interface(interface) {} void Worker::start() { - std::cout << "Worker started" << std::endl; + DEBUG_PRINT("Starting worker thread..."); this->worker_thread = std::make_unique([this] () { this->thread_run(); }); } void Worker::cancel_current() { - std::cout << "Current simulation cancelled." << std::endl; + std::cout << "[WORKER] Current simulation cancelled." << std::endl; // set a condition variable and notify // must not be blocking } @@ -74,8 +75,8 @@ void Worker::thread_run() { } bool Worker::perform_task(std::unique_ptr& task) { - usleep(100000); - std::cout << "Worker performed task. Please implement me!" << std::endl; + usleep(1000000); + std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; task->uuid; return true;