implement worker threads and download thread skelleton

This commit is contained in:
Fabian Posch 2024-01-08 17:11:58 -05:00
parent 9809021551
commit 4a7993cf35
2 changed files with 102 additions and 2 deletions

View file

@ -1,3 +1,4 @@
/************************************************************************* /*************************************************************************
* *
* This file is part of the ACT library * This file is part of the ACT library
@ -27,6 +28,10 @@
#include <db_types.hpp> #include <db_types.hpp>
#define DATABASE_VERSION 2
int start_agent(db::db_credentials_t db_cred, size_t worker_processes); int start_agent(db::db_credentials_t db_cred, size_t worker_processes);
void sigint_handler(int signal);
#endif #endif

View file

@ -23,9 +23,104 @@
*/ */
#include <iostream> #include <iostream>
#include <thread>
#include <csignal>
#include <atomic>
#include <db_types.hpp>
#include <db_client.hpp>
#include "worker.hpp"
#include "util.h"
#include "actsim_agent.hpp" #include "actsim_agent.hpp"
// flag indicating program stop to the worker threads
volatile std::atomic_bool stop_requested(false);
volatile std::atomic_bool immediate_stop(false);
int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
std::cout << "Hello world from the agent." << std::endl;
DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads.");
// this will hold the ID of the task a given thread is currently working on
db::uuid_t worker_tasks[worker_processes];
// inform the database about which database version we are implementing
db_cred.version = DATABASE_VERSION;
// First we check that we can talk to the database. No reason to spin up if there is no one to talk to.
auto con = std::make_unique<db::Connection>(db_cred);
if (!con->connect()) {
std::cerr << "Could not connect to database. Aborting." << std::endl;
return 1;
}
// register the abort signal to stop the program
std::signal(SIGINT, sigint_handler);
// now we create the worker threads which will host our worker processes
std::vector<std::unique_ptr<Worker>> workers;
DEBUG_PRINT("Starting workers...");
for (size_t i = 0; i < worker_processes; i++) {
auto worker = std::make_unique<Worker>(stop_requested);
worker->start();
workers.emplace_back(std::move(worker));
}
// Spawn the upload thread
auto upload_thread = std::make_unique<std::thread>(run_upload);
// Run loop feeding the threads
while (!stop_requested.load(std::memory_order_relaxed)) {
//fill_queue();
// make sure the tasks the workers are performing haven't been halted
for (auto& worker : workers) {
if (task_halted(worker->get_current_task())) {
worker->cancel_current();
}
}
}
if (immediate_stop.load(std::memory_order_relaxed)) {
for (auto& worker : workers) {
worker->cancel_current();
}
}
DEBUG_PRINT("Control thread is finished, waiting for workers to stop...");
// wait for the workers to finish
for (auto& worker : workers) {
worker->join();
}
// wait for the upload to finish
upload_thread->join();
return 0; return 0;
} }
void sigint_handler(int signal) {
if (signal != SIGINT) return;
if (stop_requested) {
immediate_stop.store(true, std::memory_order_relaxed);
std::cout << "Stopping all workers immediately and closing open tasks." << std::endl;
} else {
stop_requested.store(true, std::memory_order_relaxed);
std::cout << "Finishing all running simulations, then closing the agent." << std::endl;
std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl;
}
}
bool task_halted(db::uuid_t task) {
return false;
}
void run_upload() {
}