From 5e7c0b6ebb86d32e5167ff04cae3642250264357 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Wed, 10 Jan 2024 15:45:49 -0500 Subject: [PATCH] implement downloader and monitor --- include/actsim_agent/actsim_agent.hpp | 1 - include/actsim_agent/downloader.hpp | 55 ++++++++++++++ include/actsim_agent/monitor.hpp | 70 +++++++++++++++++ include/actsim_agent/task_interface.hpp | 7 +- src/actsim_agent/actsim_agent.cpp | 86 +++++++++++---------- src/actsim_agent/downloader.cpp | 72 ++++++++++++++++++ src/actsim_agent/monitor.cpp | 99 +++++++++++++++++++++++++ src/actsim_agent/task_interface.cpp | 15 +++- 8 files changed, 360 insertions(+), 45 deletions(-) create mode 100644 include/actsim_agent/downloader.hpp create mode 100644 include/actsim_agent/monitor.hpp create mode 100644 src/actsim_agent/downloader.cpp create mode 100644 src/actsim_agent/monitor.cpp diff --git a/include/actsim_agent/actsim_agent.hpp b/include/actsim_agent/actsim_agent.hpp index 23254fd..8e2c102 100644 --- a/include/actsim_agent/actsim_agent.hpp +++ b/include/actsim_agent/actsim_agent.hpp @@ -36,6 +36,5 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes); // private headers void sigint_handler(int signal); -bool task_halted(db::uuid_t task); #endif diff --git a/include/actsim_agent/downloader.hpp b/include/actsim_agent/downloader.hpp new file mode 100644 index 0000000..f329ec1 --- /dev/null +++ b/include/actsim_agent/downloader.hpp @@ -0,0 +1,55 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __DOWNLOADER_H__ +#define __DOWNLOADER_H__ + +#include +#include +#include +#include "task_interface.hpp" + +class Downloader { + + public: + + Downloader(db::db_credentials_t db_cred, TaskInterface& interface); + + void start(); + void join(); + + private: + + void thread_run(); + bool fetch_tasks(size_t n); + + std::unique_ptr downloader_thread; + std::unique_ptr conn; + + TaskInterface& interface; + +}; + +#endif \ No newline at end of file diff --git a/include/actsim_agent/monitor.hpp b/include/actsim_agent/monitor.hpp new file mode 100644 index 0000000..9991004 --- /dev/null +++ b/include/actsim_agent/monitor.hpp @@ -0,0 +1,70 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __MONITOR_H__ +#define __MONITOR_H__ + +#define STATUS_CHECK_INTERVAL_MS 1000 + +#include +#include +#include +#include +#include +#include "task_interface.hpp" +#include "worker.hpp" + +class Monitor { + + public: + + Monitor( + db::db_credentials_t db_cred, + std::vector>& workers, + TaskInterface& interface, + volatile std::atomic_bool& all_workers_finished + ); + + bool check_connection(); + void stop_all(); + void start(); + void join(); + + private: + + void thread_run(); + + std::unique_ptr monitor_thread; + std::unique_ptr conn; + + TaskInterface& interface; + + std::vector>& workers; + volatile std::atomic_bool& all_workers_finished; + volatile std::atomic_bool stop_all_ = std::atomic_bool(false); + +}; + +#endif \ No newline at end of file diff --git a/include/actsim_agent/task_interface.hpp b/include/actsim_agent/task_interface.hpp index ae8762b..71cddbd 100644 --- a/include/actsim_agent/task_interface.hpp +++ b/include/actsim_agent/task_interface.hpp @@ -41,6 +41,7 @@ class TaskInterface { void wait_for_fresh(); void wait_for_finished(); + void wait_for_buffer_consume(); void wait_for_cleanup_ready(); void notify_cleanup_ready(); @@ -50,7 +51,7 @@ class TaskInterface { std::unique_ptr pop_fresh(bool& empty); void push_finished(std::unique_ptr task); std::unique_ptr pop_finished(bool& empty); - bool fresh_buffer_full(); + size_t get_buffer_space(); bool running() { return !this->stop_flag.load(std::memory_order_relaxed); }; void stop() { this->stop_flag.store(false, std::memory_order_relaxed); }; @@ -89,6 +90,10 @@ class TaskInterface { // inform the upload thread that there is data in the fresh task queue std::mutex finished_queue_empty_mutex; std::condition_variable finished_queue_empty_condition; + + // inform the download thread that data was consumed + std::mutex fresh_queue_full_mutex; + std::condition_variable fresh_queue_full_condition; }; #endif diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index 3dfa715..e3dc87c 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -1,3 +1,4 @@ + /************************************************************************* * * This file is part of the ACT library @@ -31,12 +32,14 @@ #include "task_interface.hpp" #include "worker.hpp" #include "uploader.hpp" +#include "downloader.hpp" +#include "monitor.hpp" #include "util.h" #include "actsim_agent.hpp" // flag indicating program stop to the worker threads -volatile std::atomic_bool stop_requested(false); -volatile std::atomic_bool immediate_stop(false); +static volatile std::atomic_bool stop_requested(false); +static std::unique_ptr monitor; int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { @@ -45,70 +48,79 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { // inform the database about which database version we are implementing db_cred.version = DATABASE_VERSION; + // create the thread interface + auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER); + + // this will hold our worker threads once we spawn them + std::vector> workers; + + volatile std::atomic_bool all_workers_finished(false); + monitor = std::make_unique(db_cred, workers, interface, all_workers_finished); + // First we check that we can talk to the database. No reason to spin up if there is no one to talk to. - auto con = std::make_unique(db_cred); - if (!con->connect()) { - std::cerr << "Could not connect to database. Aborting." << std::endl; + if (!monitor->check_connection()) { + std::cerr << "Error: Could not connect to the database!" << std::endl; return 1; } // register the abort signal to stop the program std::signal(SIGINT, sigint_handler); - // create the thread interface - auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER); - - // now we create the worker threads which will host our worker processes - std::vector> workers; + DEBUG_PRINT("Start the upload thread..."); + + // start the upload thread + auto upload_thread = std::make_unique(db_cred, interface); + upload_thread->start(); DEBUG_PRINT("Starting workers..."); + // start the worker threads for (size_t i = 0; i < worker_processes; i++) { auto worker = std::make_unique(interface); worker->start(); workers.emplace_back(std::move(worker)); } - // Spawn the upload thread - auto upload_thread = std::make_unique(db_cred, interface); - upload_thread->start(); + // now we can start the monitor + monitor->start(); - // Run loop feeding the threads - while (true) { + DEBUG_PRINT("Starting download thread..."); - // if the program has been stopped, break the loop - if (stop_requested.load(std::memory_order_relaxed)) break; + // finally, start the download thread + auto download_thread = std::make_unique(db_cred, interface); + download_thread->start(); - //fill_queue(); - - // make sure the tasks the workers are performing haven't been halted - for (auto& worker : workers) { - if (task_halted(worker->get_current_task())) { - worker->cancel_current(); - } - } - } - - if (immediate_stop.load(std::memory_order_relaxed)) { - for (auto& worker : workers) { - worker->cancel_current(); - } - } + // this is where the threads do their thing + // anything past here means that the program was asked to close + + download_thread->join(); + // inform all threads about the program ending interface.notify_program_halt(); - DEBUG_PRINT("Control thread is finished, waiting for workers to stop..."); + DEBUG_PRINT("Program was closed, download thread stopped, waiting for workers to finish..."); // wait for the workers to finish for (auto& worker : workers) { worker->join(); } + DEBUG_PRINT("Worker threads shut down, closing the monitor..."); + + // all workers are closed, monitor shut down + all_workers_finished.store(true, std::memory_order_seq_cst); + monitor->join(); + + DEBUG_PRINT("Cleaning up remaining uploads..."); + + // now we can cleanup what was generated after stop was called interface.notify_cleanup_ready(); // wait for the upload to finish upload_thread->join(); + DEBUG_PRINT("Done."); + return 0; } @@ -117,9 +129,9 @@ void sigint_handler(int signal) { if (signal != SIGINT) return; if (stop_requested) { - immediate_stop.store(true, std::memory_order_relaxed); - std::cout << "Stopping all workers immediately and closing open tasks." << std::endl; + + monitor->stop_all(); } else { stop_requested.store(true, std::memory_order_relaxed); @@ -127,7 +139,3 @@ void sigint_handler(int signal) { std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl; } } - -bool task_halted([[maybe_unused]]db::uuid_t task) { - return false; -} diff --git a/src/actsim_agent/downloader.cpp b/src/actsim_agent/downloader.cpp new file mode 100644 index 0000000..801511c --- /dev/null +++ b/src/actsim_agent/downloader.cpp @@ -0,0 +1,72 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#include "util.h" +#include "downloader.hpp" + +Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) : + interface(interface) +{ + this->conn = std::make_unique(db_cred); +} + +void Downloader::start() { + std::cout << "REMOVE Download thread started" << std::endl; + this->downloader_thread = std::make_unique([this]() { thread_run(); }); +} + +void Downloader::join() { + if (this->downloader_thread == nullptr) return; + this->downloader_thread->join(); +} + +void Downloader::thread_run() { + + // connect to the database + if (!this->conn->connect()) { + std::cerr << "Error: Upload thread could not connect to the database!" << std::endl; + this->interface.stop(); + return; + } + + while (this->interface.running()) { + + // wait until there is more space in the buffer to fill + this->interface.wait_for_buffer_consume(); + + // make sure we weren't woken up because the program closed + if (!this->interface.running()) break; + + // if the download buffer is not full, fetch some more tasks + this->fetch_tasks(this->interface.get_buffer_space()); + + } +} + +bool Downloader::fetch_tasks(size_t n) { + DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks..."); + std::cout << "Downloading " << n << " tasks, implement me!" << std::endl; + return true; +} diff --git a/src/actsim_agent/monitor.cpp b/src/actsim_agent/monitor.cpp new file mode 100644 index 0000000..dbf7d8d --- /dev/null +++ b/src/actsim_agent/monitor.cpp @@ -0,0 +1,99 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#include +#include +#include +#include +#include "util.h" +#include "monitor.hpp" + +Monitor::Monitor( + db::db_credentials_t db_cred, + std::vector>& workers, + TaskInterface& interface, + volatile std::atomic_bool& all_workers_finished +) : + interface(interface), + workers(workers), + all_workers_finished(all_workers_finished) +{ + this->conn = std::make_unique(db_cred); +} + +bool Monitor::check_connection() { + return this->conn->connect(); +} + +void Monitor::start() { + DEBUG_PRINT("Starting monitor thread..."); + this->monitor_thread = std::make_unique([this] () { this->thread_run(); }); +} + +void Monitor::join() { + if (this->monitor_thread == nullptr) return; + this->monitor_thread->join(); +} + +void Monitor::stop_all() { + this->stop_all_.store(true, std::memory_order_relaxed); +} + +void Monitor::thread_run() { + + using namespace std::chrono; + using namespace std::this_thread; + + // make sure we are connected to the database + if (!this->conn->ensure_connected()) { + std::cerr << "Error: Monitor thread could not connect to the database!" << std::endl; + this->interface.stop(); + return; + } + + while (!all_workers_finished.load(std::memory_order_relaxed)) { + for (auto& worker : workers) { + + db::uuid_t task; + + // go through all workers and see if they are working on a task + if ((task = worker->get_current_task()) != 0) { + + // if the job they belong to was halted, cancel the task execution + if (this->conn->get_task_status(task) == JobStatusType::HALTED) { + DEBUG_PRINT("Task " + db::to_string(task) + " was halted, cancelling execution."); + worker->cancel_current(); + } + + } + } + + if (this->stop_all_.load(std::memory_order_relaxed)) { + for (auto& worker : workers) worker->cancel_current(); + } + + sleep_for(milliseconds(STATUS_CHECK_INTERVAL_MS)); + } +} diff --git a/src/actsim_agent/task_interface.cpp b/src/actsim_agent/task_interface.cpp index 4a00ca7..230df35 100644 --- a/src/actsim_agent/task_interface.cpp +++ b/src/actsim_agent/task_interface.cpp @@ -62,7 +62,7 @@ std::unique_ptr TaskInterface::pop_fresh(bool& empty) { task = std::move(this->fresh_queue.front()); // if the task needs to be simulated by multiple workers, don't remove it from the queue - if (task->get_task_type() != TaskTypeType::MULTI_COV) { + if (task->task_type != TaskTypeType::MULTI_COV) { this->fresh_queue.pop(); } } @@ -84,6 +84,13 @@ void TaskInterface::wait_for_finished() { this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); }); } +void TaskInterface::wait_for_buffer_consume() { + std::unique_lock lock (this->fresh_queue_full_mutex); + + // we will be notified either when there is new data or the program has been stopped + this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); }); +} + void TaskInterface::push_finished(std::unique_ptr task) { std::lock_guard lock(this->finished_queue_mutex); this->finished_queue.push(std::move(task)); @@ -109,10 +116,9 @@ std::unique_ptr TaskInterface::pop_finished(bool& empty) { return task; } -bool TaskInterface::fresh_buffer_full() { +size_t TaskInterface::get_buffer_space() { std::lock_guard lock(this->fresh_queue_mutex); - if (this->fresh_queue.size() < this->buffer_size) return false; - return true; + return this->buffer_size - this->fresh_queue.size(); } void TaskInterface::notify_cleanup_ready() { @@ -129,4 +135,5 @@ void TaskInterface::wait_for_cleanup_ready() { void TaskInterface::notify_program_halt() { this->finished_queue_empty_condition.notify_all(); this->fresh_queue_empty_condition.notify_all(); + this->fresh_queue_full_condition.notify_all(); } \ No newline at end of file