actsim-cluster-agent/include/db_lib/db_client.hpp

142 lines
4.4 KiB
C++

/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_CLIENT_H__
#define __DB_CLIENT_H__
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <pqxx/pqxx>
#include "db_types.hpp"
#include "task.hpp"
namespace db {
using namespace std;
// some database constants
#define MAX_CON_RETRIES 5
/**
 * Wrapper around a single pqxx database connection.
 *
 * Stores the database credentials, a raw pointer to the pqxx connection, an
 * optional setup callback and a connected flag. Besides the connection
 * management and lookup helpers declared below (implemented outside this
 * header), it offers send_request(), which runs a caller-supplied function
 * inside a transaction and wraps error handling around it.
 *
 * NOTE(review): the class holds a raw pqxx::connection pointer and declares a
 * destructor but no copy/move operations. If the destructor releases `c`,
 * copying a Connection would be unsafe - confirm against the implementation.
 */
class Connection {
public:
    // Construct with credentials and a callback that receives the raw
    // connection pointer when setup() is called.
    Connection(
        db_credentials_t& credentials,
        std::function<void(pqxx::connection *c)> setup_function
    );

    // Construct with credentials only; no setup callback is passed in.
    Connection(
        db_credentials_t& credentials
    );

    ~Connection();

    // Returns the raw connection pointer held by this object (non-owning view).
    pqxx::connection* get_connection() { return c; }

    // Invokes the stored setup callback on the current connection pointer.
    // Does nothing when no callback is stored, instead of throwing
    // std::bad_function_call from an empty std::function.
    void setup() {
        if (setup_function) {
            setup_function(this->c);
        }
    }

    // Connection management; implemented outside this header.
    bool connect();
    void disconnect();
    bool ensure_connected();

    // Job / task lookups; implemented outside this header.
    int find_job(std::string p_name, std::string *f_name);
    JobStatusType get_job_status(std::string job);
    JobStatusType get_task_status(db::uuid_t);

    // Prepared statement handling; implemented outside this header.
    // prepare_statements() takes a list of (name, statement) pairs.
    bool prepare_statements(std::vector<std::pair<std::string, std::string>> statements);
    bool prepare_statement(std::string name, std::string statement);
    bool unprepare_statement(std::string name);

    /**
     * Runs an arbitrary request function against the database inside a
     * transaction and wraps error handling around it.
     *
     * The function receives a pointer to the open transaction as its first
     * argument, followed by `args`. On success the transaction is committed.
     *
     * Careful with typing: `Args` is deduced both from the std::function
     * signature and from the arguments passed, so the two must agree exactly.
     *
     * This template is defined in the header because it is instantiated from
     * outside this library; the definition must be visible to the compiler at
     * every call site, otherwise linking fails.
     *
     * @param db_function  pointer to the function to execute; must not be null
     * @param args         arguments forwarded (by copy) to the function
     * @return true if the function ran and the transaction was committed;
     *         false if the database could not be reached, the function or the
     *         commit threw, or the retry budget was exhausted.
     */
    template<typename... Args>
    bool send_request(std::function<void(pqxx::work*, Args...)> *db_function, Args... args) {
        if (db_function == nullptr) {
            std::cerr << "Could not complete database operation. No request function given." << std::endl;
            return false;
        }

        // Bounded retry loop: a connection that keeps breaking while the
        // transaction is being opened must not make this spin forever.
        for (int attempt = 0; attempt < max_send_attempts; ++attempt) {
            if (!ensure_connected()) {
                std::cerr << "Could not contact database. Broken database connection." << std::endl;
                return false;
            }

            try {
                pqxx::work txn(*c);

                try {
                    // the function takes a pointer to the transaction object as first
                    // argument and whatever else was given to us as the rest
                    (*db_function)(&txn, args...);

                    // commit the task to the database
                    txn.commit();
                } catch (const std::exception& e) {
                    // NOTE(review): this handler presumably also catches
                    // pqxx::broken_connection raised while running the function or
                    // committing, so those cases are not retried - confirm intent.
                    // if something happened during the transaction, we roll it back
                    txn.abort();
                    std::cerr << "Could not complete database operation. Error in execution." << std::endl;
                    std::cerr << e.what() << std::endl;
                    return false;
                }

                return true;
            } catch (const pqxx::broken_connection& e) {
                // fall through to the next loop iteration, which re-checks the connection
                std::cerr << "Pipe broke during database operation... Retrying..." << std::endl;
                std::cerr << e.what() << std::endl;
            }
        }

        std::cerr << "Could not complete database operation. Giving up after "
                  << max_send_attempts << " attempts." << std::endl;
        return false;
    }

private:
    // Upper bound on how often send_request() re-attempts after a broken connection.
    static constexpr int max_send_attempts = 5;

    db_credentials_t db_credentials;
    pqxx::connection *c;
    std::function<void(pqxx::connection *c)> setup_function;
    bool connected;
};
// Named SQL statements for the uplink side, stored as (statement name, SQL text) pairs.
// The element type matches the parameter of Connection::prepare_statements().
// NOTE(review): DONE appears unquoted in the SQL text below - confirm the
// database resolves it as the intended status value.
const std::vector<std::pair<std::string, std::string>> uplink_statements{
    std::pair<std::string, std::string>{
        "commit_task",
        "UPDATE tasks SET t_status = DONE, error = $2, sim_log = $3, sim_trace = $4 WHERE id = $1"
    }
};
// Named SQL statements for the downlink side, stored as (statement name, SQL text) pairs.
// The element type matches the parameter of Connection::prepare_statements().
// NOTE(review): OPEN appears unquoted in the SQL text below - confirm the
// database resolves it as the intended status value.
const std::vector<std::pair<std::string, std::string>> downlink_statements{
    std::pair<std::string, std::string>{
        "check_open_tasks",
        "SELECT id FROM tasks WHERE t_status=OPEN LIMIT 1"
    },
    // TODO add inner join to grab pipeline depth
    std::pair<std::string, std::string>{
        "fetch_task",
        "SELECT id, job, t_status, t_type, is_reference, reference, max_plf, error, sim_log FROM tasks WHERE id = $1 LIMIT 1"
    }
};
};
#endif