actsim-cluster-agent/src/actsim_agent/downloader.cpp

126 lines
4 KiB
C++

/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <cluster/artifact.hpp>
#include "util.h"
#include "downloader.hpp"
Downloader::Downloader(db::db_credentials_t db_cred, TaskInterface& interface) :
interface(interface)
{
this->conn = std::make_unique<db::Connection>(db_cred);
}
void Downloader::start() {
DEBUG_PRINT("Starting downloader thread...");
this->downloader_thread = std::make_unique<std::thread>([this]() { thread_run(); });
}
void Downloader::join() {
if (this->downloader_thread == nullptr) return;
this->downloader_thread->join();
}
void Downloader::thread_run() {
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
this->interface.stop();
return;
}
while (this->interface.running()) {
// wait until there is more space in the buffer to fill
this->interface.wait_for_buffer_consume();
// make sure we weren't woken up because the program closed
if (!this->interface.running()) break;
// if the download buffer is not full, fetch some more tasks
this->fetch_tasks(this->interface.get_buffer_space());
}
// notify the control thread that download has halted
this->interface.notify_download_halt();
// wait for the workers to finish their thing
this->interface.wait_for_cleanup_ready();
// then reopen all tasks that were still left to do
bool empty = false;
while (!empty) {
auto task = this->interface.pop_fresh(empty);
if (empty) continue;
this->reopen_task(task->id);
this->interface.decrement_design(task->design);
}
}
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
std::cout << "[DOWNLOAGER] Downloading " << n << " tasks, implement me!" << std::endl;
auto id = db::uuid_t();
id = 1;
auto job_id = "deadbeef";
for (size_t i = 0; i < n; ++i) {
auto task = std::make_unique<pl::SimConfigArtifact>(id, id);
// see if we already have the desing locally; if not, load it
if (!this->interface.increment_design(task->design)) {
DEBUG_PRINT("Fetching new design with ID " + db::to_string(task->design));
std::string design;
// if we could not load the design, reopen it in the database
if (!this->fetch_design(task->design, design)) {
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
this->reopen_task(task->id);
continue;
}
this->interface.store_design(task->design, design);
}
// push the task to the list of open tasks
this->interface.push_fresh(std::move(task));
}
return true;
}
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
design = "test design";
return true;
}
void Downloader::reopen_task(const db::uuid_t& id) {
std::cout << "[DOWNLOADER] Reopening task!" << std::endl;
}