implement downloader and monitor

This commit is contained in:
Fabian Posch 2024-01-10 15:45:49 -05:00
parent 2730d204f9
commit 5e7c0b6ebb
8 changed files with 360 additions and 45 deletions

View file

@ -36,6 +36,5 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes);
// private headers
void sigint_handler(int signal);
bool task_halted(db::uuid_t task);
#endif

View file

@ -0,0 +1,55 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DOWNLOADER_H__
#define __DOWNLOADER_H__
#include <thread>
#include <db_types.hpp>
#include <db_client.hpp>
#include "task_interface.hpp"
class Downloader {
public:
Downloader(db::db_credentials_t db_cred, TaskInterface& interface);
void start();
void join();
private:
void thread_run();
bool fetch_tasks(size_t n);
std::unique_ptr<std::thread> downloader_thread;
std::unique_ptr<db::Connection> conn;
TaskInterface& interface;
};
#endif

View file

@ -0,0 +1,70 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __MONITOR_H__
#define __MONITOR_H__
#define STATUS_CHECK_INTERVAL_MS 1000
#include <thread>
#include <vector>
#include <atomic>
#include <db_types.hpp>
#include <db_client.hpp>
#include "task_interface.hpp"
#include "worker.hpp"
class Monitor {
public:
Monitor(
db::db_credentials_t db_cred,
std::vector<std::unique_ptr<Worker>>& workers,
TaskInterface& interface,
volatile std::atomic_bool& all_workers_finished
);
bool check_connection();
void stop_all();
void start();
void join();
private:
void thread_run();
std::unique_ptr<std::thread> monitor_thread;
std::unique_ptr<db::Connection> conn;
TaskInterface& interface;
std::vector<std::unique_ptr<Worker>>& workers;
volatile std::atomic_bool& all_workers_finished;
volatile std::atomic_bool stop_all_ = std::atomic_bool(false);
};
#endif

View file

