/************************************************************************* * * This file is part of the ACT library * * Copyright (c) 2024 Fabian Posch * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. * ************************************************************************** */ #include #include "task_interface.hpp" TaskInterface::TaskInterface(size_t buffer_size) { this->buffer_size = buffer_size; this->running_.store(true, std::memory_order_relaxed); this->immediate_stop.store(false, std::memory_order_relaxed); } TaskInterface::~TaskInterface() { bool empty = false; while (!empty) this->pop_fresh(empty); empty = false; while (!empty) this->pop_finished(empty); } void TaskInterface::push_fresh(std::unique_ptr task) { // lock the queue and insert into it std::lock_guard lock(this->fresh_queue_mutex); this->fresh_queue.push(std::move(task)); // we put one task in there so we notify one worker thread this->fresh_queue_empty_condition.notify_one(); } bool TaskInterface::fresh_queue_empty() { std::lock_guard lock(this->fresh_queue_mutex); return this->fresh_queue.empty(); } bool TaskInterface::finished_queue_empty() { std::lock_guard lock(this->finished_queue_mutex); return this->finished_queue.empty(); } std::unique_ptr TaskInterface::pop_fresh(bool& empty) { // we first need exclusive access to the queue std::lock_guard lock (this->fresh_queue_mutex); // we have to make sure the queue wasn't emptied since the last empty call was made // or at least inform the calling thread that the queue is currently empty std::unique_ptr task; if (!(empty = this->fresh_queue.empty())) { task = std::move(this->fresh_queue.front()); // if the task needs to be simulated by multiple workers, don't remove it from the queue if (!task->parallelizable()) { this->fresh_queue.pop(); this->fresh_queue_full_condition.notify_one(); } } return task; } void TaskInterface::wait_for_fresh() { std::unique_lock lock (this->fresh_queue_empty_mutex); // we will be notified either when there is new data or the program has been stopped this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); }); } void TaskInterface::wait_for_finished() { std::unique_lock lock (this->finished_queue_empty_mutex); // we will be notified either when there is new data or the program has been stopped this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); }); } void TaskInterface::wait_for_buffer_consume() { std::unique_lock lock (this->fresh_queue_full_mutex); // we will be notified either when there is new data or the program has been stopped this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); }); } void TaskInterface::push_finished(std::unique_ptr task) { std::lock_guard lock(this->finished_queue_mutex); this->finished_queue.push(std::move(task)); // there is new data, inform the upload thread this->finished_queue_empty_condition.notify_one(); } std::unique_ptr TaskInterface::pop_finished(bool& empty) { // we first need exclusive access to the queue std::lock_guard lock (this->finished_queue_mutex); // we have to make sure the queue wasn't emptied since the last empty call was made // or at least inform the calling thread that the queue is currently empty std::unique_ptr task; if (!(empty = this->finished_queue.empty())) { task = std::move(this->finished_queue.front()); this->finished_queue.pop(); } return task; } bool TaskInterface::increment_design(const db::uuid_t& id) { std::lock_guard lock (this->designs_mutex); DEBUG_PRINT("Looking for design with ID " + db::to_string(id)); // make sure the requested design is in the list of available designs if (this->designs.find(id) == this->designs.end()) { DEBUG_PRINT("Design not found."); return false; } std::pair& design_entry = designs[id]; // if so, increment its reference counters ++design_entry.first; DEBUG_PRINT("Design found. Incrementing reference counter. New counter is " + std::to_string(design_entry.first)); return true; } void TaskInterface::decrement_design(const db::uuid_t& id) { std::lock_guard lock (this->designs_mutex); DEBUG_PRINT("Looking to decrement design with ID " + db::to_string(id)); // make sure the requested design is in the list of available designs if (this->designs.find(id) == this->designs.end()) { DEBUG_PRINT("Could not find design. Not decrementing."); return; } std::pair& design_entry = designs[id]; // if so, decrement its reference counters --design_entry.first; DEBUG_PRINT("Design found. Decrementing reference counter. New counter is " + std::to_string(design_entry.first)); // if the reference counter hit 0, erase the design entry from the list // of available designs if (design_entry.first == 0) { DEBUG_PRINT("Reference counter has hit 0. Deleting temp file from disk..."); // delete the temporary file from disk DEBUG_PRINT("Deleting design file from disk."); std::remove(design_entry.second.c_str()); DEBUG_PRINT("Erasing design from store."); this->designs.erase(id); } } std::string TaskInterface::get_design(const db::uuid_t& id) { std::lock_guard lock (this->designs_mutex); // make sure the requested design is in the list of available designs if (this->designs.find(id) == this->designs.end()) { std::cerr << "Error: Design was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl; return ""; } return this->designs[id].second; } void TaskInterface::store_design(const db::uuid_t& id, std::string& design) { std::lock_guard lock (this->designs_mutex); DEBUG_PRINT("Henlo Storing new design with ID " + db::to_string(id)); // make sure the design isn't already in the list of design entries // if it is, just increment its reference counter if (this->designs.find(id) != this->designs.end()) { DEBUG_PRINT("Design is already in here, incrementing reference counter instead."); ++(this->designs[id]).first; return; } // otherwise, create a new entry for this design this->designs[id] = {1, design}; } size_t TaskInterface::get_buffer_space() { std::lock_guard lock(this->fresh_queue_mutex); return this->buffer_size - this->fresh_queue.size(); } void TaskInterface::notify_cleanup_ready() { this->cleanup_ready.store(true, std::memory_order_seq_cst); this->cleanup_ready_condition.notify_all(); } void TaskInterface::wait_for_cleanup_ready() { // lock the thread and wait to be notified std::unique_lock lock(this->cleanup_ready_mutex); this->cleanup_ready_condition.wait(lock, [&] { return this->cleanup_ready.load(std::memory_order_seq_cst); }); } void TaskInterface::wait_for_download_halt() { std::unique_lock lock(this->download_halt_mutex); this->download_halt_condition.wait(lock, [&] { return !this->running(); }); } void TaskInterface::stop() { this->running_.store(false, std::memory_order_relaxed); this->fresh_queue_full_condition.notify_all(); }; void TaskInterface::notify_workers_program_halt() { this->finished_queue_empty_condition.notify_all(); this->fresh_queue_empty_condition.notify_all(); } void TaskInterface::notify_download_halt() { this->download_halt_condition.notify_all(); }