threading works, using full interface as threading control now

This commit is contained in:
Fabian Posch 2024-01-10 17:07:38 -05:00
parent 5e7c0b6ebb
commit feb36c2eb6
8 changed files with 52 additions and 34 deletions

View file

@ -48,7 +48,6 @@ class Monitor {
);
bool check_connection();
void stop_all();
void start();
void join();
@ -63,7 +62,6 @@ class Monitor {
std::vector<std::unique_ptr<Worker>>& workers;
volatile std::atomic_bool& all_workers_finished;
volatile std::atomic_bool stop_all_ = std::atomic_bool(false);
};

View file

@ -37,7 +37,7 @@ class TaskInterface {
public:
TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size);
TaskInterface(size_t buffer_size);
void wait_for_fresh();
void wait_for_finished();
@ -45,7 +45,8 @@ class TaskInterface {
void wait_for_cleanup_ready();
void notify_cleanup_ready();
void notify_program_halt();
void notify_download_program_halt();
void notify_workers_program_halt();
void push_fresh(std::unique_ptr<Task> task);
std::unique_ptr<Task> pop_fresh(bool& empty);
@ -53,8 +54,10 @@ class TaskInterface {
std::unique_ptr<Task> pop_finished(bool& empty);
size_t get_buffer_space();
bool running() { return !this->stop_flag.load(std::memory_order_relaxed); };
void stop() { this->stop_flag.store(false, std::memory_order_relaxed); };
bool running() { return this->running_.load(std::memory_order_relaxed); };
bool is_stop_immediate() { return !this->immediate_stop.load(std::memory_order_relaxed); };
void stop() { this->running_.store(false, std::memory_order_relaxed); };
void stop_immediately() { this->immediate_stop.store(false, std::memory_order_relaxed); };
bool fresh_queue_empty();
bool finished_queue_empty();
@ -66,7 +69,8 @@ class TaskInterface {
std::queue<std::unique_ptr<Task>> fresh_queue;
std::queue<std::unique_ptr<Task>> finished_queue;
volatile std::atomic_bool& stop_flag;
volatile std::atomic_bool running_ = std::atomic_bool(true);
volatile std::atomic_bool immediate_stop;
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;

View file

@ -37,9 +37,8 @@
#include "util.h"
#include "actsim_agent.hpp"
// flag indicating program stop to the worker threads
static volatile std::atomic_bool stop_requested(false);
static std::unique_ptr<Monitor> monitor;
// threading safe interface
static TaskInterface interface(DOWNLOAD_BUFFER);
int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
@ -48,14 +47,11 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
// inform the database about which database version we are implementing
db_cred.version = DATABASE_VERSION;
// create the thread interface
auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER);
// this will hold our worker threads once we spawn them
std::vector<std::unique_ptr<Worker>> workers;
volatile std::atomic_bool all_workers_finished(false);
monitor = std::make_unique<Monitor>(db_cred, workers, interface, all_workers_finished);
auto monitor = std::make_unique<Monitor>(db_cred, workers, interface, all_workers_finished);
// First we check that we can talk to the database. No reason to spin up if there is no one to talk to.
if (!monitor->check_connection()) {
@ -90,13 +86,15 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
auto download_thread = std::make_unique<Downloader>(db_cred, interface);
download_thread->start();
DEBUG_PRINT("Agent is running!");
// this is where the threads do their thing
// anything past here means that the program was asked to close
download_thread->join();
// inform all threads about the program ending
interface.notify_program_halt();
interface.notify_workers_program_halt();
DEBUG_PRINT("Program was closed, download thread stopped, waiting for workers to finish...");
@ -128,12 +126,13 @@ void sigint_handler(int signal) {
if (signal != SIGINT) return;
if (stop_requested) {
if (!interface.running()) {
std::cout << "Stopping all workers immediately and closing open tasks." << std::endl;
monitor->stop_all();
interface.stop_immediately();
} else {
stop_requested.store(true, std::memory_order_relaxed);
interface.stop();
interface.notify_download_program_halt();
std::cout << "Finishing all running simulations, then closing the agent." << std::endl;
std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl;

View file

@ -33,7 +33,7 @@ Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) :
}
void Downloader::start() {
std::cout << "REMOVE Download thread started" << std::endl;
DEBUG_PRINT("Starting downloader thread...");
this->downloader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
@ -67,6 +67,16 @@ void Downloader::thread_run() {
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
std::cout << "Downloading " << n << " tasks, implement me!" << std::endl;
std::cout << "[DOWNLOAGER] Downloading " << n << " tasks, implement me!" << std::endl;
auto id = db::uuid_t();
id = 1;
auto job_id = "deadbeef";
for (size_t i = 0; i < n; ++i) {
auto task = std::make_unique<Task>(id, job_id, TaskTypeType::SINGLE, false, db::uuid_t(), 2);
this->interface.push_fresh(std::move(task));
}
return true;
}

View file

@ -57,10 +57,6 @@ void Monitor::join() {
this->monitor_thread->join();
}
void Monitor::stop_all() {
this->stop_all_.store(true, std::memory_order_relaxed);
}
void Monitor::thread_run() {
using namespace std::chrono;
@ -74,6 +70,9 @@ void Monitor::thread_run() {
}
while (!all_workers_finished.load(std::memory_order_relaxed)) {
DEBUG_PRINT("Monitor waking up to check on running tasks...");
for (auto& worker : workers) {
db::uuid_t task;
@ -90,10 +89,12 @@ void Monitor::thread_run() {
}
}
if (this->stop_all_.load(std::memory_order_relaxed)) {
if (this->interface.is_stop_immediate()) {
for (auto& worker : workers) worker->cancel_current();
}
sleep_for(milliseconds(STATUS_CHECK_INTERVAL_MS));
}
DEBUG_PRINT("All workers are finished, monitor shutting down.");
}

View file

@ -25,7 +25,7 @@
#include "task_interface.hpp"
TaskInterface::TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size) : stop_flag(stop_flag) {
TaskInterface::TaskInterface(size_t buffer_size) {
this->buffer_size = buffer_size;
}
@ -64,6 +64,7 @@ std::unique_ptr<Task> TaskInterface::pop_fresh(bool& empty) {
// if the task needs to be simulated by multiple workers, don't remove it from the queue
if (task->task_type != TaskTypeType::MULTI_COV) {
this->fresh_queue.pop();
this->fresh_queue_full_condition.notify_one();
}
}
@ -132,8 +133,11 @@ void TaskInterface::wait_for_cleanup_ready() {
this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); });
}
void TaskInterface::notify_program_halt() {
void TaskInterface::notify_download_program_halt() {
this->fresh_queue_full_condition.notify_all();
}
void TaskInterface::notify_workers_program_halt() {
this->finished_queue_empty_condition.notify_all();
this->fresh_queue_empty_condition.notify_all();
this->fresh_queue_full_condition.notify_all();
}

View file

@ -23,6 +23,7 @@
**************************************************************************
*/
#include "util.h"
#include "uploader.hpp"
Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) :
@ -32,7 +33,7 @@ Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) :
}
void Uploader::start() {
std::cout << "REMOVE Upload thread started" << std::endl;
DEBUG_PRINT("Starting upload thread...");
this->uploader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
@ -102,6 +103,6 @@ bool Uploader::upload_task([[maybe_unused]]std::unique_ptr<Task> task) {
// make sure any task that is uploaded isn't halted in the database
std::cout << "Task uploaded!";
std::cout << "[UPLOADER] Task uploaded!" << std::endl;
return true;
}

View file

@ -25,17 +25,18 @@
#include <iostream>
#include <unistd.h>
#include "util.h"
#include "worker.hpp"
Worker::Worker(TaskInterface& interface) : interface(interface) {}
void Worker::start() {
std::cout << "Worker started" << std::endl;
DEBUG_PRINT("Starting worker thread...");
this->worker_thread = std::make_unique<std::thread>([this] () { this->thread_run(); });
}
void Worker::cancel_current() {
std::cout << "Current simulation cancelled." << std::endl;
std::cout << "[WORKER] Current simulation cancelled." << std::endl;
// set a condition variable and notify
// must not be blocking
}
@ -74,8 +75,8 @@ void Worker::thread_run() {
}
bool Worker::perform_task(std::unique_ptr<Task>& task) {
usleep(100000);
std::cout << "Worker performed task. Please implement me!" << std::endl;
usleep(1000000);
std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl;
task->uuid;
return true;