implement threading interface and worker loop

This commit is contained in:
Fabian Posch 2024-01-09 17:12:31 -05:00
parent 03e8eeb305
commit 8ef69140d0
6 changed files with 278 additions and 17 deletions

View file

@ -30,8 +30,13 @@
#define DATABASE_VERSION 2 #define DATABASE_VERSION 2
#define DOWNLOAD_BUFFER 5
int start_agent(db::db_credentials_t db_cred, size_t worker_processes); int start_agent(db::db_credentials_t db_cred, size_t worker_processes);
// private headers
void sigint_handler(int signal); void sigint_handler(int signal);
void run_upload(db::db_credentials_t db_cred);
bool task_halted(db::uuid_t task);
#endif #endif

View file

@ -0,0 +1,93 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __TASK_INTERFACE__
#define __TASK_INTERFACE__
#include <queue>
#include <unordered_map>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include "task.hpp"
class TaskInterface {
public:
TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size);
void wait_for_fresh();
void wait_for_finished();
void wait_for_cleanup_ready();
void notify_cleanup_ready();
void notify_program_halt();
void push_fresh(std::unique_ptr<Task> task);
std::unique_ptr<Task> pop_fresh(bool& empty);
void push_finished(std::unique_ptr<Task> task);
std::unique_ptr<Task> pop_finished(bool& empty);
bool fresh_buffer_full();
bool running() { return !this->stop_flag.load(std::memory_order_relaxed); };
private:
bool fresh_queue_empty();
bool finished_queue_empty();
size_t buffer_size;
std::queue<std::unique_ptr<Task>> fresh_queue;
std::queue<std::unique_ptr<Task>> finished_queue;
volatile std::atomic_bool& stop_flag;
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;
////// Mutexes //////
// access to task queues
std::mutex fresh_queue_mutex, finished_queue_mutex;
// design map access
std::mutex designs_mutex;
// notify upload thread that the finished queue is ready for cleanup
std::atomic_bool cleanup_ready;
std::mutex cleanup_ready_mutex;
std::condition_variable cleanup_ready_condition;
// inform the worker threads that there is data in the fresh task queue
std::mutex fresh_queue_empty_mutex;
std::condition_variable fresh_queue_empty_condition;
// inform the upload thread that there is data in the fresh task queue
std::mutex finished_queue_empty_mutex;
std::condition_variable finished_queue_empty_condition;
};
#endif

View file

@ -29,12 +29,13 @@
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <db_types.hpp> #include <db_types.hpp>
#include "task_interface.hpp"
class Worker { class Worker {
public: public:
Worker(volatile std::atomic_bool& stop_flag); Worker(TaskInterface& interface);
void start(); void start();
void cancel_current(); void cancel_current();
@ -45,10 +46,12 @@ class Worker {
private: private:
void thread_run(); void thread_run();
void perform_task(std::unique_ptr<Task>& task, bool& success);
std::unique_ptr<std::thread> worker_thread; std::unique_ptr<std::thread> worker_thread;
std::atomic<db::uuid_t> current_task; std::atomic<db::uuid_t> current_task;
volatile std::atomic_bool& stop_flag;
TaskInterface& interface;
}; };
#endif #endif

View file

@ -28,6 +28,7 @@
#include <atomic> #include <atomic>
#include <db_types.hpp> #include <db_types.hpp>
#include <db_client.hpp> #include <db_client.hpp>
#include "task_interface.hpp"
#include "worker.hpp" #include "worker.hpp"
#include "util.h" #include "util.h"
#include "actsim_agent.hpp" #include "actsim_agent.hpp"
@ -40,9 +41,6 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads."); DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads.");
// this will hold the ID of the task a given thread is currently working on
db::uuid_t worker_tasks[worker_processes];
// inform the database about which database version we are implementing // inform the database about which database version we are implementing
db_cred.version = DATABASE_VERSION; db_cred.version = DATABASE_VERSION;
@ -56,22 +54,29 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
// register the abort signal to stop the program // register the abort signal to stop the program
std::signal(SIGINT, sigint_handler); std::signal(SIGINT, sigint_handler);
// create the thread interface
auto interface = TaskInterface(stop_requested, DOWNLOAD_BUFFER);
// now we create the worker threads which will host our worker processes // now we create the worker threads which will host our worker processes
std::vector<std::unique_ptr<Worker>> workers; std::vector<std::unique_ptr<Worker>> workers;
DEBUG_PRINT("Starting workers..."); DEBUG_PRINT("Starting workers...");
for (size_t i = 0; i < worker_processes; i++) { for (size_t i = 0; i < worker_processes; i++) {
auto worker = std::make_unique<Worker>(stop_requested); auto worker = std::make_unique<Worker>(interface);
worker->start(); worker->start();
workers.emplace_back(std::move(worker)); workers.emplace_back(std::move(worker));
} }
// Spawn the upload thread // Spawn the upload thread
auto upload_thread = std::make_unique<std::thread>(run_upload); auto upload_thread = std::make_unique<std::thread>(run_upload, db_cred);
// Run loop feeding the threads // Run loop feeding the threads
while (!stop_requested.load(std::memory_order_relaxed)) { while (true) {
// if the program has been stopped, break the loop
if (stop_requested.load(std::memory_order_relaxed)) break;
//fill_queue(); //fill_queue();
// make sure the tasks the workers are performing haven't been halted // make sure the tasks the workers are performing haven't been halted
@ -88,6 +93,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
} }
} }
interface.notify_program_halt();
DEBUG_PRINT("Control thread is finished, waiting for workers to stop..."); DEBUG_PRINT("Control thread is finished, waiting for workers to stop...");
// wait for the workers to finish // wait for the workers to finish
@ -117,10 +124,10 @@ void sigint_handler(int signal) {
} }
} }
bool task_halted(db::uuid_t task) { bool task_halted([[maybe_unused]]db::uuid_t task) {
return false; return false;
} }
void run_upload() { void run_upload([[maybe_unused]]db::db_credentials_t db_cred) {
} }

