actsim-cluster-agent/src/actsim_agent/uploader.cpp

108 lines
3.6 KiB
C++

/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "util.h"
#include "uploader.hpp"
Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) :
interface(interface)
{
this->conn = std::make_unique<db::Connection>(db_cred);
}
void Uploader::start() {
DEBUG_PRINT("Starting upload thread...");
this->uploader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
void Uploader::join() {
if (this->uploader_thread == nullptr) return;
this->uploader_thread->join();
}
void Uploader::thread_run() {
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
this->interface.stop();
return;
}
while (this->interface.running()) {
// this blocks until either a new task is available for upload or the
// program was halted
this->interface.wait_for_finished();
// so first we check if we should still be running
if (!this->interface.running()) break;
// we're still good to go! get a task from the fresh queue
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
// everything is good, upload the given task
bool success = this->upload_task(std::move(task));
// Uh oh, seems like we lost database connection! Close the program.
if (!success) {
std::cerr << "Error: Lost database connection during upload! Database integrity might be compromised." << std::endl;
this->interface.stop();
return;
}
}
// since worker threads might have been running after we ended
// the normal upload loop, we have to clean up after them
this->interface.wait_for_cleanup_ready();
// upload all the remaining tasks
while (!this->interface.finished_queue_empty()) {
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
if (!this->upload_task(std::move(task))) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
}
}
}
bool Uploader::upload_task([[maybe_unused]]std::unique_ptr<OutputType> task) {
// make sure any task that is uploaded isn't halted in the database
std::cout << "[UPLOADER] Task uploaded!" << std::endl;
return true;
}