From 4a7993cf353ebffacf5e26a7fa56e5b2277871e7 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Mon, 8 Jan 2024 17:11:58 -0500 Subject: [PATCH] implement worker threads and download thread skelleton --- include/actsim_agent/actsim_agent.hpp | 5 ++ src/actsim_agent/actsim_agent.cpp | 99 ++++++++++++++++++++++++++- 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/include/actsim_agent/actsim_agent.hpp b/include/actsim_agent/actsim_agent.hpp index 658ca1b..e919477 100644 --- a/include/actsim_agent/actsim_agent.hpp +++ b/include/actsim_agent/actsim_agent.hpp @@ -1,3 +1,4 @@ + /************************************************************************* * * This file is part of the ACT library @@ -27,6 +28,10 @@ #include +#define DATABASE_VERSION 2 + int start_agent(db::db_credentials_t db_cred, size_t worker_processes); +void sigint_handler(int signal); + #endif diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index 4618cc9..45790af 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -23,9 +23,104 @@ */ #include +#include +#include +#include +#include +#include +#include "worker.hpp" +#include "util.h" #include "actsim_agent.hpp" +// flag indicating program stop to the worker threads +volatile std::atomic_bool stop_requested(false); +volatile std::atomic_bool immediate_stop(false); + int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { - std::cout << "Hello world from the agent." << std::endl; + + DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads."); + + // this will hold the ID of the task a given thread is currently working on + db::uuid_t worker_tasks[worker_processes]; + + // inform the database about which database version we are implementing + db_cred.version = DATABASE_VERSION; + + // First we check that we can talk to the database. No reason to spin up if there is no one to talk to. + auto con = std::make_unique(db_cred); + if (!con->connect()) { + std::cerr << "Could not connect to database. Aborting." << std::endl; + return 1; + } + + // register the abort signal to stop the program + std::signal(SIGINT, sigint_handler); + + // now we create the worker threads which will host our worker processes + std::vector> workers; + + DEBUG_PRINT("Starting workers..."); + + for (size_t i = 0; i < worker_processes; i++) { + auto worker = std::make_unique(stop_requested); + worker->start(); + workers.emplace_back(std::move(worker)); + } + + // Spawn the upload thread + auto upload_thread = std::make_unique(run_upload); + + // Run loop feeding the threads + while (!stop_requested.load(std::memory_order_relaxed)) { + //fill_queue(); + + // make sure the tasks the workers are performing haven't been halted + for (auto& worker : workers) { + if (task_halted(worker->get_current_task())) { + worker->cancel_current(); + } + } + } + + if (immediate_stop.load(std::memory_order_relaxed)) { + for (auto& worker : workers) { + worker->cancel_current(); + } + } + + DEBUG_PRINT("Control thread is finished, waiting for workers to stop..."); + + // wait for the workers to finish + for (auto& worker : workers) { + worker->join(); + } + + // wait for the upload to finish + upload_thread->join(); + return 0; -} \ No newline at end of file +} + +void sigint_handler(int signal) { + + if (signal != SIGINT) return; + + if (stop_requested) { + immediate_stop.store(true, std::memory_order_relaxed); + + std::cout << "Stopping all workers immediately and closing open tasks." << std::endl; + } else { + stop_requested.store(true, std::memory_order_relaxed); + + std::cout << "Finishing all running simulations, then closing the agent." << std::endl; + std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl; + } +} + +bool task_halted(db::uuid_t task) { + return false; +} + +void run_upload() { + +}