/************************************************************************* * * This file is part of the ACT library * * Copyright (c) 2024 Fabian Posch * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. * ************************************************************************** */ #include #include #include #include #include #include #include "task_interface.hpp" #include "worker.hpp" #include "uploader.hpp" #include "downloader.hpp" #include "monitor.hpp" #include "util.h" #include "actsim_agent.hpp" // threading safe interface static TaskInterface interface(DOWNLOAD_BUFFER); int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { DEBUG_PRINT("Agent asked to start with " + std::to_string(worker_processes) + " worker threads."); // inform the database about which database version we are implementing db_cred.version = DATABASE_VERSION; // this will hold our worker threads once we spawn them std::vector> workers; volatile std::atomic_bool all_workers_finished(false); auto monitor = std::make_unique(db_cred, workers, interface, all_workers_finished); // First we check that we can talk to the database. No reason to spin up if there is no one to talk to. if (!monitor->check_connection()) { std::cerr << "Error: Could not connect to the database!" << std::endl; return 1; } // register the abort signal to stop the program std::signal(SIGINT, sigint_handler); DEBUG_PRINT("Start the upload thread..."); // start the upload thread auto upload_thread = std::make_unique(db_cred, interface); upload_thread->start(); DEBUG_PRINT("Starting workers..."); // start the worker threads for (size_t i = 0; i < worker_processes; i++) { auto worker = std::make_unique(interface); worker->start(); workers.emplace_back(std::move(worker)); } // now we can start the monitor monitor->start(); DEBUG_PRINT("Starting download thread..."); // finally, start the download thread auto download_thread = std::make_unique(db_cred, interface); download_thread->start(); DEBUG_PRINT("Agent is running!"); // this is where the threads do their thing // anything past here means that the program was asked to close download_thread->join(); // inform all threads about the program ending interface.notify_workers_program_halt(); DEBUG_PRINT("Program was closed, download thread stopped, waiting for workers to finish..."); // wait for the workers to finish for (auto& worker : workers) { worker->join(); } DEBUG_PRINT("Worker threads shut down, closing the monitor..."); // all workers are closed, monitor shut down all_workers_finished.store(true, std::memory_order_seq_cst); monitor->join(); DEBUG_PRINT("Cleaning up remaining uploads..."); // now we can cleanup what was generated after stop was called interface.notify_cleanup_ready(); // wait for the upload to finish upload_thread->join(); DEBUG_PRINT("Done."); return 0; } void sigint_handler(int signal) { if (signal != SIGINT) return; if (!interface.running()) { std::cout << "Stopping all workers immediately and closing open tasks." << std::endl; interface.stop_immediately(); } else { interface.stop(); interface.notify_download_program_halt(); std::cout << "Finishing all running simulations, then closing the agent." << std::endl; std::cout << "To stop immediately instead, press Ctrl+C again." << std::endl; } }