@ -41,6 +41,7 @@ class TaskInterface {
void wait_for_fresh();
void wait_for_finished();
void wait_for_buffer_consume();
void wait_for_cleanup_ready();
void notify_cleanup_ready();
@ -50,7 +51,7 @@ class TaskInterface {
std::unique_ptr<Task> pop_fresh(bool& empty);
void push_finished(std::unique_ptr<Task> task);
std::unique_ptr<Task> pop_finished(bool& empty);
bool fresh_buffer_full();
size_t get_buffer_space();
bool running() { return !this->stop_flag.load(std::memory_order_relaxed); };
void stop() { this->stop_flag.store(false, std::memory_order_relaxed); };
@ -89,6 +90,10 @@ class TaskInterface {
// inform the upload thread that there is data in the fresh task queue
std::mutex finished_queue_empty_mutex;
std::condition_variable finished_queue_empty_condition;
// inform the download thread that data was consumed
std::mutex fresh_queue_full_mutex;
std::condition_variable fresh_queue_full_condition;
};
#endif

View file

@ -1,3 +1,4 @@
/*************************************************************************
*
* This file is part of the ACT library
@ -31,12 +32,14 @@
#include "task_interface.hpp"
#include "worker.hpp"
#include "uploader.hpp"
#include "downloader.hpp"
#include "monitor.hpp"
#include "util.h"
#include "actsim_agent.hpp"
// flag indicating program stop to the worker threads
volatile std::atomic_bool stop_requested(false);
volatile std::atomic_bool immediate_stop(false);
static volatile std::atomic_bool stop_requested(false);
static std::unique_ptr<Monitor> monitor;
int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
@ -45,70 +48,79 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
// inform the database about which database version we are implementing
db_cred.version = DATABASE_VERSION;
// create the thread interface
auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER);
// this will hold our worker threads once we spawn them
std::vector<std::unique_ptr<Worker>> workers;
volatile std::atomic_bool all_workers_finished(false);
monitor = std::make_unique<Monitor>(db_cred, workers, interface, all_workers_finished);
// First we check that we can talk to the database. No reason to spin up if there is no one to talk to.
auto con = std::make_unique<db::Connection>(db_cred);
if (!con->connect()) {
std::cerr << "Could not connect to database. Aborting." << std::endl;
if (!monitor->check_connection()) {
std::cerr << "Error: Could not connect to the database!" << std::endl;
return 1;
}
// register the abort signal to stop the program
std::signal(SIGINT, sigint_handler);
// create the thread interface
auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER);
// now we create the worker threads which will host our worker processes
std::vector<std::unique_ptr<Worker>> workers;
DEBUG_PRINT("Start the upload thread...");
// start the upload thread
auto upload_thread = std::make_unique<Uploader>(db_cred, interface);
upload_thread->start();
DEBUG_PRINT("Starting workers...");
// start the worker threads
for (size_t i = 0; i < worker_processes; i++) {
auto worker = std::make_unique<Worker>(interface);
worker->start();
workers.emplace_back(std::move(worker));
}
// Spawn the upload thread
auto upload_thread = std::make_unique<Uploader>(db_cred, interface);
upload_thread->start();
// now we can start the monitor
monitor->start();
// Run loop feeding the threads
while (true) {
DEBUG_PRINT("Starting download thread...");
// if the program has been stopped, break the loop
if (stop_requested.load(std::memory_order_relaxed)) break;
// finally, start the download thread
auto download_thread = std::make_unique<Downloader>(db_cred, interface);
download_thread->start();
//fill_queue();
// make sure the tasks the workers are performing haven't been halted
for (auto& worker : workers) {
if (task_halted(worker->get_current_task())) {
worker->cancel_current();
}
}
}
if (immediate_stop.load(std::memory_order_relaxed)) {
for (auto& worker : workers) {
worker->cancel_current();
}
}
// this is where the threads do their thing
// anything past here means that the program was asked to close
download_thread->join();
// inform all threads about the program ending
interface.notify_program_halt();
DEBUG_PRINT("Control thread is finished, waiting for workers to stop...");
DEBUG_PRINT("Program was closed, download thread stopped, waiting for workers to finish...");
// wait for the workers to finish
for (auto& worker : workers) {
worker->join();
}
DEBUG_PRINT("Worker threads shut down, closing the monitor...");
// all workers are closed, monitor shut down
all_workers_finished.store(true, std::memory_order_seq_cst);
monitor->join();
DEBUG_PRINT("Cleaning up remaining uploads...");
// now we can cleanup what was generated after stop was called
interface.notify_cleanup_ready();
// wait for the upload to finish
upload_thread->join();
DEBUG_PRINT("Done.");
return 0;
}
@ -117,9 +129,9 @@ void sigint_handler(int signal) {
if (signal != SIGINT) return;
if (stop_requested) {
immediate_stop.store(true, std::memory_order_relaxed);
std::cout << "Stopping all workers immediately and closing open tasks." << std::endl;
monitor->stop_all();
} else {
stop_requested.store(true, std::memory_order_relaxed);
@ -127,7 +139,3 @@ void sigint_handler(int signal) {
std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl;
}
}
bool task_halted([[maybe_unused]]db::uuid_t task) {
return false;
}

View file

@ -0,0 +1,72 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "util.h"
#include "downloader.hpp"
Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) :
interface(interface)
{
this->conn = std::make_unique<db::Connection>(db_cred);
}
void Downloader::start() {
std::cout << "REMOVE Download thread started" << std::endl;
this->downloader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
void Downloader::join() {
if (this->downloader_thread == nullptr) return;
this->downloader_thread->join();
}
void Downloader::thread_run() {
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
this->interface.stop();
return;
}
while (this->interface.running()) {
// wait until there is more space in the buffer to fill
this->interface.wait_for_buffer_consume();
// make sure we weren't woken up because the program closed
if (!this->interface.running()) break;
// if the download buffer is not full, fetch some more tasks
this->fetch_tasks(this->interface.get_buffer_space());
}
}
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
std::cout << "Downloading " << n << " tasks, implement me!" << std::endl;
return true;
}

View file

@ -0,0 +1,99 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <db_types.hpp>
#include <db_client.hpp>
#include <task.hpp>
#include <chrono>
#include "util.h"
#include "monitor.hpp"
Monitor::Monitor(
db::db_credentials_t db_cred,
std::vector<std::unique_ptr<Worker>>& workers,
TaskInterface& interface,
volatile std::atomic_bool& all_workers_finished
) :
interface(interface),
workers(workers),
all_workers_finished(all_workers_finished)
{
this->conn = std::make_unique<db::Connection>(db_cred);
}
bool Monitor::check_connection() {
return this->conn->connect();
}
void Monitor::start() {
DEBUG_PRINT("Starting monitor thread...");
this->monitor_thread = std::make_unique<std::thread>([this] () { this->thread_run(); });
}
void Monitor::join() {
if (this->monitor_thread == nullptr) return;
this->monitor_thread->join();
}
void Monitor::stop_all() {
this->stop_all_.store(true, std::memory_order_relaxed);
}
void Monitor::thread_run() {
using namespace std::chrono;
using namespace std::this_thread;
// make sure we are connected to the database
if (!this->conn->ensure_connected()) {
std::cerr << "Error: Monitor thread could not connect to the database!" << std::endl;
this->interface.stop();
return;
}
while (!all_workers_finished.load(std::memory_order_relaxed)) {
for (auto& worker : workers) {
db::uuid_t task;
// go through all workers and see if they are working on a task
if ((task = worker->get_current_task()) != 0) {
// if the job they belong to was halted, cancel the task execution
if (this->conn->get_task_status(task) == JobStatusType::HALTED) {
DEBUG_PRINT("Task " + db::to_string(task) + " was halted, cancelling execution.");
worker->cancel_current();
}
}
}
if (this->stop_all_.load(std::memory_order_relaxed)) {
for (auto& worker : workers) worker->cancel_current();
}
sleep_for(milliseconds(STATUS_CHECK_INTERVAL_MS));
}
}

View file

@ -62,7 +62,7 @@ std::unique_ptr<Task> TaskInterface::pop_fresh(bool& empty) {
task = std::move(this->fresh_queue.front());
// if the task needs to be simulated by multiple workers, don't remove it from the queue
if (task->get_task_type() != TaskTypeType::MULTI_COV) {
if (task->task_type != TaskTypeType::MULTI_COV) {
this->fresh_queue.pop();
}
}
@ -84,6 +84,13 @@ void TaskInterface::wait_for_finished() {
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); });
}
void TaskInterface::wait_for_buffer_consume() {
std::unique_lock<std::mutex> lock (this->fresh_queue_full_mutex);
// we will be notified either when there is new data or the program has been stopped
this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); });
}
void TaskInterface::push_finished(std::unique_ptr<Task> task) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
this->finished_queue.push(std::move(task));
@ -109,10 +116,9 @@ std::unique_ptr<Task> TaskInterface::pop_finished(bool& empty) {
return task;
}
bool TaskInterface::fresh_buffer_full() {
size_t TaskInterface::get_buffer_space() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
if (this->fresh_queue.size() < this->buffer_size) return false;
return true;
return this->buffer_size - this->fresh_queue.size();
}
void TaskInterface::notify_cleanup_ready() {
@ -129,4 +135,5 @@ void TaskInterface::wait_for_cleanup_ready() {
void TaskInterface::notify_program_halt() {
this->finished_queue_empty_condition.notify_all();
this->fresh_queue_empty_condition.notify_all();
this->fresh_queue_full_condition.notify_all();
}