View file

@ -0,0 +1,132 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "task_interface.hpp"
TaskInterface::TaskInterface(volatile std::atomic_bool& stop_flag, size_t buffer_size) : stop_flag(stop_flag) {
this->buffer_size = buffer_size;
}
void TaskInterface::push_fresh(std::unique_ptr<Task> task) {
// lock the queue and insert into it
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
this->fresh_queue.push(std::move(task));
// we put one task in there so we notify one worker thread
this->fresh_queue_empty_condition.notify_one();
}
bool TaskInterface::fresh_queue_empty() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
return this->fresh_queue.empty();
}
bool TaskInterface::finished_queue_empty() {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
return this->finished_queue.empty();
}
std::unique_ptr<Task> TaskInterface::pop_fresh(bool& empty) {
// we first need exclusive access to the queue
std::lock_guard<std::mutex> lock (this->fresh_queue_mutex);
// we have to make sure the queue wasn't emptied since the last empty call was made
// or at least inform the calling thread that the queue is currently empty
std::unique_ptr<Task> task;
if (!(empty = this->fresh_queue.empty())) {
task = std::move(this->fresh_queue.front());
// if the task needs to be simulated by multiple workers, don't remove it from the queue
if (task->get_task_type() != TaskTypeType::MULTI_COV) {
this->fresh_queue.pop();
}
}
return task;
}
void TaskInterface::wait_for_fresh() {
std::unique_lock<std::mutex> lock (this->fresh_queue_empty_mutex);
// we will be notified either when there is new data or the program has been stopped
this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); });
}
void TaskInterface::wait_for_finished() {
std::unique_lock<std::mutex> lock (this->finished_queue_empty_mutex);
// we will be notified either when there is new data or the program has been stopped
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); });
}
void TaskInterface::push_finished(std::unique_ptr<Task> task) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
this->finished_queue.push(std::move(task));
// there is new data, inform the upload thread
this->finished_queue_empty_condition.notify_one();
}
std::unique_ptr<Task> TaskInterface::pop_finished(bool& empty) {
// we first need exclusive access to the queue
std::lock_guard<std::mutex> lock (this->finished_queue_mutex);
// we have to make sure the queue wasn't emptied since the last empty call was made
// or at least inform the calling thread that the queue is currently empty
std::unique_ptr<Task> task;
if (!(empty = this->finished_queue.empty())) {
auto task = std::move(this->finished_queue.front());
this->finished_queue.pop();
}
return task;
}
bool TaskInterface::fresh_buffer_full() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
if (this->fresh_queue.size() < this->buffer_size) return false;
return true;
}
void TaskInterface::notify_cleanup_ready() {
this->cleanup_ready.store(true, std::memory_order_seq_cst);
this->cleanup_ready_condition.notify_all();
}
void TaskInterface::wait_for_cleanup_ready() {
// lock the thread and wait to be notified
std::unique_lock<std::mutex> lock(this->cleanup_ready_mutex);
this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); });
}
void TaskInterface::notify_program_halt() {
this->finished_queue_empty_condition.notify_all();
this->fresh_queue_empty_condition.notify_all();
}

View file

@ -27,7 +27,7 @@
#include <unistd.h> #include <unistd.h>
#include "worker.hpp" #include "worker.hpp"
Worker::Worker(volatile std::atomic_bool& stop_flag) : stop_flag(stop_flag) {} Worker::Worker(TaskInterface& interface) : interface(interface) {}
void Worker::start() { void Worker::start() {
std::cout << "Worker started" << std::endl; std::cout << "Worker started" << std::endl;
@ -43,12 +43,33 @@ void Worker::join() {
} }
void Worker::thread_run() { void Worker::thread_run() {
while (true) { while (this->interface.running()) {
std::cout << "Worker ping" << std::endl;
if (this->stop_flag.load(std::memory_order_relaxed)) {
break;
}
usleep(100000); // this blocks until either a new task is available or the program was closed
this->interface.wait_for_fresh();
// so first we check if we should still be running
if (!this->interface.running()) break;
// we're still good to go! get a task from the fresh queue
bool queue_empty;
auto task = this->interface.pop_fresh(queue_empty);
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
// everything is good, perform the given task
bool success;
this->perform_task(task, success);
// if everything worked, push the task to be uploaded
this->interface.push_finished(std::move(task));
} }
} }
void Worker::perform_task(std::unique_ptr<Task>& task, bool& success) {
std::cout << "Worker ping" << std::endl;
usleep(100000);
success = true;
task->get_uuid();
}