optimize dbase interaction
This commit is contained in:
parent
dbe9bf9a9e
commit
25b4684669
6 changed files with 189 additions and 170 deletions
|
|
@ -30,7 +30,7 @@
|
|||
|
||||
#define DATABASE_VERSION 3
|
||||
|
||||
#define DOWNLOAD_BUFFER 5
|
||||
#define DOWNLOAD_BUFFER 200
|
||||
|
||||
int start_agent(db::db_credentials_t db_cred, size_t worker_processes);
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
#include "agent_artifact.hpp"
|
||||
|
||||
/**
|
||||
|
|
@ -58,6 +59,7 @@ class TaskInterface {
|
|||
void notify_download_halt();
|
||||
|
||||
void push_fresh(std::unique_ptr<InputType> task);
|
||||
void push_fresh(std::vector<std::unique_ptr<InputType>>& tasks);
|
||||
std::unique_ptr<InputType> pop_fresh(bool& empty);
|
||||
void push_finished(std::unique_ptr<OutputType> task);
|
||||
std::unique_ptr<OutputType> pop_finished(bool& empty);
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@
|
|||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <csignal>
|
||||
#include <atomic>
|
||||
#include <cluster/db_types.hpp>
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@
|
|||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <cluster/artifact.hpp>
|
||||
#include <filesystem>
|
||||
|
|
@ -32,7 +34,12 @@
|
|||
#include <cstdlib>
|
||||
#include <pqxx/pqxx>
|
||||
#include <functional>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
#include "agent_artifact.hpp"
|
||||
#include "cluster/db_types.hpp"
|
||||
#include "pqxx/prepared_statement.hxx"
|
||||
#include "task_interface.hpp"
|
||||
#include "util.h"
|
||||
#include "downloader.hpp"
|
||||
|
||||
|
|
@ -101,167 +108,155 @@ void Downloader::thread_run() {
|
|||
|
||||
bool Downloader::fetch_tasks(size_t n) {
|
||||
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
|
||||
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
|
||||
// fetch a new task from the database
|
||||
auto fetch_task_lambda = [n](
|
||||
pqxx::work *txn,
|
||||
bool *task_avail,
|
||||
pl::testcase_t *testcase,
|
||||
db::uuid_t *target_artifact,
|
||||
db::uuid_t *source_pass,
|
||||
db::uuid_t *design,
|
||||
db::uuid_t *reference,
|
||||
db::uuid_t *source_config,
|
||||
db::uuid_t *id
|
||||
) {
|
||||
|
||||
// Fetch a new task by a couple rules:
|
||||
// 1.) It has not been started yet or the output row doesn't exist
|
||||
// 2.) Passes that are already in progress are preferred
|
||||
// 3.) New passes are started in the order they were added to the database
|
||||
// 4.) Passes are only started if all their dependencies are fulfilled
|
||||
auto res = txn->exec_params(
|
||||
"SELECT "
|
||||
" ap.design_file AS design, "
|
||||
" ap.top_proc, "
|
||||
" ap.outputs AS target_artifact, "
|
||||
" ap.id AS source_pass, "
|
||||
" sc.id AS source_config, "
|
||||
" sc.sim_commands, "
|
||||
" sc.has_reference AS reference, "
|
||||
" so.id AS id "
|
||||
"FROM "
|
||||
" actsim_passes ap "
|
||||
"JOIN "
|
||||
" sim_configs sc ON ap.sim_configs = sc.artifact "
|
||||
"LEFT JOIN "
|
||||
" sim_outputs so ON sc.id = so.sim_config "
|
||||
"JOIN "
|
||||
" jobs j ON ap.job = j.id "
|
||||
"WHERE "
|
||||
" (so.id IS NULL OR so.part_status = 'not_started') "
|
||||
" AND ap.id IN ( "
|
||||
" SELECT ap_dep.id "
|
||||
" FROM actsim_passes ap_dep "
|
||||
" JOIN passes p ON ap_dep.id = p.id "
|
||||
" WHERE NOT EXISTS ( "
|
||||
" SELECT 1 "
|
||||
" FROM passes dep "
|
||||
" WHERE dep.id = ANY(p.depends_on) "
|
||||
" AND dep.pass_status != 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
" AND (sc.has_reference IS NULL OR "
|
||||
" EXISTS ("
|
||||
" SELECT 1 "
|
||||
" FROM sim_outputs out "
|
||||
" WHERE out.sim_config = sc.has_reference "
|
||||
" AND out.part_status = 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
"ORDER BY "
|
||||
" ap.pass_status = 'in_progress' DESC, "
|
||||
" j.time_added ASC "
|
||||
"LIMIT $1;",
|
||||
n
|
||||
);
|
||||
// fetch a new task from the database
|
||||
auto fetch_task_lambda = [n](
|
||||
pqxx::work *txn,
|
||||
bool *task_avail,
|
||||
std::unordered_set<db::uuid_t> *references,
|
||||
std::vector<std::unique_ptr<DBSimConfigArtifact>> *tasks
|
||||
) {
|
||||
|
||||
// seems like there is nothing to do right now
|
||||
if (res.size() < 1) {
|
||||
*task_avail = false;
|
||||
return;
|
||||
}
|
||||
// Fetch a new task by a couple rules:
|
||||
// 1.) It has not been started yet or the output row doesn't exist
|
||||
// 2.) Passes that are already in progress are preferred
|
||||
// 3.) New passes are started in the order they were added to the database
|
||||
// 4.) Passes are only started if all their dependencies are fulfilled
|
||||
auto res = txn->exec(
|
||||
"SELECT "
|
||||
" ap.design_file AS design, "
|
||||
" ap.top_proc, "
|
||||
" ap.outputs AS target_artifact, "
|
||||
" ap.id AS source_pass, "
|
||||
" sc.id AS source_config, "
|
||||
" sc.sim_commands, "
|
||||
" sc.has_reference AS reference, "
|
||||
" so.id AS id "
|
||||
"FROM "
|
||||
" actsim_passes ap "
|
||||
"JOIN "
|
||||
" sim_configs sc ON ap.sim_configs = sc.artifact "
|
||||
"LEFT JOIN "
|
||||
" sim_outputs so ON sc.id = so.sim_config "
|
||||
"JOIN "
|
||||
" jobs j ON ap.job = j.id "
|
||||
"WHERE "
|
||||
" (so.id IS NULL OR so.part_status = 'not_started') "
|
||||
" AND ap.id IN ( "
|
||||
" SELECT ap_dep.id "
|
||||
" FROM actsim_passes ap_dep "
|
||||
" JOIN passes p ON ap_dep.id = p.id "
|
||||
" WHERE NOT EXISTS ( "
|
||||
" SELECT 1 "
|
||||
" FROM passes dep "
|
||||
" WHERE dep.id = ANY(p.depends_on) "
|
||||
" AND dep.pass_status != 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
" AND (sc.has_reference IS NULL OR "
|
||||
" EXISTS ("
|
||||
" SELECT 1 "
|
||||
" FROM sim_outputs out "
|
||||
" WHERE out.sim_config = sc.has_reference "
|
||||
" AND out.part_status = 'finished' "
|
||||
" ) "
|
||||
" ) "
|
||||
// "ORDER BY "
|
||||
// " ap.pass_status = 'in_progress' DESC, "
|
||||
// " j.time_added ASC "
|
||||
"LIMIT $1;",
|
||||
n
|
||||
);
|
||||
|
||||
auto row = res[0];
|
||||
// seems like there is nothing to do right now
|
||||
if (res.size() < 1) {
|
||||
*task_avail = false;
|
||||
return;
|
||||
}
|
||||
|
||||
*task_avail = true;
|
||||
|
||||
// generate the output rows
|
||||
std::string sql = R"(
|
||||
INSERT INTO sim_outputs (artifact, source_pass, sim_config)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (sim_config)
|
||||
DO UPDATE SET part_status = 'in_progress'
|
||||
RETURNING id;
|
||||
)";
|
||||
|
||||
txn->conn().prepare("insert_output_rows", sql);
|
||||
|
||||
for (auto row : res) {
|
||||
auto target_artifact = row["target_artifact"].as<db::uuid_t>();
|
||||
auto source_pass = row["source_pass"].as<db::uuid_t>();
|
||||
auto design = row["design"].as<db::uuid_t>();
|
||||
|
||||
db::uuid_t reference;
|
||||
|
||||
// we got something back, sort the data
|
||||
*task_avail = true;
|
||||
*target_artifact = row["target_artifact"].as<db::uuid_t>();
|
||||
*source_pass = row["source_pass"].as<db::uuid_t>();
|
||||
*design = row["design"].as<db::uuid_t>();
|
||||
if (!row["reference"].is_null()) {
|
||||
*reference = row["reference"].as<db::uuid_t>();
|
||||
} else {
|
||||
*reference = db::uuid_t();
|
||||
reference = row["reference"].as<db::uuid_t>();
|
||||
(*references).emplace(reference);
|
||||
}
|
||||
*source_config = row["source_config"].as<db::uuid_t>();
|
||||
|
||||
auto source_config = row["source_config"].as<db::uuid_t>();
|
||||
|
||||
auto com_arr = row["sim_commands"].as_sql_array<std::string>();
|
||||
std::vector<std::string> commands(com_arr.cbegin(), com_arr.cend());
|
||||
|
||||
*testcase = {
|
||||
pl::testcase_t testcase = {
|
||||
commands,
|
||||
row["top_proc"].as<std::string>(),
|
||||
false
|
||||
};
|
||||
|
||||
if (row["id"].is_null()) {
|
||||
// create a new sim output row in the database
|
||||
pqxx::row new_row = txn->exec_params1(
|
||||
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
|
||||
" VALUES ($1, $2, $3) "
|
||||
" RETURNING id;",
|
||||
*target_artifact,
|
||||
*source_pass,
|
||||
*source_config
|
||||
);
|
||||
// get the id
|
||||
auto id = txn->exec(pqxx::prepped{"insert_output_rows"}, {target_artifact, source_pass, source_config})[0]["id"].as<db::uuid_t>();
|
||||
|
||||
// and get the auto-generated ID
|
||||
*id = new_row["id"].as<db::uuid_t>();
|
||||
} else {
|
||||
// get the ID
|
||||
*id = row["id"].as<db::uuid_t>();
|
||||
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, reference, source_config);
|
||||
task->add_testcase(testcase);
|
||||
|
||||
// and since the sim output row already exists, update its status
|
||||
txn->exec_params0(
|
||||
"UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;",
|
||||
*id
|
||||
);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
|
||||
|
||||
bool task_avail;
|
||||
pl::testcase_t testcase;
|
||||
db::uuid_t target_artifact;
|
||||
db::uuid_t source_pass;
|
||||
db::uuid_t design;
|
||||
db::uuid_t reference;
|
||||
db::uuid_t source_config;
|
||||
db::uuid_t id;
|
||||
|
||||
if (!this->conn->send_request(
|
||||
&fetch_task_func,
|
||||
&task_avail,
|
||||
&testcase,
|
||||
&target_artifact,
|
||||
&source_pass,
|
||||
&design,
|
||||
&reference,
|
||||
&source_config,
|
||||
&id
|
||||
)) {
|
||||
// if we lost connection, there's nothing else we can really do
|
||||
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
|
||||
this->interface.stop();
|
||||
this->interface.stop_immediately();
|
||||
(*tasks).emplace_back(std::move(task));
|
||||
}
|
||||
|
||||
// seems like there are no tasks to do right now; tell the calling function
|
||||
// that there is still space in the buffer and we should just wait a while
|
||||
if (!task_avail) {
|
||||
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
|
||||
return false;
|
||||
}
|
||||
txn->conn().unprepare("insert_output_rows");
|
||||
|
||||
DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact));
|
||||
DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config));
|
||||
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, reference, source_config);
|
||||
task->add_testcase(testcase);
|
||||
};
|
||||
|
||||
std::function<void(
|
||||
pqxx::work *txn,
|
||||
bool *task_avail,
|
||||
std::unordered_set<db::uuid_t> *references,
|
||||
std::vector<std::unique_ptr<DBSimConfigArtifact>> *tasks
|
||||
)> fetch_task_func = fetch_task_lambda;
|
||||
|
||||
|
||||
std::unordered_set<db::uuid_t> references;
|
||||
bool task_avail;
|
||||
std::vector<std::unique_ptr<DBSimConfigArtifact>> tasks;
|
||||
|
||||
if (!this->conn->send_request(
|
||||
&fetch_task_func,
|
||||
&task_avail,
|
||||
&references,
|
||||
&tasks
|
||||
)) {
|
||||
// if we lost connection, there's nothing else we can really do
|
||||
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
|
||||
this->interface.stop();
|
||||
this->interface.stop_immediately();
|
||||
}
|
||||
|
||||
// seems like there are no tasks to do right now; tell the calling function
|
||||
// that there is still space in the buffer and we should just wait a while
|
||||
if (!task_avail) {
|
||||
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
|
||||
return false;
|
||||
}
|
||||
|
||||
for (auto&& task : tasks) {
|
||||
|
||||
auto design = task->design;
|
||||
|
||||
// see if we already have the design locally; if not, load it
|
||||
if (!this->interface.increment_design(design)) {
|
||||
|
|
@ -278,32 +273,31 @@ bool Downloader::fetch_tasks(size_t n) {
|
|||
|
||||
this->interface.store_design(design, design_path);
|
||||
}
|
||||
|
||||
// if we have a reference for this run, we have to see if it is loaded
|
||||
if (reference != 0) {
|
||||
|
||||
// see if we already have the reference run locally; if not, load it
|
||||
if (!this->interface.increment_reference(reference)) {
|
||||
|
||||
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
|
||||
std::shared_ptr<pl::SimOutputArtifact> reference_run;
|
||||
|
||||
// if we could not load the reference run, reopen the task in the database
|
||||
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
|
||||
std::cerr << "Error: Could not load reference run for task " << task->id << ", reopening it." << std::endl;
|
||||
this->reopen_task(task->id, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
this->interface.store_reference(reference, reference_run);
|
||||
}
|
||||
}
|
||||
|
||||
// push the task to the list of open tasks
|
||||
this->interface.push_fresh(std::move(task));
|
||||
|
||||
}
|
||||
|
||||
// if we have a reference for this run, we have to see if it is loaded
|
||||
for (auto&& reference : references) {
|
||||
|
||||
// see if we already have the reference run locally; if not, load it
|
||||
if (!this->interface.increment_reference(reference)) {
|
||||
|
||||
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
|
||||
std::shared_ptr<pl::SimOutputArtifact> reference_run;
|
||||
|
||||
// if we could not load the reference run, reopen the task in the database
|
||||
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
|
||||
std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl;
|
||||
// this->reopen_task(task->id, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
this->interface.store_reference(reference, reference_run);
|
||||
}
|
||||
}
|
||||
|
||||
// push the task to the list of open tasks
|
||||
this->interface.push_fresh(tasks);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -313,7 +307,7 @@ bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
|
|||
|
||||
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) {
|
||||
try {
|
||||
auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id);
|
||||
auto res = txn->exec("SELECT content FROM act_files WHERE artifact = $1;", *design_id)[0];
|
||||
// load design into string variable
|
||||
*design = res["content"].as<std::string>();
|
||||
*found = true;
|
||||
|
|
@ -381,7 +375,7 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
|
|||
bool *found
|
||||
) {
|
||||
try {
|
||||
auto res = txn->exec_params1("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id);
|
||||
auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0];
|
||||
|
||||
// load the output token timings
|
||||
auto arr_ott = res["output_token_timings"].as_sql_array<uint32_t>();
|
||||
|
|
@ -423,6 +417,11 @@ std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db:
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
std::cout << "Output tokens:" << std::endl;
|
||||
for (auto&& token: output_token_timings) {
|
||||
std::cout << token << std::endl;
|
||||
}
|
||||
|
||||
auto reference = std::make_shared<pl::SimOutputArtifact>();
|
||||
reference->set_output_token_timings(output_token_timings);
|
||||
reference->set_output_tokens(output_tokens);
|
||||
|
|
@ -445,7 +444,7 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
|
|||
|
||||
// open up the status of this partial output in the database again
|
||||
auto task_reopen_lambda = [id, status](pqxx::work *txn) {
|
||||
txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", id, status);
|
||||
txn->exec("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status});
|
||||
};
|
||||
|
||||
std::function<void(pqxx::work*)> task_reopen_func = task_reopen_lambda;
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@
|
|||
|
||||
#include "util.h"
|
||||
#include <cstdio>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "task_interface.hpp"
|
||||
|
||||
TaskInterface::TaskInterface(size_t buffer_size) {
|
||||
|
|
@ -52,6 +54,22 @@ void TaskInterface::push_fresh(std::unique_ptr<InputType> task) {
|
|||
this->fresh_queue_empty_condition.notify_one();
|
||||
}
|
||||
|
||||
void TaskInterface::push_fresh(std::vector<std::unique_ptr<InputType>>& tasks) {
|
||||
|
||||
DEBUG_PRINT("New task in the queue!");
|
||||
|
||||
// lock the queue and insert into it
|
||||
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
|
||||
|
||||
while (!tasks.empty()) {
|
||||
this->fresh_queue.push(std::move(tasks.front()));
|
||||
tasks.erase(tasks.begin());
|
||||
}
|
||||
|
||||
// we put one task in there so we notify one worker thread
|
||||
this->fresh_queue_empty_condition.notify_one();
|
||||
}
|
||||
|
||||
bool TaskInterface::fresh_queue_empty() {
|
||||
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
|
||||
return this->fresh_queue.empty();
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@
|
|||
**************************************************************************
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <pqxx/pqxx>
|
||||
#include <functional>
|
||||
|
|
|
|||
Loading…
Reference in a new issue