/************************************************************************* * * Copyright (c) 2023 Fabian Posch * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. * ************************************************************************** */ #ifndef __DB_CLIENT_H__ #define __DB_CLIENT_H__ #include #include #include #include "db_types.hpp" #include "task.hpp" namespace db { using namespace std; // some database constants #define MAX_CON_RETRIES 5 class Connection { public: Connection( db_credentials_t& credentials, std::function setup_function ); Connection( db_credentials_t& credentials ); ~Connection(); pqxx::connection* get_connection() { return c; }; void setup() { setup_function(this->c); } bool connect(); void disconnect(); bool ensure_connected(); int find_job(std::string p_name, std::string *f_name); JobStatusType get_job_status(std::string job); JobStatusType get_task_status(db::uuid_t); bool prepare_statements(vector> statements); bool prepare_statement(std::string name, std::string statement); bool unprepare_statement(std::string name); // Send request takes an arbitrary request method to the database and wraps error handling // around it. It takes a function with an arbitrary set of arguments and the arguments to give // that function and returns whatever the function returns. // Careful with typing here! C++ doesn't quite do the strictest typing once you go deep like this! // The function is defined here as it can be accessed from outside this library and is templated. // This means it must be accessible to the compiler at all times. Failing this will cause a linker error. template bool send_request(std::function *db_function, Args... args) { retry_send: if (!ensure_connected()) { cerr << "Could contact database. Broken database connection." << endl; return false; } try { pqxx::work txn(*c); try { // we assume the lambda takes a pointer to a transaction object as first argument // and whatever else was given to us as the rest (*db_function)(&txn, args...); // commit the task to the database txn.commit(); } catch (const exception& e) { // if something happened during the transaction, we roll it back txn.abort(); cerr << "Could not complete database operation. Error in execution." << endl; cerr << e.what() << endl; return false; } } catch (const pqxx::broken_connection& e) { cerr << "Pipe broke during database operation... Retrying..." << endl; cerr << e.what() << endl; // using a goto here since it'll gracefully retry connection goto retry_send; } return true; }; private: db_credentials_t db_credentials; pqxx::connection *c; std::function setup_function; bool connected; }; const vector> uplink_statements = { {"commit_task", "UPDATE tasks SET t_status = DONE, error = $2, sim_log = $3, sim_trace = $4 WHERE id = $1"} }; const vector> downlink_statements = { {"check_open_tasks", "SELECT id FROM tasks WHERE t_status=OPEN LIMIT 1"}, // TODO add inner join to grab pipeline depth {"fetch_task", "SELECT id, job, t_status, t_type, is_reference, reference, max_plf, error, sim_log FROM tasks WHERE id = $1 LIMIT 1"} }; }; #endif