diff --git a/include/actsim_agent/actsim_agent.hpp b/include/actsim_agent/actsim_agent.hpp index 4257822..23254fd 100644 --- a/include/actsim_agent/actsim_agent.hpp +++ b/include/actsim_agent/actsim_agent.hpp @@ -36,7 +36,6 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes); // private headers void sigint_handler(int signal); -void run_upload(db::db_credentials_t db_cred); bool task_halted(db::uuid_t task); #endif diff --git a/include/actsim_agent/uploader.hpp b/include/actsim_agent/uploader.hpp new file mode 100644 index 0000000..672d3f2 --- /dev/null +++ b/include/actsim_agent/uploader.hpp @@ -0,0 +1,54 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __UPLOADER_H__ +#define __UPLOADER_H__ + +#include +#include +#include +#include "task_interface.hpp" + +class Uploader { + + public: + + Uploader(db::db_credentials_t& db_cred, TaskInterface& interface); + + void start(); + void join(); + + private: + + void thread_run(); + bool upload_task(std::unique_ptr task); + + std::unique_ptr uploader_thread; + std::unique_ptr conn; + + TaskInterface& interface; +}; + +#endif \ No newline at end of file diff --git a/src/actsim_agent/actsim_agent.cpp b/src/actsim_agent/actsim_agent.cpp index 823cd52..3dfa715 100644 --- a/src/actsim_agent/actsim_agent.cpp +++ b/src/actsim_agent/actsim_agent.cpp @@ -30,6 +30,7 @@ #include #include "task_interface.hpp" #include "worker.hpp" +#include "uploader.hpp" #include "util.h" #include "actsim_agent.hpp" @@ -69,7 +70,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { } // Spawn the upload thread - auto upload_thread = std::make_unique(run_upload, db_cred); + auto upload_thread = std::make_unique(db_cred, interface); + upload_thread->start(); // Run loop feeding the threads while (true) { @@ -102,6 +104,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) { worker->join(); } + interface.notify_cleanup_ready(); + // wait for the upload to finish upload_thread->join(); @@ -127,7 +131,3 @@ void sigint_handler(int signal) { bool task_halted([[maybe_unused]]db::uuid_t task) { return false; } - -void run_upload([[maybe_unused]]db::db_credentials_t db_cred) { - -} diff --git a/src/actsim_agent/uploader.cpp b/src/actsim_agent/uploader.cpp new file mode 100644 index 0000000..b45df97 --- /dev/null +++ b/src/actsim_agent/uploader.cpp @@ -0,0 +1,104 @@ + +/************************************************************************* + * + * This file is part of the ACT library + * + * Copyright (c) 2024 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#include "uploader.hpp" + +Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) : + interface(interface) +{ + this->conn = std::make_unique(db_cred); +} + +void Uploader::start() { + std::cout << "REMOVE Upload thread started" << std::endl; + this->uploader_thread = std::make_unique([this]() { thread_run(); }); +} + +void Uploader::join() { + if (this->uploader_thread != nullptr) return; + this->uploader_thread->join(); +} + +void Uploader::thread_run() { + + // connect to the database + if (!this->conn->connect()) { + std::cerr << "Error: Upload thread could not connect to the database!" << std::endl; + this->interface.stop(); + return; + } + + while (this->interface.running()) { + + // this blocks until either a new task is available for upload or the + // program was halted + this->interface.wait_for_finished(); + + // so first we check if we should still be running + if (!this->interface.running()) break; + + // we're still good to go! get a task from the fresh queue + bool queue_empty; + auto task = this->interface.pop_finished(queue_empty); + + // we need to make sure the queue wasn't emptied between waiting and getting new data + if (queue_empty) continue; + + // everything is good, upload the given task + bool success = this->upload_task(std::move(task)); + + // Uh oh, seems like we lost database connection! Close the program. + if (!success) { + std::cerr << "Error: Lost database connection during upload! Database integrity might be compromised." << std::endl; + this->interface.stop(); + return; + } + } + + // since worker threads might have been running after we ended + // the normal upload loop, we have to clean up after them + this->interface.wait_for_cleanup_ready(); + + // upload all the remaining tasks + while (!this->interface.finished_queue_empty()) { + + bool queue_empty; + auto task = this->interface.pop_finished(queue_empty); + + // in case there are ever multiple upload threads, + // the same issues apply as before + if (!queue_empty) { + if (!this->upload_task(std::move(task))) { + std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl; + } + } + + } +} + +bool Uploader::upload_task([[maybe_unused]]std::unique_ptr task) { + std::cout << "Task uploaded!"; + return true; +}