From 9b1bd73df1a0552cba1df108037561d9d20bd02e Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Fri, 24 Jan 2025 12:44:45 +0100 Subject: [PATCH] merge with cluster lib because --- .gitmodules | 3 + CMakeLists.txt | 6 +- deps/libpqxx | 1 + include/cluster/artifact.hpp | 396 ++++++++++++++++++++++++++++++++ include/cluster/db_artifact.hpp | 96 ++++++++ include/cluster/db_client.hpp | 131 +++++++++++ include/cluster/db_types.hpp | 349 ++++++++++++++++++++++++++++ src/CMakeLists.txt | 3 +- src/actsim_agent/CMakeLists.txt | 3 +- src/cluster/CMakeLists.txt | 14 ++ src/cluster/db_artifact.cpp | 38 +++ src/cluster/db_client.cpp | 238 +++++++++++++++++++ 12 files changed, 1272 insertions(+), 6 deletions(-) create mode 100644 .gitmodules create mode 160000 deps/libpqxx create mode 100644 include/cluster/artifact.hpp create mode 100644 include/cluster/db_artifact.hpp create mode 100644 include/cluster/db_client.hpp create mode 100644 include/cluster/db_types.hpp create mode 100644 src/cluster/CMakeLists.txt create mode 100644 src/cluster/db_artifact.cpp create mode 100644 src/cluster/db_client.cpp diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b93b538 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "deps/libpqxx"] + path = deps/libpqxx + url = https://github.com/jtv/libpqxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 7b38a49..7ddd286 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,10 +57,8 @@ include_directories(./) #add_library(ActCommon SHARED IMPORTED) #set_target_properties(ActCommon PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libvlsilib.a) -# Include the cluster librar -add_library(act-cluster-lib SHARED IMPORTED) -set_target_properties(act-cluster-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libact_cluster.so) - +# Add pqxx +add_subdirectory(deps/libpqxx build-pqxx EXCLUDE_FROM_ALL) # ATTENTION # The main executable is defined in the CMakeLists.txt in the ./src folder. diff --git a/deps/libpqxx b/deps/libpqxx new file mode 160000 index 0000000..6af956b --- /dev/null +++ b/deps/libpqxx @@ -0,0 +1 @@ +Subproject commit 6af956bb48167ed1e71cc0f0c2d8c8fdc99151a3 diff --git a/include/cluster/artifact.hpp b/include/cluster/artifact.hpp new file mode 100644 index 0000000..4e9ae8f --- /dev/null +++ b/include/cluster/artifact.hpp @@ -0,0 +1,396 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __PIPELINE_ARTIFACT_H__ +#define __PIPELINE_ARTIFACT_H__ + +#include +#include +#include +#include +#include +#include + +namespace pl { + +/** + * @brief Representation of different artifact types + */ +enum class ArtifactType {UNKNOWN, ACT, SIM_CONFIG, SIGLIST, SIM_OUTPUT}; +static std::unordered_map art_type_str = { + {"unknown", ArtifactType::UNKNOWN}, + {"act", ArtifactType::ACT}, + {"testcases", ArtifactType::SIM_CONFIG}, + {"sig_list", ArtifactType::SIGLIST}, + {"sim_output", ArtifactType::SIM_OUTPUT} +}; + +inline std::string to_string(ArtifactType const &value) { + std::string str = "unknown"; + + for (auto& it : art_type_str) { + if (it.second == value) { + str = it.first; + break; + } + } + + return str; +}; + +inline std::ostream& operator<<(std::ostream& os, ArtifactType const &rhs) { + os << to_string(rhs); + return os; +}; + +/** List of artifact names as well as types */ +typedef std::vector> artifact_list; + + +/** + * @brief An artifact is a data point which can be consumed or produced by a pipeline module + */ +class Artifact { + + public: + + /** + * @brief Construct a new blank Artifact + * + */ + Artifact() = default; + + /** + * @brief Get the type of the artifact the object is holding. + * + * Substitute for getting the object type + * + * @return ArtifactType + */ + virtual ArtifactType get_type() { return ArtifactType::UNKNOWN; }; + + + /** + * @brief Get the content of the artifact + * + * @tparam T Content type the artifact is holding + * @return T Content of the artifact + */ + template + T get_content(); + + + /** + * @brief Get the number of elements in this artifact + * + * This is mostly relevant when uploading an artifact to the database cluster. + * + * @return long The number of elements in this artifact + */ + virtual long get_size() = 0; + + + /** + * @brief Inform a tool whether this artifact can be handled by multiple nodes + * + * By default, one artifact should only be handled once. However, there might be + * situations where multiple nodes can handle a single artifact. This could for example + * be a situation where a simulation for a design has a coverage model for constrained + * random testing. In this case we want to cover everything as fast as possible and + * simulation can be distributed. + * + * @return true The artifact can be handled by multiple nodes + * @return false The artifact cannot be handled by multiple nodes + */ + bool parallelizable() { return false; }; +}; + + +/** + * @brief ACT design artifact + */ +class ActArtifact: public Artifact { + public: + + /** + * @brief Construct a new ACT design artifact + * + * @param design The design object the artifact should point to + */ + ActArtifact(std::shared_ptr design) { this->design = design; }; + + /** + * @brief Get the content of the artifact + * + * @return std::shared_ptr Pointer to the artifact's design file. + */ + std::shared_ptr get_content() { return design; }; + + + /** + * @brief Returns the type of this artifact, which is ACT. + * + * @return ArtifactType Will return the ACT artifact type. + */ + ArtifactType get_type() override { return ArtifactType::ACT; }; + + + /** + * @brief Get the number of elements in this artifact, which is 1 per definition + * + * @return long + */ + long get_size() override { return 1; }; + + private: + + std::shared_ptr design; + +}; + + +/** + * @brief Struct to capture one testcase. + */ +struct testcase_t { + /** Commands to be executed for this testcase */ + std::vector commands; + /** Top process for the simulation */ + std::string top; + /** This simulation should be run by multiple noes */ + bool parallelizable; +}; + +/** + * @brief Artifact containing one or more configurations for actsim + */ +class SimConfigArtifact: public Artifact { + public: + + SimConfigArtifact() = default; + SimConfigArtifact(bool has_reference) { this->has_reference_ = has_reference; }; + + /** + * @brief Get the content of the artifact + * + * @return std::vector Vector of all generated testcase structures + */ + std::vector& get_content() { return testcases; }; + + + /** + * @brief Add a testcase to the artifact + * + * @param testcase + */ + inline void add_testcase(testcase_t testcase) { this->testcases.emplace_back(testcase); }; + + + /** + * @brief Returns the type of this artifact, which is SIM_CONFIG. + * + * @return ArtifactType Will return the SIM_CONFIG artifact type. + */ + ArtifactType get_type() override { return ArtifactType::SIM_CONFIG; }; + + + /** + * @brief Get the number of testcases stored in this artifact + * + * @return long The size of the testcase vector + */ + long get_size() override { return testcases.size(); }; + + const bool& has_reference = has_reference_; + + /** + * @brief Can this simulation be handled by multiple nodes + * + * By default, one artifact should only be handled once. However, there might be + * situations where multiple nodes can handle a single artifact. This could for example + * be a situation where a simulation for a design has a coverage model for constrained + * random testing. In this case we want to cover everything as fast as possible and + * simulation can be distributed. + * + * @return true The simulation can be handled by multiple nodes + * @return false The simulation cannot be handled by multiple nodes + */ + bool parallelizable() { + if (this->testcases.size() != 1) return false; + return this->testcases.front().parallelizable; + }; + + private: + + bool has_reference_; + std::vector testcases; +}; + +/** Reference to a signal in an ACT design */ +typedef std::string signal_t; + +/** + * @brief Artifact containing a list of signals + */ +class SignalListArtifact: public Artifact { + public: + + /** + * @brief Get the content of the artifact + * + * @return std::vector Vector of all captured signals structures + */ + std::vector get_content() { return signals; }; + + + /** + * @brief Add a signal to the artifact + * + * @param testcase + */ + inline void add_signal(signal_t signal) { this->signals.emplace_back(signal); }; + + + /** + * @brief Returns the type of this artifact, which is SIGLIST. + * + * @return ArtifactType Will return the SIGLIST artifact type. + */ + ArtifactType get_type() override { return ArtifactType::SIGLIST; }; + + + /** + * @brief Get the number of signals stored in this artifact + * + * @return long The size of the signals vector + */ + long get_size() override { return signals.size(); }; + + private: + + std::vector signals; +}; + +/** + * @brief Artifact containing simulator output from actsim + */ +class SimOutputArtifact: public Artifact { + public: + + SimOutputArtifact() {}; + SimOutputArtifact(const std::vector& sim_log, const std::vector& error_log) { + this->sim_log = sim_log; + this->sim_err_log = error_log; + has_content_ = true; + }; + + /** + * @brief Get the content of the artifact + * + * @return std::vector Vector of all generated simulator output lines + */ + inline std::pair&, std::vector&> get_content() { return {sim_log, sim_err_log}; }; + inline std::vector& get_output_token_timings() { return output_token_timings; }; + inline void set_output_token_timings(std::vector timings) { this->output_token_timings = timings; }; + + /** + * @brief Add a log line to the artifact + * + * @param log_line + */ + inline void add_log_output(std::string log_line) { this->sim_log.emplace_back(log_line); has_content_ = true; }; + + + /** + * @brief Add a error log line to the artifact + * + * @param log_line + */ + inline void add_err_output(std::string log_line) { this->sim_err_log.emplace_back(log_line); has_content_ = true; }; + + inline void clear_logs() { + // save the size + size_ = this->sim_err_log.size() + this->sim_log.size(); + + this->sim_err_log.clear(); + this->sim_log.clear(); + has_content_ = false; + }; + + inline void add_output_token_timing(uint32_t timing) { this->output_token_timings.emplace_back(timing); }; + + + /** + * @brief Returns the type of this artifact, which is SIM_OUTPUT. + * + * @return ArtifactType Will return the SIM_OUTPUT artifact type. + */ + inline ArtifactType get_type() override { return ArtifactType::SIM_OUTPUT; }; + + + /** + * @brief Get the number of log lines stored in this artifact + * + * @return long The size of the log lines vector + */ + inline long get_size() override { if (has_content_) {return sim_log.size();} else {return size_;} }; + inline void set_size(long size) { if (!has_content_) this->size_ = size; }; + + const bool& fault_timing_deviation = fault_timing_deviation_; + const bool& fault_value = fault_value_; + const bool& fault_coding = fault_coding_; + const bool& fault_glitch = fault_glitch_; + const bool& fault_deadlock = fault_deadlock_; + const bool& fault_token_count = fault_token_count_; + + const int& output_tokens = output_tokens_; + + inline void set_fault_timing_deviation (bool fault_timing_deviation) { this->fault_timing_deviation_ = fault_timing_deviation; }; + inline void set_fault_value (bool fault_value) { this->fault_value_ = fault_value; }; + inline void set_fault_coding (bool fault_coding) { this->fault_coding_ = fault_coding; }; + inline void set_fault_glitch (bool fault_glitch) { this->fault_glitch_ = fault_glitch; }; + inline void set_fault_deadlock (bool fault_deadlock) { this->fault_deadlock_ = fault_deadlock; }; + inline void set_fault_token_count (bool fault_token_count) { this->fault_token_count_ = fault_token_count; }; + + inline void set_output_tokens(int output_tokens) { this->output_tokens_ = output_tokens; }; + + private: + + std::vector sim_log; + std::vector sim_err_log; + std::vector output_token_timings; + bool fault_timing_deviation_ = false; + bool fault_value_ = false; + bool fault_coding_ = false; + bool fault_glitch_ = false; + bool fault_deadlock_ = false; + bool fault_token_count_ = false; + + int output_tokens_; + long size_ = 0; + bool has_content_ = false; +}; + +}; + +#endif diff --git a/include/cluster/db_artifact.hpp b/include/cluster/db_artifact.hpp new file mode 100644 index 0000000..0baffa7 --- /dev/null +++ b/include/cluster/db_artifact.hpp @@ -0,0 +1,96 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __DB_ARTIFACT__ +#define __DB_ARTIFACT__ + +/** + * @brief Data used for a partial artifact from the database + * + * Don't let this confuse you. This is the additional data one would get when + * pulling a (partial) artifact from the database. + * + * Database information is intentionally kept out of the artifact library, since this + * is not needed for local artifact processing (especially in the deploy tool). + * + * Since an agent might have special requirements in regards to additional information, + * no specific implementation is provided by this library. + * + * Combinations of specific artifact types should therefor implemented locally. + * This is just the minimal cross-section of what an agent will have to deal with. + * + */ + +#include "db_types.hpp" + +namespace db { + +class DBArtifact { + + public: + + /** + * @brief Construct a new Artifact with a database UUID attached to it. + * + * This should be used when downloading the artifact from the database. + * This ID is actually required when an update in the database is required. + * + * @param id UUID of the artifact this belongs to in the database + * @param source_pass The pass in the database which is responsible for the generation of this (partial) artifact + * @param target_artifact The UUID of the artifact this part belongs to + */ + DBArtifact(const db::uuid_t& id, const db::uuid_t& source_pass, const db::uuid_t& target_artifact); + + /** + * @brief The ID of this object in the database + * + * The ID should be auto-generated when this task is fetched from the database. + * It is generated automatically by said database. + * + * The new partial artifact must be generated immediately to indicate someone + * working on this. + * + */ + const db::uuid_t& id = id_; + + /** + * @brief The ID of the pass in the database which created this object + */ + const db::uuid_t& source_pass = source_pass_; + + /** + * @brief The artifact, which this partial artifact is a part of + */ + const db::uuid_t& target_artifact = target_artifact_; + + protected: + + db::uuid_t id_; + db::uuid_t source_pass_; + db::uuid_t target_artifact_; + +}; + +}; + +#endif \ No newline at end of file diff --git a/include/cluster/db_client.hpp b/include/cluster/db_client.hpp new file mode 100644 index 0000000..02f05de --- /dev/null +++ b/include/cluster/db_client.hpp @@ -0,0 +1,131 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + + +#ifndef __DB_CLIENT_H__ +#define __DB_CLIENT_H__ + + +#include +#include +#include +#include "db_types.hpp" + +namespace db { + +using namespace std; + +// some database constants +#define MAX_CON_RETRIES 5 + +class Connection { + + public: + + Connection( + db_credentials_t& credentials, + std::function setup_function + ); + + Connection( + db_credentials_t& credentials + ); + + ~Connection(); + + pqxx::connection* get_connection() { return c; }; + void setup() { + setup_function(this->c); + } + + bool connect(); + void disconnect(); + bool ensure_connected(); + int find_job(std::string p_name, std::string *f_name); + JobStatusType get_job_status(std::string job); + JobStatusType get_task_status(db::uuid_t); + + bool prepare_statements(vector> statements); + bool prepare_statement(std::string name, std::string statement); + bool unprepare_statement(std::string name); + + // Send request takes an arbitrary request method to the database and wraps error handling + // around it. It takes a function with an arbitrary set of arguments and the arguments to give + // that function and returns whatever the function returns. + // Careful with typing here! C++ doesn't quite do the strictest typing once you go deep like this! + // The function is defined here as it can be accessed from outside this library and is templated. + // This means it must be accessible to the compiler at all times. Failing this will cause a linker error. + template + bool send_request(std::function *db_function, Args... args) { + + retry_send: + + if (!ensure_connected()) { + cerr << "Could contact database. Broken database connection." << endl; + return false; + } + + try { + + pqxx::work txn(*c); + + try { + // we assume the lambda takes a pointer to a transaction object as first argument + // and whatever else was given to us as the rest + (*db_function)(&txn, args...); + + // commit the task to the database + txn.commit(); + } catch (const exception& e) { + + // if something happened during the transaction, we roll it back + txn.abort(); + + cerr << "Could not complete database operation. Error in execution." << endl; + cerr << e.what() << endl; + + return false; + } + + } catch (const pqxx::broken_connection& e) { + cerr << "Pipe broke during database operation... Retrying..." << endl; + cerr << e.what() << endl; + + // using a goto here since it'll gracefully retry connection + goto retry_send; + } + + return true; + }; + + private: + + db_credentials_t db_credentials; + pqxx::connection *c; + std::function setup_function; + bool connected; +}; + +}; + +#endif diff --git a/include/cluster/db_types.hpp b/include/cluster/db_types.hpp new file mode 100644 index 0000000..5230217 --- /dev/null +++ b/include/cluster/db_types.hpp @@ -0,0 +1,349 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#ifndef __DB_TYPES_HPP__ +#define __DB_TYPES_HPP__ + +#include +#include + +namespace db { + +// credentials for database access +struct db_credentials_t { + std::string server = ""; + bool server_override = false; + int port = 0; + bool port_override = false; + std::string uname = ""; + bool uname_override = false; + std::string pwd = ""; + bool pwd_override = false; + std::string dbase = ""; + bool dbase_override = false; + int version; +}; + +// create UUID type to store database 128 UUID type +struct uuid_t { + uint64_t uuid_upper, uuid_lower; + + uuid_t& operator=(uint64_t other) { + this->uuid_lower = other; + this->uuid_upper = 0; + return *this; + } +}; + + +// override comparison operator to make our life easier +inline bool operator==(uuid_t const & lhs, uuid_t const & rhs) { + return lhs.uuid_upper == rhs.uuid_upper && lhs.uuid_lower == rhs.uuid_lower; +} +inline bool operator==(uuid_t const & lhs, int const & rhs) { + return rhs >= 0 && lhs.uuid_lower == (uint64_t)rhs; +} +inline bool operator==(int const & lhs, uuid_t const & rhs) { + return lhs >= 0 && (uint64_t)lhs == rhs.uuid_lower; +} +inline bool operator!=(uuid_t const & lhs, uuid_t const & rhs) { + return lhs.uuid_upper != rhs.uuid_upper || lhs.uuid_lower != rhs.uuid_lower; +} +inline bool operator!=(uuid_t const & lhs, int const & rhs) { + return rhs < 0 || lhs.uuid_lower != (uint64_t)rhs; +} +inline bool operator!=(int const & lhs, uuid_t const & rhs) { + return lhs < 0 || (uint64_t)lhs != rhs.uuid_lower; +} + + +// easier conversion to string +inline std::string to_string(const uuid_t& value) { + std::ostringstream uuid_s; + uuid_s << std::hex; + uuid_s << std::setfill('0') << std::setw(16) << value.uuid_upper; + uuid_s << std::setfill('0') << std::setw(16) << value.uuid_lower; + return uuid_s.str(); +} + +// stream operations +inline std::ostream& operator<<(std::ostream& os, const uuid_t& value) { + os << to_string(value); + return os; +}; + +/** + * @brief Representation of the current status of a job. + * + */ +enum class JobStatusType {NOT_STARTED, IN_PROGRESS, FINISHED, HALTED, UNKNOWN}; + +// string mapping for job status type enum +static std::unordered_map job_status_type_st { + {"not_started", JobStatusType::NOT_STARTED}, + {"in_progress", JobStatusType::IN_PROGRESS}, + {"finished", JobStatusType::FINISHED}, + {"halted", JobStatusType::HALTED}, + {"unknown", JobStatusType::UNKNOWN} +}; + +inline std::string to_string(JobStatusType const &value) { + std::string str = "unknown"; + + for (auto& it : job_status_type_st) { + if (it.second == value) { + str = it.first; + break; + } + } + + return str; +}; + +inline std::ostream& operator<<(std::ostream& os, JobStatusType const &rhs) { + os << to_string(rhs); + return os; +}; + +// macro for registering enum classes for PQXX + +template +struct is_enum_class { + static constexpr bool value = std::is_enum_v && !std::is_convertible_v; +}; + +template +struct is_unordered_map_string { + static constexpr bool value = false; +}; + +template +struct is_unordered_map_string> { + static constexpr bool value = true; +}; + +template +struct is_map_key_type_same_as_enum { + static constexpr bool value = std::is_same_v && + std::is_same_v; +}; + +}; + +// hash operator to be able use uuid_t as a hashmap key +template<> +struct std::hash { + std::size_t operator()(const db::uuid_t& k) const { + using std::size_t; + using std::hash; + + return hash()(hash()(k.uuid_upper) ^ hash()(k.uuid_lower)); + } +}; + + +/** + * @brief Generate type conversion for enum class macros in the libpqxx library + * + * Calling this macro will generate the code necessary for converting enum class types in yaml-cpp internally. + * For this you additionally need a std::unordered_map, mapping string values to your + * custom enum class type. + * + * For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html + * + * @param T The type you want to generate the translation for + * @param lookup_table the unordered map, mapping string values of your type to the internal representation + * + */ +#define DB_REGISTER_ENUM_CLASS(T, lookup_table, name) \ + namespace pqxx { \ + \ + static_assert(db::is_enum_class::value, "DB_REGISTER_ENUM_TYPE: 'T' must be an enum class"); \ + static_assert(db::is_unordered_map_string::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' must be std::unordered_map"); \ + static_assert(db::is_map_key_type_same_as_enum::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' content must be of type T"); \ + \ + \ + template<> \ + std::string const type_name{name}; \ + \ + template<> \ + struct nullness : pqxx::no_null {}; \ + \ + template<> \ + struct string_traits { \ + \ + static T from_string(std::string_view text) { \ + \ + /* turn the string_view into a string */ \ + std::string str(text); \ + \ + /* make sure the enum value exists */ \ + if (lookup_table.find(str) == lookup_table.end()) throw pqxx::conversion_error("Could not convert " + str + " to " + name); \ + \ + return lookup_table[str]; \ + }; \ + \ + static zview to_buf(char *begin, char *end, T const &value) { \ + std::string str = ""; \ + \ + for (auto& it : lookup_table) { \ + if (it.second == value) { \ + str = it.first; \ + break; \ + } \ + } \ + \ + /* make sure we actually have enough space for the enum */ \ + if (std::distance(begin, end) < static_cast(str.size() + 1)) { \ + std::ostringstream str_str; \ + str_str << "Buffer for "; \ + str_str << name; \ + str_str << " to small"; \ + throw conversion_overrun(str_str.str()); \ + }\ + \ + const char* c_str = str.c_str(); \ + \ + std::memcpy(begin, c_str, sizeof(char) * str.length()); \ + \ + return zview(begin, str.length()); \ + }; \ + \ + static char *into_buf(char *begin, char *end, T const &value) { \ + std::string str = ""; \ + \ + for (auto& it : lookup_table) { \ + if (it.second == value) { \ + str = it.first; \ + break; \ + } \ + } \ + \ + /* make sure we actually have enough space for the enum */ \ + if (std::distance(begin, end) < static_cast(str.size() + 1)) { \ + std::ostringstream str_str; \ + str_str << "Buffer for "; \ + str_str << name; \ + str_str << " to small"; \ + throw conversion_overrun(str_str.str()); \ + }\ + \ + const char* c_str = str.c_str(); \ + \ + std::memcpy(begin, c_str, sizeof(char) * str.length()); \ + \ + return begin; \ + }; \ + \ + static std::size_t size_buffer(T const &value) noexcept { \ + \ + std::string str; \ + str = ""; \ + \ + for (auto& it : lookup_table) { \ + if (it.second == value) { \ + str = it.first; \ + break; \ + } \ + } \ + \ + return str.size() + 1; \ + }; \ + }; \ + }; + + +/** + * @brief Conversion extension for uuid_t for the pqxx library + * + * For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html + * + */ +namespace pqxx { + +template<> +std::string const type_name{"uuid_t"}; + +template<> +struct nullness : pqxx::no_null {}; + +template<> +struct string_traits { + + static db::uuid_t from_string(std::string_view text) { + + if (text.size() != 36) { + throw pqxx::conversion_error("Invalid binary string size for uuid_t. Expected: 36; Actual: " + std::to_string(text.size())); + } + + // turn the string_view into a string and remove the '-' + std::string form(text); + form.erase(std::remove(form.begin(), form.end(), '-'), form.end()); + + // then parse into numbers + std::istringstream high_stream(form.substr(0, 16)); + std::istringstream low_stream(form.substr(16, 16)); + + uint64_t low, high; + + high_stream >> std::hex >> high; + low_stream >> std::hex >> low; + + return {high, low}; + }; + + static zview to_buf(char *begin, char *end, db::uuid_t const &value) { + std::string str = db::to_string(value); + + // make sure we actually have enough space for the UUID + if (std::distance(begin, end) < static_cast(str.size() + 1)) throw conversion_overrun("Buffer for UUID to small"); + + const char* c_str = str.c_str(); + + std::memcpy(begin, c_str, sizeof(char) * str.length()); + + return zview(begin, str.length()); + }; + + static char *into_buf(char *begin, char *end, db::uuid_t const &value) { + std::string str = db::to_string(value); + + // make sure we actually have enough space for the UUID + if (std::distance(begin, end) < static_cast(str.size() + 1)) throw conversion_overrun("Buffer for UUID to small"); + + const char* c_str = str.c_str(); + + std::memcpy(begin, c_str, sizeof(char) * str.length()); + + return begin; + }; + + constexpr static std::size_t size_buffer([[maybe_unused]] db::uuid_t const &value) noexcept { return sizeof(char) * 33; }; +}; + +}; + +DB_REGISTER_ENUM_CLASS(db::JobStatusType, db::job_status_type_st, "job_status_type"); + + +#endif \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ff39f5b..183fb12 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories(../include) include_directories(../include/actsim_agent) add_subdirectory(actsim_agent) +add_subdirectory(cluster) file( GLOB proj_SRC @@ -23,7 +24,7 @@ target_link_libraries( ${PROJECT_NAME} act-cluster-lib ${actsim_agent_lib} - -lpqxx -lpq + pqxx ) # specify install targets diff --git a/src/actsim_agent/CMakeLists.txt b/src/actsim_agent/CMakeLists.txt index 478a55b..10e398b 100644 --- a/src/actsim_agent/CMakeLists.txt +++ b/src/actsim_agent/CMakeLists.txt @@ -20,7 +20,8 @@ add_library( target_link_libraries( ${actsim_agent_lib} act-cluster-lib - -lpqxx -lpq -latomic + pqxx + -latomic ) # specify install targets diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt new file mode 100644 index 0000000..dd4110a --- /dev/null +++ b/src/cluster/CMakeLists.txt @@ -0,0 +1,14 @@ + +include_directories($ENV{ACT_HOME}/include) +include_directories(../include) + +file( + GLOB act_cluster_lib_SRC + "*.cpp" +) + +add_library( + act-cluster-lib + STATIC + ${act_cluster_lib_SRC} +) \ No newline at end of file diff --git a/src/cluster/db_artifact.cpp b/src/cluster/db_artifact.cpp new file mode 100644 index 0000000..baa5a00 --- /dev/null +++ b/src/cluster/db_artifact.cpp @@ -0,0 +1,38 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + +#include "db_artifact.hpp" + +namespace db { + +DBArtifact::DBArtifact( + const db::uuid_t& id, + const db::uuid_t& source_pass, + const db::uuid_t& target_artifact +) { + this->id_ = id; + this->source_pass_ = source_pass; + this->target_artifact_ = target_artifact; +} + +}; diff --git a/src/cluster/db_client.cpp b/src/cluster/db_client.cpp new file mode 100644 index 0000000..0ab83d1 --- /dev/null +++ b/src/cluster/db_client.cpp @@ -0,0 +1,238 @@ + +/************************************************************************* + * + * Copyright (c) 2023 Fabian Posch + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + ************************************************************************** + */ + + +#include +#include "util.h" +#include "db_client.hpp" + +namespace db { + +using namespace std; + +string hostname; + +bool connected = false; + + +Connection::Connection( + db_credentials_t& credentials, + std::function setup +) : setup_function(move(setup)) { + this->db_credentials = credentials; +} + +Connection::Connection(db_credentials_t& credentials) { + this->db_credentials = credentials; +} + +Connection::~Connection() { + if (c != nullptr && c->is_open()) c->close(); +} + +bool Connection::connect() { + bool connection_established = false; + + for (int i = 0; i < MAX_CON_RETRIES; i++) { + try { + // create the connection object + this->c = new pqxx::connection( + "host=" + db_credentials.server + " " + "port=" + std::to_string(db_credentials.port) + " " + "user=" + db_credentials.uname + " " + "password=" + db_credentials.pwd + " " + "dbname=" + db_credentials.dbase + ); + + // make sure the database actually has the version we need + pqxx::work txn(*(this->c)); + + auto db_info = txn.exec1("SELECT * FROM info;"); + txn.commit(); + + if (db_info["db_version"].as() != this->db_credentials.version) { + this->disconnect(); + cerr << "Error: Unsupported database version! Command expects "; + cerr << this->db_credentials.version; + cerr << ", server provides " << db_info["db_version"].as() << "!" << endl; + return false; + } + + if (this->setup_function != nullptr) { + // execute the initialization function + this->setup_function(c); + } + + connection_established = true; + + } catch (const exception &e) { + cerr << "Error: Could not connect to database:" << endl; + cerr << e.what() << endl; + } + } + + return connection_established; +} + +bool Connection::ensure_connected() { + if (c == nullptr || !c->is_open()) { + c->close(); + return connect(); + } + return true; +} + +void Connection::disconnect() { + if (c != nullptr && c->is_open()) c->close(); +} + +bool Connection::prepare_statements(vector> statements) { + + try { + for (auto statement : statements) { + this->c->prepare(statement.first, statement.second); + } + } catch (const exception &e) { + cerr << "Error: Could not prepare statements!" << endl; + return false; + } + + return true; +} + +bool Connection::prepare_statement(std::string name, std::string statement) { + + try { + this->c->prepare(name, statement); + } catch (const exception &e) { + cerr << "Error: Could not prepare statements!" << endl; + return false; + } + + return true; +} + +bool Connection::unprepare_statement(std::string name) { + + auto unprepare_statement_lambda = [] (pqxx::work* txn, std::string statement) { + txn->exec0("DEALLOCATE " + statement + ";"); + }; + + std::function unprepare_statement_func = unprepare_statement_lambda; + + return this->send_request(&unprepare_statement_func, name); +} + +int Connection::find_job(std::string p_name, std::string *f_name) { + auto check_job_ids_lambda = [](pqxx::work *txn, int *ret, std::string p_name, std::string *f_name) { + pqxx::result res {txn->exec("SELECT id FROM jobs;")}; + + *ret = 0; + + for (auto row : res) { + // check if the ID starts with the partial name we've been given + if (row["id"].as().rfind(p_name, 0) == 0) { + // make sure we haven't found it before + if (*ret == 0) { + *ret = 1; + *f_name = row["id"].as(); + } else { + // guess the partial name is ambiguous + *ret = -1; + // we've already seen two, we don't need more + return; + } + } + } + }; + + std::function check_job_ids_func = check_job_ids_lambda; + + DEBUG_PRINT("Sending request..."); + + int ret; + if (!this->send_request(&check_job_ids_func, &ret, p_name, f_name)) { + std::cerr << "Error: Could not fetch job IDs from database!" << std::endl; + return 1; + } + + DEBUG_PRINT("Request complete."); + + return ret; + +} + +JobStatusType Connection::get_job_status(std::string job) { + auto get_jstatus_lambda = [](pqxx::work *txn, std::string job, JobStatusType *status) { + + try { + pqxx::row res {txn->exec1("SELECT job_status FROM jobs WHERE id='" + job + "';")}; + *status = res["job_status"].as(); + } catch (pqxx::unexpected_rows& e) { + std::cerr << "Error: Fetching job returned nothing or too many rows!" << std::endl; + *status = JobStatusType::UNKNOWN; + } + + }; + + std::function get_jstatus_func = get_jstatus_lambda; + + DEBUG_PRINT("Sending request..."); + + JobStatusType status; + if (!this->send_request(&get_jstatus_func, job, &status)) { + std::cerr << "Error: Could not fetch job status from database!" << std::endl; + return JobStatusType::UNKNOWN; + } + + DEBUG_PRINT("Request complete."); + + return status; + +} + +JobStatusType Connection::get_task_status(db::uuid_t task) { + + auto get_task_status_lambda = [](pqxx::work *txn, db::uuid_t *task, db::JobStatusType *status){ + auto res = txn->exec_params("SELECT part_status FROM artifacts WHERE id = $1 LIMIT 1;", *task); + + if (res.size() < 1) { + *status = db::JobStatusType::UNKNOWN; + } else { + *status = res[0]["part_status"].as(); + } + }; + + std::function get_task_status_func = get_task_status_lambda; + + db::JobStatusType status; + + if (!this->send_request(&get_task_status_func, &task, &status)) { + std::cerr << "Error: Status for task " << db::to_string(task) << " could not be fetched, returning unknown." << std::endl; + return db::JobStatusType::UNKNOWN; + } + + return status; +} + +}