add reference store to task interface

This commit is contained in:
Fabian Posch 2024-01-29 11:55:35 -05:00
parent 9c06cd7e64
commit 978c58268a
2 changed files with 89 additions and 0 deletions

View file

@ -63,11 +63,22 @@ class TaskInterface {
std::unique_ptr<OutputType> pop_finished(bool& empty);
size_t get_buffer_space();
/*
* Store a design entry locally
*/
bool increment_design(const db::uuid_t& id);
void decrement_design(const db::uuid_t& id);
std::string get_design(const db::uuid_t& id);
void store_design(const db::uuid_t&, std::string& design);
/*
* Store a reference run locally
*/
bool increment_reference(const db::uuid_t& id);
void decrement_reference(const db::uuid_t& id);
pl::SimOutputArtifact get_reference(const db::uuid_t& id);
void store_reference(const db::uuid_t&, pl::SimOutputArtifact);
bool running() { return this->running_.load(std::memory_order_relaxed); };
bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); };
void stop();
@ -87,6 +98,7 @@ class TaskInterface {
volatile std::atomic_bool immediate_stop;
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;
std::unordered_map<db::uuid_t, std::pair<size_t, pl::SimOutputArtifact>> references;
////// Mutexes //////
@ -96,6 +108,9 @@ class TaskInterface {
// design map access
std::mutex designs_mutex;
// reference map access
std::mutex references_mutex;
// notify upload thread that the finished queue is ready for cleanup
std::atomic_bool cleanup_ready;
std::mutex cleanup_ready_mutex;

View file

@ -206,6 +206,80 @@ void TaskInterface::store_design(const db::uuid_t& id, std::string& design) {
this->designs[id] = {1, design};
}
bool TaskInterface::increment_reference(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->references_mutex);
DEBUG_PRINT("Looking for reference run with ID " + db::to_string(id));
// make sure the requested reference run is in the list of available reference runs
if (this->references.find(id) == this->references.end()) {
DEBUG_PRINT("Reference run not found.");
return false;
}
std::pair<size_t, pl::SimOutputArtifact>& reference_run = references[id];
// if so, increment its reference counter
++reference_run.first;
DEBUG_PRINT("Reference run found. Incrementing reference counter. New counter is " + std::to_string(reference_run.first));
return true;
}
void TaskInterface::decrement_reference(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->references_mutex);
DEBUG_PRINT("Looking to decrement reference run with ID " + db::to_string(id));
// make sure the requested reference run is in the list of available reference runs
if (this->references.find(id) == this->references.end()) {
DEBUG_PRINT("Could not find reference run. Not decrementing.");
return;
}
std::pair<size_t, pl::SimOutputArtifact>& reference_run = references[id];
// if so, decrement its reference counters
--reference_run.first;
DEBUG_PRINT("Reference run found. Decrementing reference counter. New counter is " + std::to_string(reference_run.first));
// if the reference counter hit 0, erase the reference run entry from the list
// of available reference runs
if (reference_run.first == 0) {
DEBUG_PRINT("Reference counter has hit 0. Erasing reference run from map...");
this->references.erase(id);
}
}
pl::SimOutputArtifact TaskInterface::get_reference(const db::uuid_t& id) {
std::lock_guard<std::mutex> lock (this->references_mutex);
// make sure the requested reference run is in the list of available reference runs
if (this->references.find(id) == this->references.end()) {
std::cerr << "Error: Reference run was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl;
return pl::SimOutputArtifact();
}
return this->references[id].second;
}
void TaskInterface::store_reference(const db::uuid_t& id, pl::SimOutputArtifact reference) {
std::lock_guard<std::mutex> lock (this->references_mutex);
DEBUG_PRINT("Storing new design with ID " + db::to_string(id));
// make sure the reference run isn't already in the list of reference runs
// if it is, just increment its reference counter
if (this->references.find(id) != this->references.end()) {
DEBUG_PRINT("Reference run is already in here, incrementing reference counter instead.");
++(this->references[id]).first;
return;
}
// otherwise, create a new entry for this design
this->references[id] = {1, reference};
}
size_t TaskInterface::get_buffer_space() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
return this->buffer_size - this->fresh_queue.size();