implemented uploader

This commit is contained in:
Fabian Posch 2024-01-09 21:30:02 -05:00
parent 3713c50c47
commit 0344d030bf
4 changed files with 163 additions and 6 deletions

View file

@ -36,7 +36,6 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes);
// private headers
void sigint_handler(int signal);
void run_upload(db::db_credentials_t db_cred);
bool task_halted(db::uuid_t task);
#endif

View file

@ -0,0 +1,54 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __UPLOADER_H__
#define __UPLOADER_H__
#include <db_types.hpp>
#include <db_client.hpp>
#include <thread>
#include "task_interface.hpp"
class Uploader {
public:
Uploader(db::db_credentials_t& db_cred, TaskInterface& interface);
void start();
void join();
private:
void thread_run();
bool upload_task(std::unique_ptr<Task> task);
std::unique_ptr<std::thread> uploader_thread;
std::unique_ptr<db::Connection> conn;
TaskInterface& interface;
};
#endif

View file

@ -30,6 +30,7 @@
#include <db_client.hpp>
#include "task_interface.hpp"
#include "worker.hpp"
#include "uploader.hpp"
#include "util.h"
#include "actsim_agent.hpp"
@ -69,7 +70,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
}
// Spawn the upload thread
auto upload_thread = std::make_unique<std::thread>(run_upload, db_cred);
auto upload_thread = std::make_unique<Uploader>(db_cred, interface);
upload_thread->start();
// Run loop feeding the threads
while (true) {
@ -102,6 +104,8 @@ int start_agent(db::db_credentials_t db_cred, size_t worker_processes) {
worker->join();
}
interface.notify_cleanup_ready();
// wait for the upload to finish
upload_thread->join();
@ -127,7 +131,3 @@ void sigint_handler(int signal) {
bool task_halted([[maybe_unused]]db::uuid_t task) {
return false;
}
void run_upload([[maybe_unused]]db::db_credentials_t db_cred) {
}

View file

@ -0,0 +1,104 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "uploader.hpp"
Uploader::Uploader(db::db_credentials_t& db_cred, TaskInterface& interface) :
interface(interface)
{
this->conn = std::make_unique<db::Connection>(db_cred);
}
void Uploader::start() {
std::cout << "REMOVE Upload thread started" << std::endl;
this->uploader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
void Uploader::join() {
if (this->uploader_thread != nullptr) return;
this->uploader_thread->join();
}
void Uploader::thread_run() {
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
this->interface.stop();
return;
}
while (this->interface.running()) {
// this blocks until either a new task is available for upload or the
// program was halted
this->interface.wait_for_finished();
// so first we check if we should still be running
if (!this->interface.running()) break;
// we're still good to go! get a task from the fresh queue
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
// everything is good, upload the given task
bool success = this->upload_task(std::move(task));
// Uh oh, seems like we lost database connection! Close the program.
if (!success) {
std::cerr << "Error: Lost database connection during upload! Database integrity might be compromised." << std::endl;
this->interface.stop();
return;
}
}
// since worker threads might have been running after we ended
// the normal upload loop, we have to clean up after them
this->interface.wait_for_cleanup_ready();
// upload all the remaining tasks
while (!this->interface.finished_queue_empty()) {
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
if (!this->upload_task(std::move(task))) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
}
}
}
bool Uploader::upload_task([[maybe_unused]]std::unique_ptr<Task> task) {
std::cout << "Task uploaded!";
return true;
}