fix threading, implement design dependency handling

This commit is contained in:
Fabian Posch 2024-01-11 17:02:33 -05:00
parent 67b6622c69
commit 6fe744fd76
8 changed files with 130 additions and 25 deletions

View file

@ -44,6 +44,8 @@ class Downloader {
void thread_run();
bool fetch_tasks(size_t n);
bool fetch_design(const db::uuid_t& id);
void reopen_task(const db::uuid_t& id);
std::unique_ptr<std::thread> downloader_thread;
std::unique_ptr<db::Connection> conn;

View file

@ -51,10 +51,12 @@ class TaskInterface {
void wait_for_finished();
void wait_for_buffer_consume();
void wait_for_cleanup_ready();
void wait_for_download_halt();
void notify_cleanup_ready();
void notify_download_program_halt();
void notify_workers_program_halt();
void notify_download_halt();
void push_fresh(std::unique_ptr<InputType> task);
std::unique_ptr<InputType> pop_fresh(bool& empty);
@ -62,14 +64,15 @@ class TaskInterface {
std::unique_ptr<OutputType> pop_finished(bool& empty);
size_t get_buffer_space();
bool search_and_increment(db::uuid_t id, std::string& design);
void decrement(db::uuid_t id);
void store(db::uuid_t, std::string& design);
bool increment_design(const db::uuid_t& id);
void decrement_design(const db::uuid_t& id);
std::string get_design(const db::uuid_t& id);
void store_design(const db::uuid_t&, std::string& design);
bool running() { return this->running_.load(std::memory_order_relaxed); };
bool is_stop_immediate() { return !this->immediate_stop.load(std::memory_order_relaxed); };
bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); };
void stop() { this->running_.store(false, std::memory_order_relaxed); };
void stop_immediately() { this->immediate_stop.store(false, std::memory_order_relaxed); };
void stop_immediately() { this->immediate_stop.store(true, std::memory_order_relaxed); };
bool fresh_queue_empty();
bool finished_queue_empty();
@ -110,6 +113,10 @@ class TaskInterface {
// inform the download thread that data was consumed
std::mutex fresh_queue_full_mutex;
std::condition_variable fresh_queue_full_condition;
// inform the main thread that the program is ending
std::mutex download_halt_mutex;
std::condition_variable download_halt_condition;
};
#endif

View file

@ -46,11 +46,14 @@ class Worker {
private:
void thread_run();
std::unique_ptr<OutputType> perform_task(std::unique_ptr<InputType> task, bool& finished);
std::unique_ptr<OutputType> perform_task(std::unique_ptr<InputType>& task, bool& finished);
void run_actsim();
std::unique_ptr<std::thread> worker_thread;
std::atomic<db::uuid_t> current_task;
std::atomic_bool task_interrupted;
TaskInterface& interface;
};

View file

@ -91,7 +91,7 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
// this is where the threads do their thing
// anything past here means that the program was asked to close
download_thread->join();
interface.wait_for_download_halt();
// inform all threads about the program ending
interface.notify_workers_program_halt();
@ -114,7 +114,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
// now we can cleanup what was generated after stop was called
interface.notify_cleanup_ready();
// wait for the upload to finish
// wait for the net-threads to finish
download_thread->join();
upload_thread->join();
DEBUG_PRINT("Done.");

View file

@ -23,6 +23,7 @@
**************************************************************************
*/
#include <cluster/artifact.hpp>
#include "util.h"
#include "downloader.hpp"
@ -63,6 +64,20 @@ void Downloader::thread_run() {
this->fetch_tasks(this->interface.get_buffer_space());
}
// notify the control thread that download has halted
this->interface.notify_download_halt();
// wait for the workers to finish their thing
this->interface.wait_for_cleanup_ready();
// then reopen all tasks that were still left to do
bool empty = false;
while (!empty) {
auto task = this->interface.pop_fresh(empty);
if (empty) continue;
this->reopen_task(task->id);
}
}
bool Downloader::fetch_tasks(size_t n) {
@ -73,10 +88,32 @@ bool Downloader::fetch_tasks(size_t n) {
id = 1;
auto job_id = "deadbeef";
//for (size_t i = 0; i < n; ++i) {
// auto task = std::make_unique<Task>(id, job_id, TaskTypeType::SINGLE, false, db::uuid_t(), 2);
// this->interface.push_fresh(std::move(task));
//}
for (size_t i = 0; i < n; ++i) {
auto task = std::make_unique<pl::SimConfigArtifact>(id, id);
// see if we already have the desing locally; if not, load it
if (!this->interface.increment_design(task->design)) {
// if we could not load the design, reopen it in the database
if (!this->fetch_design(task->design)) {
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
this->reopen_task(task->id);
continue;
}
}
// push the task to the list of open tasks
this->interface.push_fresh(std::move(task));
}
return true;
}
bool Downloader::fetch_design(const db::uuid_t& id) {
return true;
}
void Downloader::reopen_task(const db::uuid_t& id) {
std::cout << "[DOWNLOADER] Reopening task!" << std::endl;
}

View file

@ -89,6 +89,7 @@ void Monitor::thread_run() {
}
if (this->interface.is_stop_immediate()) {
DEBUG_PRINT("Immediate stop requested, cancelling all tasks!");
for (auto& worker : workers) worker->cancel_current();
}

View file

@ -27,6 +27,8 @@
TaskInterface::TaskInterface(size_t buffer_size) {
this->buffer_size = buffer_size;
this->running_.store(true, std::memory_order_relaxed);
this->immediate_stop.store(false, std::memory_order_relaxed);
}
TaskInterface::~TaskInterface() {
@ -124,7 +126,7 @@ std::unique_ptr<OutputType> TaskInterface::pop_finished(bool& empty) {
return task;
}
bool TaskInterface::search_and_increment(db::uuid_t id, std::string& design) {
bool TaskInterface::increment_design(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->designs_mutex);
// make sure the requested design is in the list of available designs
@ -135,12 +137,10 @@ bool TaskInterface::search_and_increment(db::uuid_t id, std::string& design) {
// if so, increment its reference counters
++design_entry.first;
// set the design parameter to the actual design information
design = design_entry.second;
return true;
}
void TaskInterface::decrement(db::uuid_t id) {
void TaskInterface::decrement_design(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->designs_mutex);
// make sure the requested design is in the list of available designs
@ -158,7 +158,19 @@ void TaskInterface::decrement(db::uuid_t id) {
}
}
void TaskInterface::store(db::uuid_t id, std::string& design) {
std::string TaskInterface::get_design(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->designs_mutex);
// make sure the requested design is in the list of available designs
if (this->designs.find(id) == this->designs.end()) {
std::cerr << "Error: Design was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl;
return "";
}
return this->designs[id].second;
}
void TaskInterface::store_design(const db::uuid_t& id, std::string& design) {
std::lock_guard<std::mutex> lock (this->designs_mutex);
// make sure the design isn't already in the list of design entries
@ -188,6 +200,11 @@ void TaskInterface::wait_for_cleanup_ready() {
this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); });
}
void TaskInterface::wait_for_download_halt() {
std::unique_lock<std::mutex> lock(this->download_halt_mutex);
this->download_halt_condition.wait(lock, [&] { return !this->running(); });
}
void TaskInterface::notify_download_program_halt() {
this->fresh_queue_full_condition.notify_all();
}
@ -195,4 +212,8 @@ void TaskInterface::notify_download_program_halt() {
void TaskInterface::notify_workers_program_halt() {
this->finished_queue_empty_condition.notify_all();
this->fresh_queue_empty_condition.notify_all();
}
}
void TaskInterface::notify_download_halt() {
this->download_halt_condition.notify_all();
}

View file

@ -32,11 +32,13 @@ Worker::Worker(TaskInterface& interface) : interface(interface) {}
void Worker::start() {
DEBUG_PRINT("Starting worker thread...");
this->task_interrupted.store(false, std::memory_order_relaxed);
this->worker_thread = std::make_unique<std::thread>([this] () { this->thread_run(); });
}
void Worker::cancel_current() {
std::cout << "[WORKER] Current simulation cancelled." << std::endl;
this->task_interrupted.store(true, std::memory_order_relaxed);
// set a condition variable and notify
// must not be blocking
}
@ -63,25 +65,56 @@ void Worker::thread_run() {
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
// get the design this task uses; we'll need that later
auto design = task->design;
// everything is good, perform the given task
bool complete;
this->current_task.store(db::uuid_t(), std::memory_order_relaxed);
auto output = this->perform_task(std::move(task), complete);
auto output = this->perform_task(task, complete);
// testing code
complete = !this->task_interrupted.load(std::memory_order_relaxed);
this->task_interrupted.store(false, std::memory_order_relaxed);
// if the task was finished, push the task to be uploaded
// we need this since the task might have been interrupted
// half way though
if (complete) this->interface.push_finished(std::move(output));
if (complete) {
this->interface.push_finished(std::move(output));
// if this succeeded, we can decrease the number of
// tasks that require the design we needed for this task
this->interface.decrement_design(design);
} else {
// if the task was not completed and we have reached the end of execution
// the tasks have to be reopened in the database; the download thread
// will handle everything remaining in the buffer, so this is where our
// unfinished tasks goes back into
if (!this->interface.running()) {
this->interface.push_fresh(std::move(task));
} else {
// the only other reason this could have failed is that we got
// interrupted since the corresponding task was halted; in this case
// we only wanna decrease our reference counter
this->interface.decrement_design(task->design);
}
}
}
}
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType> task, bool& finished) {
usleep(1000000);
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {
usleep(10000000);
std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl;
task->id;
auto output = std::make_unique<OutputType>();
return std::move(output);
finished = true;
return std::make_unique<OutputType>();
// wait for either the executing process to finish or
// for the condition variable indicating the process should be stopped
}
void Worker::run_actsim() {
}