final performant version

This commit is contained in:
Fabian Posch 2025-02-02 16:33:52 +01:00
parent 5786b7bf22
commit 7dcadc1ca2
9 changed files with 176 additions and 237 deletions

View file

@@ -31,8 +31,6 @@
#include <cluster/db_client.hpp>
#include "task_interface.hpp"
#define NOTHING_AVAILABLE_SLEEP_TIME 500ms
class Downloader {
public:

View file

@@ -26,6 +26,7 @@
#ifndef __TASK_INTERFACE__
#define __TASK_INTERFACE__
#include <cstddef>
#include <queue>
#include <unordered_map>
#include <atomic>
@@ -33,6 +34,7 @@
#include <mutex>
#include <vector>
#include "agent_artifact.hpp"
#include <chrono>
/**
* If you want to use this interface for different types, you only need to change it here.
@@ -41,6 +43,8 @@
using InputType = DBSimConfigArtifact;
using OutputType = DBSimOutputArtifact;
#define NOTHING_AVAILABLE_SLEEP_TIME 500ms
class TaskInterface {
public:
@@ -49,10 +53,11 @@ class TaskInterface {
~TaskInterface();
void wait_for_fresh();
void wait_for_finished();
void wait_for_finished(size_t min_size);
void wait_for_buffer_consume();
void wait_for_cleanup_ready();
void wait_for_download_halt();
void wait_for_available();
void notify_cleanup_ready();
void notify_workers_program_halt();
@@ -62,7 +67,7 @@ class TaskInterface {
void push_fresh(std::vector<std::unique_ptr<InputType>>& tasks);
std::unique_ptr<InputType> pop_fresh(bool& empty);
void push_finished(std::unique_ptr<OutputType> task);
std::unique_ptr<OutputType> pop_finished(bool& empty);
std::vector<std::unique_ptr<OutputType>> pop_finished(bool& empty);
size_t get_buffer_space();
/*
@@ -87,7 +92,7 @@ class TaskInterface {
void stop_immediately() { this->immediate_stop.store(true, std::memory_order_relaxed); };
bool fresh_queue_empty();
bool finished_queue_empty();
bool finished_queue_empty(size_t min_size = 1);
private:
@@ -102,6 +107,9 @@ class TaskInterface {
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;
std::unordered_map<db::uuid_t, std::pair<size_t, std::shared_ptr<pl::SimOutputArtifact>>> references;
bool worker_waiting = false;
bool downloader_waiting = false;
////// Mutexes //////
// access to task queues

View file

@@ -32,6 +32,8 @@
#include <thread>
#include "task_interface.hpp"
std::string build_fault_flags(const std::unique_ptr<OutputType>& task);
class Uploader {
public:
@@ -44,8 +46,7 @@ class Uploader {
private:
void thread_run();
bool upload_task(std::unique_ptr<OutputType> task);
std::string build_fault_flags(const std::unique_ptr<OutputType>& task);
bool upload_task(std::vector<std::unique_ptr<OutputType>>& tasks);
std::unique_ptr<std::thread> uploader_thread;
std::unique_ptr<db::Connection> conn;

View file

@@ -26,6 +26,7 @@
#define __DB_CLIENT_H__
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <pqxx/pqxx>
@@ -69,6 +70,31 @@ class Connection {
bool prepare_statement(std::string name, std::string statement);
bool unprepare_statement(std::string name);
pqxx::work open_transaction(bool& success) {
retry_send:
if (!ensure_connected()) {
cerr << "Could not contact the database. Broken database connection." << endl;
success = false;
// fall through: constructing the transaction below will throw and we retry
}
try {
success = true;
return pqxx::work(*c);
} catch (const pqxx::broken_connection& e) {
cerr << "Pipe broke during database operation... Retrying..." << endl;
cerr << e.what() << endl;
// using a goto here since it'll gracefully retry the connection
goto retry_send;
}
}
// send_request takes an arbitrary request method for the database and wraps error handling
// around it. It accepts a function with an arbitrary parameter list plus the arguments to pass
// to that function, and returns whatever the function returns.
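// A minimal sketch of that contract, for illustration only (assumed shape, not the verbatim
// implementation; the name send_request_sketch is hypothetical and <functional> must be included):
template <typename... Args>
bool send_request_sketch(std::function<void(pqxx::work*, Args...)>* request, Args... args) {
while (true) {
bool opened;
auto txn = open_transaction(opened); // open_transaction retries broken connections itself
if (!opened) return false; // the database is unreachable, give up
try {
(*request)(&txn, args...); // run the caller-supplied request
txn.commit(); // persist its changes
return true;
} catch (const pqxx::broken_connection&) {
continue; // connection dropped mid-request, start over
}
}
}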

View file

@@ -1,5 +1,4 @@
include_directories(../../include)
include_directories(../../include/db_lib)
include_directories(../../include/actsim_agent)
include_directories($ENV{ACT_HOME}/include)
include_directories(../../deps/libpqxx/include)

View file

@@ -23,6 +23,7 @@
**************************************************************************
*/
#include "../../include/cluster/db_client.hpp"
#include <cstddef>
#include <cstdint>
#include <iostream>
@@ -34,11 +35,11 @@
#include <cstdlib>
#include <pqxx/pqxx>
#include <functional>
#include <unordered_set>
#include <vector>
#include "actsim_agent.hpp"
#include "agent_artifact.hpp"
#include "cluster/db_types.hpp"
#include "pqxx/internal/statement_parameters.hxx"
#include "pqxx/prepared_statement.hxx"
#include "task_interface.hpp"
#include "util.h"
@@ -62,8 +63,6 @@ void Downloader::join() {
void Downloader::thread_run() {
using namespace std::chrono_literals;
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
@@ -72,6 +71,10 @@ void Downloader::thread_run() {
return;
}
this->conn->prepare_statement("mark_tasks",
"UPDATE sim_configs SET part_status = 'finished' WHERE id = $1;"
);
while (this->interface.running()) {
// wait until there is more space in the buffer to fill
@@ -86,7 +89,7 @@ void Downloader::thread_run() {
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
// we can sleep for a certain amount of time, nothing to do
// DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
this->interface.wait_for_available();
}
}
@@ -112,155 +115,46 @@ void Downloader::thread_run() {
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
// fetch a new task from the database
auto fetch_task_lambda = [n](
pqxx::work *txn,
bool *task_avail,
std::unordered_set<db::uuid_t> *references,
std::vector<std::unique_ptr<DBSimConfigArtifact>> *tasks
) {
bool txn_opened;
auto txn = this->conn->open_transaction(txn_opened);
pqxx::result res;
// Fetch new tasks according to the following rules:
// 1.) It has not been started yet or the output row doesn't exist
// 2.) Passes that are already in progress are preferred
// 3.) New passes are started in the order they were added to the database
// 4.) Passes are only started if all their dependencies are fulfilled
auto res = txn->exec(
"SELECT "
" ap.design_file AS design, "
" ap.top_proc, "
" ap.outputs AS target_artifact, "
" ap.id AS source_pass, "
" sc.id AS source_config, "
" sc.sim_commands, "
" sc.has_reference AS reference, "
" so.id AS id "
"FROM "
" sim_configs sc "
"JOIN "
" actsim_passes ap ON ap.sim_configs = sc.artifact "
"LEFT JOIN "
" sim_outputs so ON sc.has_reference = so.sim_config "
"JOIN "
" jobs j ON ap.job = j.id "
"WHERE "
" (so.part_status IS NULL OR so.part_status = 'finished') "
" AND ap.id IN ( "
" SELECT ap_dep.id "
" FROM actsim_passes ap_dep "
" JOIN passes p ON ap_dep.id = p.id "
" WHERE NOT EXISTS ( "
" SELECT 1 "
" FROM passes dep "
" WHERE dep.id = ANY(p.depends_on) "
" AND dep.pass_status != 'finished' "
" ) "
" ) "
// " AND (sc.has_reference IS NULL OR "
// " EXISTS ("
// " SELECT 1 "
// " FROM sim_outputs out "
// " WHERE out.sim_config = sc.has_reference "
// " AND out.part_status = 'finished' "
// " ) "
// " ) "
// "ORDER BY "
// " ap.pass_status = 'in_progress' DESC, "
// " j.time_added ASC "
try {
res = txn.exec(
"SELECT * "
"FROM open_tasks "
"LIMIT $1;",
n
);
// seems like there is nothing to do right now
if (res.size() < 1) {
*task_avail = false;
return;
for (auto&& row : res) {
txn.exec(pqxx::prepped("mark_tasks"), {row["source_config"].as<db::uuid_t>()});
}
*task_avail = true;
txn.commit();
} catch (const std::exception& e) {
// generate the output rows
std::string sql = R"(
INSERT INTO sim_outputs (artifact, source_pass, sim_config)
VALUES ($1, $2, $3)
ON CONFLICT (sim_config)
DO UPDATE SET part_status = 'in_progress'
RETURNING id;
)";
// if something happened during the transaction, we roll it back
txn.abort();
txn->conn().prepare("insert_output_rows", sql);
for (auto row : res) {
auto target_artifact = row["target_artifact"].as<db::uuid_t>();
auto source_pass = row["source_pass"].as<db::uuid_t>();
auto design = row["design"].as<db::uuid_t>();
db::uuid_t reference;
reference = 0;
if (!row["reference"].is_null()) {
reference = row["reference"].as<db::uuid_t>();
(*references).emplace(reference);
}
auto source_config = row["source_config"].as<db::uuid_t>();
auto com_arr = row["sim_commands"].as_sql_array<std::string>();
std::vector<std::string> commands(com_arr.cbegin(), com_arr.cend());
pl::testcase_t testcase = {
commands,
row["top_proc"].as<std::string>(),
false
};
// get the id
auto id = txn->exec(pqxx::prepped{"insert_output_rows"}, {target_artifact, source_pass, source_config})[0]["id"].as<db::uuid_t>();
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, reference, source_config);
task->add_testcase(testcase);
(*tasks).emplace_back(std::move(task));
}
txn->conn().unprepare("insert_output_rows");
};
std::function<void(
pqxx::work *txn,
bool *task_avail,
std::unordered_set<db::uuid_t> *references,
std::vector<std::unique_ptr<DBSimConfigArtifact>> *tasks
)> fetch_task_func = fetch_task_lambda;
std::unordered_set<db::uuid_t> references;
bool task_avail;
std::vector<std::unique_ptr<DBSimConfigArtifact>> tasks;
if (!this->conn->send_request(
&fetch_task_func,
&task_avail,
&references,
&tasks
)) {
// if we lost connection, there's nothing else we can really do
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
std::cerr << e.what() << std::endl;
this->interface.stop();
this->interface.stop_immediately();
}
// seems like there are no tasks to do right now; tell the calling function
// that there is still space in the buffer and we should just wait a while
if (!task_avail) {
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
return false;
}
for (auto&& task : tasks) {
// seems like there is nothing to do right now
if (res.size() < 1) {
return false;
}
auto design = task->design;
for (auto row : res) {
auto target_artifact = row["target_artifact"].as<db::uuid_t>();
auto source_pass = row["source_pass"].as<db::uuid_t>();
auto design = row["design"].as<db::uuid_t>();
auto source_config = row["source_config"].as<db::uuid_t>();
// see if we already have the design locally; if not, load it
if (!this->interface.increment_design(design)) {
@@ -270,37 +164,51 @@ bool Downloader::fetch_tasks(size_t n) {
// if we could not load the design, reopen it in the database
if (!this->fetch_design(design, design_path)) {
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
this->reopen_task(task->id, true);
std::cerr << "Error: Could not load design for task " << source_config << ", reopening it." << std::endl;
this->reopen_task(source_config, true);
continue;
}
this->interface.store_design(design, design_path);
}
}
db::uuid_t reference;
reference = 0;
// if we have a reference for this run, we have to see if it is loaded
for (auto&& reference : references) {
if (!row["reference"].is_null()) {
reference = row["reference"].as<db::uuid_t>();
// see if we already have the reference run locally; if not, load it
if (!this->interface.increment_reference(reference)) {
// see if we already have the reference run locally; if not, load it
if (!this->interface.increment_reference(reference)) {
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
std::shared_ptr<pl::SimOutputArtifact> reference_run;
// if we could not load the reference run, reopen the task in the database
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl;
// this->reopen_task(task->id, true);
continue;
}
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
std::shared_ptr<pl::SimOutputArtifact> reference_run;
// if we could not load the reference run, reopen the task in the database
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl;
// this->reopen_task(task->id, true);
continue;
this->interface.store_reference(reference, reference_run);
}
this->interface.store_reference(reference, reference_run);
}
}
// push the task to the list of open tasks
this->interface.push_fresh(tasks);
auto com_arr = row["sim_commands"].as_sql_array<std::string>();
std::vector<std::string> commands(com_arr.cbegin(), com_arr.cend());
pl::testcase_t testcase = {
commands,
row["top_proc"].as<std::string>(),
false
};
auto task = std::make_unique<DBSimConfigArtifact>(db::uuid_t(), source_pass, target_artifact, design, reference, source_config);
task->add_testcase(testcase);
this->interface.push_fresh(std::move(task));
}
return true;
}
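// Assumption, not part of this commit's diff: the "open_tasks" relation queried above is most
// likely a server-side view bundling the selection rules of the removed inline query. A hedged
// sketch of such a view, using the same lambda idiom as the rest of this file; the column list
// mirrors what fetch_tasks reads from each row, the filter conditions are left abstract:
auto create_open_tasks_lambda = [](pqxx::work *txn) {
txn->exec(
"CREATE OR REPLACE VIEW open_tasks AS "
"SELECT "
" ap.design_file AS design, "
" ap.top_proc, "
" ap.outputs AS target_artifact, "
" ap.id AS source_pass, "
" sc.id AS source_config, "
" sc.sim_commands, "
" sc.has_reference AS reference "
"FROM sim_configs sc "
"JOIN actsim_passes ap ON ap.sim_configs = sc.artifact "
"JOIN jobs j ON ap.job = j.id;"
// the dependency and part_status conditions from the removed query would form the WHERE clause
);
};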
@@ -379,14 +287,14 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
bool *found
) {
try {
auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0];
auto res = txn->exec("SELECT output_tokens, output_token_timings FROM sim_outputs WHERE sim_config = $1;", id)[0];
// load the output token timings
auto arr_ott = res["output_token_timings"].as_sql_array<uint32_t>();
*output_token_timings = std::vector<uint32_t>(arr_ott.cbegin(), arr_ott.cend());
*output_tokens = res["output_tokens"].as<long>();
*log_size = res["log_size"].as<long>();
*log_size = 0;
*found = true;
@@ -443,7 +351,7 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
// open up the status of this partial output in the database again
auto task_reopen_lambda = [id, status](pqxx::work *txn) {
txn->exec("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status});
txn->exec("UPDATE sim_configs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status});
};
std::function<void(pqxx::work*)> task_reopen_func = task_reopen_lambda;

View file

@@ -24,7 +24,10 @@
*/
#include "util.h"
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <utility>
#include <vector>
#include "task_interface.hpp"
@@ -75,9 +78,9 @@ bool TaskInterface::fresh_queue_empty() {
return this->fresh_queue.empty();
}
bool TaskInterface::finished_queue_empty() {
bool TaskInterface::finished_queue_empty(size_t min_size) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
return this->finished_queue.empty();
return this->finished_queue.size() < min_size;
}
std::unique_ptr<InputType> TaskInterface::pop_fresh(bool& empty) {
@@ -105,17 +108,21 @@ std::unique_ptr<InputType> TaskInterface::pop_fresh(bool& empty) {
void TaskInterface::wait_for_fresh() {
std::unique_lock<std::mutex> lock (this->fresh_queue_empty_mutex);
worker_waiting = true;
// let the uploader know the worker is idle so it can flush a partially filled batch
finished_queue_empty_condition.notify_all();
// we will be notified either when there is new data or the program has been stopped
this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); });
DEBUG_PRINT("Worker released from block");
worker_waiting = false;
}
void TaskInterface::wait_for_finished() {
void TaskInterface::wait_for_finished(size_t min_size) {
std::unique_lock<std::mutex> lock (this->finished_queue_empty_mutex);
// we will be notified when there is enough new data, the program has been stopped, or both
// the worker and the downloader are idle (so the remaining partial batch gets flushed)
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); });
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty(min_size) || !running() || (worker_waiting && downloader_waiting); });
}
void TaskInterface::wait_for_buffer_consume() {
@@ -125,6 +132,18 @@ void TaskInterface::wait_for_buffer_consume() {
this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); });
}
void TaskInterface::wait_for_available() {
downloader_waiting = true;
// let the uploader know the downloader is idle so it can flush a partially filled batch
finished_queue_empty_condition.notify_all();
using namespace std::chrono_literals;
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
downloader_waiting = false;
}
void TaskInterface::push_finished(std::unique_ptr<OutputType> task) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
this->finished_queue.push(std::move(task));
@@ -133,21 +152,27 @@ void TaskInterface::push_finished(std::unique_ptr<OutputType> task) {
this->finished_queue_empty_condition.notify_one();
}
std::unique_ptr<OutputType> TaskInterface::pop_finished(bool& empty) {
std::vector<std::unique_ptr<OutputType>> TaskInterface::pop_finished(bool& empty) {
std::vector<std::unique_ptr<OutputType>> tasks;
// we first need exclusive access to the queue
std::lock_guard<std::mutex> lock (this->finished_queue_mutex);
// we have to make sure the queue wasn't emptied since the last empty call was made
// or at least inform the calling thread that the queue is currently empty
std::unique_ptr<OutputType> task;
if (!(empty = this->finished_queue.empty())) {
task = std::move(this->finished_queue.front());
this->finished_queue.pop();
// if the queue is currently empty, inform the caller and return an empty batch
if (this->finished_queue.empty()) {
empty = true;
return tasks;
}
return task;
empty = false;
while (!this->finished_queue.empty()) {
tasks.emplace_back(std::move(this->finished_queue.front()));
this->finished_queue.pop();
}
return tasks;
}
bool TaskInterface::increment_design(const db::uuid_t& id) {

View file

@@ -23,6 +23,7 @@
**************************************************************************
*/
#include <cstddef>
#include <iostream>
#include <memory>
#include <pqxx/pqxx>
@@ -57,12 +58,15 @@ void Uploader::thread_run() {
this->interface.stop();
return;
}
// upload the very first result as soon as it is available, then switch to batching
size_t min_size = 1;
while (this->interface.running()) {
// this blocks until enough new tasks are available for upload or the
// program was halted
this->interface.wait_for_finished();
this->interface.wait_for_finished(min_size);
DEBUG_PRINT("Uploader was woken up");
@@ -71,7 +75,7 @@ void Uploader::thread_run() {
// we're still good to go! get a task from the fresh queue
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
auto tasks = this->interface.pop_finished(queue_empty);
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
@@ -79,7 +83,7 @@ void Uploader::thread_run() {
DEBUG_PRINT("Uploader dequeued new task");
// everything is good, upload the given task
bool success = this->upload_task(std::move(task));
bool success = this->upload_task(tasks);
// Uh oh, seems like we lost database connection! Close the program.
if (!success) {
@@ -89,6 +93,7 @@
}
DEBUG_PRINT("Task successfully uploaded");
// from now on, wait for larger batches to cut down on database round trips
min_size = 200;
}
DEBUG_PRINT("Uploader is starting the shutdown procedure, waiting for cleanup ready");
@@ -100,75 +105,44 @@ void Uploader::thread_run() {
DEBUG_PRINT("Uploader has received go for cleanup");
// upload all the remaining tasks
while (!this->interface.finished_queue_empty()) {
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
bool queue_empty;
auto tasks = this->interface.pop_finished(queue_empty);
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
DEBUG_PRINT("Uploading finished task");
if (!this->upload_task(std::move(task))) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
DEBUG_PRINT("Uploading finished task");
if (!this->upload_task(tasks)) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
}
DEBUG_PRINT("Uploader is done");
}
bool Uploader::upload_task(std::unique_ptr<OutputType> task) {
bool Uploader::upload_task(std::vector<std::unique_ptr<OutputType>>& tasks) {
auto&& task_id = task->id;
auto&& sim_log = task->get_content().first;
auto&& sim_error = task->get_content().second;
auto&& output_token_timings = task->get_output_token_timings();
auto&& output_tokens = task->output_tokens;
auto log_size = sim_log.size() + sim_error.size();
auto upload_results_lambda = [] (pqxx::work* txn, std::vector<std::unique_ptr<OutputType>>* tasks) {
const auto&& fault_flags = build_fault_flags(task);
pqxx::stream_to stream = pqxx::stream_to::table(*txn, {"sim_outputs"}, {"artifact","source_pass", "sim_config", "output_tokens", "output_token_timings", "fault_flags", "part_status"});
// make sure any task that is uploaded isn't halted in the database
auto task_upload_lambda = [ task_id,
sim_log,
sim_error,
output_tokens,
output_token_timings,
fault_flags,
log_size
](
pqxx::work *txn
) {
txn->exec(
"UPDATE sim_outputs SET "
" sim_log = $1, "
" error_log = $2, "
" output_tokens = $3, "
" output_token_timings = $4, "
" fault_flags = $5, "
" log_size = $6, "
" part_status = 'finished' "
"WHERE id = $7 AND part_status != 'halted';",
{sim_log,
sim_error,
output_tokens,
output_token_timings,
fault_flags,
log_size,
task_id}
);
for (auto&& task : (*tasks)) {
stream.write_values(task->target_artifact, task->source_pass, task->source_config, task->output_tokens, task->get_output_token_timings(), build_fault_flags(task), "finished");
}
stream.complete();
};
std::function<void(pqxx::work*)> task_upload_func = task_upload_lambda;
std::function<void(pqxx::work*, std::vector<std::unique_ptr<OutputType>>*)> task_upload_func = upload_results_lambda;
DEBUG_PRINT("Updating task " + db::to_string(task->id));
DEBUG_PRINT("Updating tasks");
return this->conn->send_request(&task_upload_func);
return this->conn->send_request(&task_upload_func, &tasks);
}
std::string Uploader::build_fault_flags(const std::unique_ptr<OutputType>& task) {
std::string build_fault_flags(const std::unique_ptr<OutputType>& task) {
// bit mask for faults is
// 0: timing

View file

@@ -125,7 +125,7 @@ bool Connection::prepare_statement(std::string name, std::string statement) {
try {
this->c->prepare(name, statement);
} catch (const exception &e) {
cerr << "Error: Could not prepare statements!" << endl;
cerr << "Error: Could not prepare statements! " << e.what() << endl;
return false;
}