merge with cluster lib because

This commit is contained in:
Fabian Posch 2025-01-24 12:44:45 +01:00
parent 0a3a235575
commit 9b1bd73df1
12 changed files with 1272 additions and 6 deletions

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "deps/libpqxx"]
path = deps/libpqxx
url = https://github.com/jtv/libpqxx

View file

@ -57,10 +57,8 @@ include_directories(./)
#add_library(ActCommon SHARED IMPORTED)
#set_target_properties(ActCommon PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libvlsilib.a)
# Include the cluster librar
add_library(act-cluster-lib SHARED IMPORTED)
set_target_properties(act-cluster-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libact_cluster.so)
# Add pqxx
add_subdirectory(deps/libpqxx build-pqxx EXCLUDE_FROM_ALL)
# ATTENTION
# The main executable is defined in the CMakeLists.txt in the ./src folder.

1
deps/libpqxx vendored Submodule

@ -0,0 +1 @@
Subproject commit 6af956bb48167ed1e71cc0f0c2d8c8fdc99151a3

View file

@ -0,0 +1,396 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __PIPELINE_ARTIFACT_H__
#define __PIPELINE_ARTIFACT_H__
#include <act/act.h>
#include <cstdint>
#include <vector>
#include <unordered_map>
#include <string>
#include <memory>
namespace pl {
/**
* @brief Representation of different artifact types
*/
enum class ArtifactType {UNKNOWN, ACT, SIM_CONFIG, SIGLIST, SIM_OUTPUT};
static std::unordered_map<std::string, ArtifactType> art_type_str = {
{"unknown", ArtifactType::UNKNOWN},
{"act", ArtifactType::ACT},
{"testcases", ArtifactType::SIM_CONFIG},
{"sig_list", ArtifactType::SIGLIST},
{"sim_output", ArtifactType::SIM_OUTPUT}
};
inline std::string to_string(ArtifactType const &value) {
std::string str = "unknown";
for (auto& it : art_type_str) {
if (it.second == value) {
str = it.first;
break;
}
}
return str;
};
inline std::ostream& operator<<(std::ostream& os, ArtifactType const &rhs) {
os << to_string(rhs);
return os;
};
/** List of artifact names as well as types */
typedef std::vector<std::pair<std::string, ArtifactType>> artifact_list;
/**
* @brief An artifact is a data point which can be consumed or produced by a pipeline module
*/
class Artifact {
public:
/**
* @brief Construct a new blank Artifact
*
*/
Artifact() = default;
/**
* @brief Get the type of the artifact the object is holding.
*
* Substitute for getting the object type
*
* @return ArtifactType
*/
virtual ArtifactType get_type() { return ArtifactType::UNKNOWN; };
/**
* @brief Get the content of the artifact
*
* @tparam T Content type the artifact is holding
* @return T Content of the artifact
*/
template<typename T>
T get_content();
/**
* @brief Get the number of elements in this artifact
*
* This is mostly relevant when uploading an artifact to the database cluster.
*
* @return long The number of elements in this artifact
*/
virtual long get_size() = 0;
/**
* @brief Inform a tool whether this artifact can be handled by multiple nodes
*
* By default, one artifact should only be handled once. However, there might be
* situations where multiple nodes can handle a single artifact. This could for example
* be a situation where a simulation for a design has a coverage model for constrained
* random testing. In this case we want to cover everything as fast as possible and
* simulation can be distributed.
*
* @return true The artifact can be handled by multiple nodes
* @return false The artifact cannot be handled by multiple nodes
*/
bool parallelizable() { return false; };
};
/**
* @brief ACT design artifact
*/
class ActArtifact: public Artifact {
public:
/**
* @brief Construct a new ACT design artifact
*
* @param design The design object the artifact should point to
*/
ActArtifact(std::shared_ptr<Act> design) { this->design = design; };
/**
* @brief Get the content of the artifact
*
* @return std::shared_ptr<Act> Pointer to the artifact's design file.
*/
std::shared_ptr<Act> get_content() { return design; };
/**
* @brief Returns the type of this artifact, which is ACT.
*
* @return ArtifactType Will return the ACT artifact type.
*/
ArtifactType get_type() override { return ArtifactType::ACT; };
/**
* @brief Get the number of elements in this artifact, which is 1 per definition
*
* @return long
*/
long get_size() override { return 1; };
private:
std::shared_ptr<Act> design;
};
/**
* @brief Struct to capture one testcase.
*/
struct testcase_t {
/** Commands to be executed for this testcase */
std::vector<std::string> commands;
/** Top process for the simulation */
std::string top;
/** This simulation should be run by multiple noes */
bool parallelizable;
};
/**
* @brief Artifact containing one or more configurations for actsim
*/
class SimConfigArtifact: public Artifact {
public:
SimConfigArtifact() = default;
SimConfigArtifact(bool has_reference) { this->has_reference_ = has_reference; };
/**
* @brief Get the content of the artifact
*
* @return std::vector<testcase_t> Vector of all generated testcase structures
*/
std::vector<testcase_t>& get_content() { return testcases; };
/**
* @brief Add a testcase to the artifact
*
* @param testcase
*/
inline void add_testcase(testcase_t testcase) { this->testcases.emplace_back(testcase); };
/**
* @brief Returns the type of this artifact, which is SIM_CONFIG.
*
* @return ArtifactType Will return the SIM_CONFIG artifact type.
*/
ArtifactType get_type() override { return ArtifactType::SIM_CONFIG; };
/**
* @brief Get the number of testcases stored in this artifact
*
* @return long The size of the testcase vector
*/
long get_size() override { return testcases.size(); };
const bool& has_reference = has_reference_;
/**
* @brief Can this simulation be handled by multiple nodes
*
* By default, one artifact should only be handled once. However, there might be
* situations where multiple nodes can handle a single artifact. This could for example
* be a situation where a simulation for a design has a coverage model for constrained
* random testing. In this case we want to cover everything as fast as possible and
* simulation can be distributed.
*
* @return true The simulation can be handled by multiple nodes
* @return false The simulation cannot be handled by multiple nodes
*/
bool parallelizable() {
if (this->testcases.size() != 1) return false;
return this->testcases.front().parallelizable;
};
private:
bool has_reference_;
std::vector<testcase_t> testcases;
};
/** Reference to a signal in an ACT design */
typedef std::string signal_t;
/**
* @brief Artifact containing a list of signals
*/
class SignalListArtifact: public Artifact {
public:
/**
* @brief Get the content of the artifact
*
* @return std::vector<signal_t> Vector of all captured signals structures
*/
std::vector<signal_t> get_content() { return signals; };
/**
* @brief Add a signal to the artifact
*
* @param testcase
*/
inline void add_signal(signal_t signal) { this->signals.emplace_back(signal); };
/**
* @brief Returns the type of this artifact, which is SIGLIST.
*
* @return ArtifactType Will return the SIGLIST artifact type.
*/
ArtifactType get_type() override { return ArtifactType::SIGLIST; };
/**
* @brief Get the number of signals stored in this artifact
*
* @return long The size of the signals vector
*/
long get_size() override { return signals.size(); };
private:
std::vector<signal_t> signals;
};
/**
* @brief Artifact containing simulator output from actsim
*/
class SimOutputArtifact: public Artifact {
public:
SimOutputArtifact() {};
SimOutputArtifact(const std::vector<std::string>& sim_log, const std::vector<std::string>& error_log) {
this->sim_log = sim_log;
this->sim_err_log = error_log;
has_content_ = true;
};
/**
* @brief Get the content of the artifact
*
* @return std::vector<testcase_t> Vector of all generated simulator output lines
*/
inline std::pair<std::vector<std::string>&, std::vector<std::string>&> get_content() { return {sim_log, sim_err_log}; };
inline std::vector<uint32_t>& get_output_token_timings() { return output_token_timings; };
inline void set_output_token_timings(std::vector<uint32_t> timings) { this->output_token_timings = timings; };
/**
* @brief Add a log line to the artifact
*
* @param log_line
*/
inline void add_log_output(std::string log_line) { this->sim_log.emplace_back(log_line); has_content_ = true; };
/**
* @brief Add a error log line to the artifact
*
* @param log_line
*/
inline void add_err_output(std::string log_line) { this->sim_err_log.emplace_back(log_line); has_content_ = true; };
inline void clear_logs() {
// save the size
size_ = this->sim_err_log.size() + this->sim_log.size();
this->sim_err_log.clear();
this->sim_log.clear();
has_content_ = false;
};
inline void add_output_token_timing(uint32_t timing) { this->output_token_timings.emplace_back(timing); };
/**
* @brief Returns the type of this artifact, which is SIM_OUTPUT.
*
* @return ArtifactType Will return the SIM_OUTPUT artifact type.
*/
inline ArtifactType get_type() override { return ArtifactType::SIM_OUTPUT; };
/**
* @brief Get the number of log lines stored in this artifact
*
* @return long The size of the log lines vector
*/
inline long get_size() override { if (has_content_) {return sim_log.size();} else {return size_;} };
inline void set_size(long size) { if (!has_content_) this->size_ = size; };
const bool& fault_timing_deviation = fault_timing_deviation_;
const bool& fault_value = fault_value_;
const bool& fault_coding = fault_coding_;
const bool& fault_glitch = fault_glitch_;
const bool& fault_deadlock = fault_deadlock_;
const bool& fault_token_count = fault_token_count_;
const int& output_tokens = output_tokens_;
inline void set_fault_timing_deviation (bool fault_timing_deviation) { this->fault_timing_deviation_ = fault_timing_deviation; };
inline void set_fault_value (bool fault_value) { this->fault_value_ = fault_value; };
inline void set_fault_coding (bool fault_coding) { this->fault_coding_ = fault_coding; };
inline void set_fault_glitch (bool fault_glitch) { this->fault_glitch_ = fault_glitch; };
inline void set_fault_deadlock (bool fault_deadlock) { this->fault_deadlock_ = fault_deadlock; };
inline void set_fault_token_count (bool fault_token_count) { this->fault_token_count_ = fault_token_count; };
inline void set_output_tokens(int output_tokens) { this->output_tokens_ = output_tokens; };
private:
std::vector<std::string> sim_log;
std::vector<std::string> sim_err_log;
std::vector<uint32_t> output_token_timings;
bool fault_timing_deviation_ = false;
bool fault_value_ = false;
bool fault_coding_ = false;
bool fault_glitch_ = false;
bool fault_deadlock_ = false;
bool fault_token_count_ = false;
int output_tokens_;
long size_ = 0;
bool has_content_ = false;
};
};
#endif

View file

@ -0,0 +1,96 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_ARTIFACT__
#define __DB_ARTIFACT__
/**
* @brief Data used for a partial artifact from the database
*
* Don't let this confuse you. This is the additional data one would get when
* pulling a (partial) artifact from the database.
*
* Database information is intentionally kept out of the artifact library, since this
* is not needed for local artifact processing (especially in the deploy tool).
*
* Since an agent might have special requirements in regards to additional information,
* no specific implementation is provided by this library.
*
* Combinations of specific artifact types should therefor implemented locally.
* This is just the minimal cross-section of what an agent will have to deal with.
*
*/
#include "db_types.hpp"
namespace db {
class DBArtifact {
public:
/**
* @brief Construct a new Artifact with a database UUID attached to it.
*
* This should be used when downloading the artifact from the database.
* This ID is actually required when an update in the database is required.
*
* @param id UUID of the artifact this belongs to in the database
* @param source_pass The pass in the database which is responsible for the generation of this (partial) artifact
* @param target_artifact The UUID of the artifact this part belongs to
*/
DBArtifact(const db::uuid_t& id, const db::uuid_t& source_pass, const db::uuid_t& target_artifact);
/**
* @brief The ID of this object in the database
*
* The ID should be auto-generated when this task is fetched from the database.
* It is generated automatically by said database.
*
* The new partial artifact must be generated immediately to indicate someone
* working on this.
*
*/
const db::uuid_t& id = id_;
/**
* @brief The ID of the pass in the database which created this object
*/
const db::uuid_t& source_pass = source_pass_;
/**
* @brief The artifact, which this partial artifact is a part of
*/
const db::uuid_t& target_artifact = target_artifact_;
protected:
db::uuid_t id_;
db::uuid_t source_pass_;
db::uuid_t target_artifact_;
};
};
#endif

View file

@ -0,0 +1,131 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_CLIENT_H__
#define __DB_CLIENT_H__
#include <cstdint>
#include <iostream>
#include <pqxx/pqxx>
#include "db_types.hpp"
namespace db {
using namespace std;
// some database constants
#define MAX_CON_RETRIES 5
class Connection {
public:
Connection(
db_credentials_t& credentials,
std::function<void(pqxx::connection *c)> setup_function
);
Connection(
db_credentials_t& credentials
);
~Connection();
pqxx::connection* get_connection() { return c; };
void setup() {
setup_function(this->c);
}
bool connect();
void disconnect();
bool ensure_connected();
int find_job(std::string p_name, std::string *f_name);
JobStatusType get_job_status(std::string job);
JobStatusType get_task_status(db::uuid_t);
bool prepare_statements(vector<pair<string, string>> statements);
bool prepare_statement(std::string name, std::string statement);
bool unprepare_statement(std::string name);
// Send request takes an arbitrary request method to the database and wraps error handling
// around it. It takes a function with an arbitrary set of arguments and the arguments to give
// that function and returns whatever the function returns.
// Careful with typing here! C++ doesn't quite do the strictest typing once you go deep like this!
// The function is defined here as it can be accessed from outside this library and is templated.
// This means it must be accessible to the compiler at all times. Failing this will cause a linker error.
template<typename... Args>
bool send_request(std::function<void(pqxx::work*, Args...)> *db_function, Args... args) {
retry_send:
if (!ensure_connected()) {
cerr << "Could contact database. Broken database connection." << endl;
return false;
}
try {
pqxx::work txn(*c);
try {
// we assume the lambda takes a pointer to a transaction object as first argument
// and whatever else was given to us as the rest
(*db_function)(&txn, args...);
// commit the task to the database
txn.commit();
} catch (const exception& e) {
// if something happened during the transaction, we roll it back
txn.abort();
cerr << "Could not complete database operation. Error in execution." << endl;
cerr << e.what() << endl;
return false;
}
} catch (const pqxx::broken_connection& e) {
cerr << "Pipe broke during database operation... Retrying..." << endl;
cerr << e.what() << endl;
// using a goto here since it'll gracefully retry connection
goto retry_send;
}
return true;
};
private:
db_credentials_t db_credentials;
pqxx::connection *c;
std::function<void(pqxx::connection *c)> setup_function;
bool connected;
};
};
#endif

View file

@ -0,0 +1,349 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_TYPES_HPP__
#define __DB_TYPES_HPP__
#include <iomanip>
#include <pqxx/pqxx>
namespace db {
// credentials for database access
struct db_credentials_t {
std::string server = "";
bool server_override = false;
int port = 0;
bool port_override = false;
std::string uname = "";
bool uname_override = false;
std::string pwd = "";
bool pwd_override = false;
std::string dbase = "";
bool dbase_override = false;
int version;
};
// create UUID type to store database 128 UUID type
struct uuid_t {
uint64_t uuid_upper, uuid_lower;
uuid_t& operator=(uint64_t other) {
this->uuid_lower = other;
this->uuid_upper = 0;
return *this;
}
};
// override comparison operator to make our life easier
inline bool operator==(uuid_t const & lhs, uuid_t const & rhs) {
return lhs.uuid_upper == rhs.uuid_upper && lhs.uuid_lower == rhs.uuid_lower;
}
inline bool operator==(uuid_t const & lhs, int const & rhs) {
return rhs >= 0 && lhs.uuid_lower == (uint64_t)rhs;
}
inline bool operator==(int const & lhs, uuid_t const & rhs) {
return lhs >= 0 && (uint64_t)lhs == rhs.uuid_lower;
}
inline bool operator!=(uuid_t const & lhs, uuid_t const & rhs) {
return lhs.uuid_upper != rhs.uuid_upper || lhs.uuid_lower != rhs.uuid_lower;
}
inline bool operator!=(uuid_t const & lhs, int const & rhs) {
return rhs < 0 || lhs.uuid_lower != (uint64_t)rhs;
}
inline bool operator!=(int const & lhs, uuid_t const & rhs) {
return lhs < 0 || (uint64_t)lhs != rhs.uuid_lower;
}
// easier conversion to string
inline std::string to_string(const uuid_t& value) {
std::ostringstream uuid_s;
uuid_s << std::hex;
uuid_s << std::setfill('0') << std::setw(16) << value.uuid_upper;
uuid_s << std::setfill('0') << std::setw(16) << value.uuid_lower;
return uuid_s.str();
}
// stream operations
inline std::ostream& operator<<(std::ostream& os, const uuid_t& value) {
os << to_string(value);
return os;
};
/**
* @brief Representation of the current status of a job.
*
*/
enum class JobStatusType {NOT_STARTED, IN_PROGRESS, FINISHED, HALTED, UNKNOWN};
// string mapping for job status type enum
static std::unordered_map<std::string, JobStatusType> job_status_type_st {
{"not_started", JobStatusType::NOT_STARTED},
{"in_progress", JobStatusType::IN_PROGRESS},
{"finished", JobStatusType::FINISHED},
{"halted", JobStatusType::HALTED},
{"unknown", JobStatusType::UNKNOWN}
};
inline std::string to_string(JobStatusType const &value) {
std::string str = "unknown";
for (auto& it : job_status_type_st) {
if (it.second == value) {
str = it.first;
break;
}
}
return str;
};
inline std::ostream& operator<<(std::ostream& os, JobStatusType const &rhs) {
os << to_string(rhs);
return os;
};
// macro for registering enum classes for PQXX
template<typename T>
struct is_enum_class {
static constexpr bool value = std::is_enum_v<T> && !std::is_convertible_v<T, int>;
};
template<typename T>
struct is_unordered_map_string {
static constexpr bool value = false;
};
template<typename T>
struct is_unordered_map_string<std::unordered_map<std::string, T>> {
static constexpr bool value = true;
};
template<typename Map, typename Enum>
struct is_map_key_type_same_as_enum {
static constexpr bool value = std::is_same_v<typename Map::key_type, std::string> &&
std::is_same_v<typename Map::mapped_type, Enum>;
};
};
// hash operator to be able use uuid_t as a hashmap key
template<>
struct std::hash<db::uuid_t> {
std::size_t operator()(const db::uuid_t& k) const {
using std::size_t;
using std::hash;
return hash<size_t>()(hash<uint64_t>()(k.uuid_upper) ^ hash<uint64_t>()(k.uuid_lower));
}
};
/**
* @brief Generate type conversion for enum class macros in the libpqxx library
*
* Calling this macro will generate the code necessary for converting enum class types in yaml-cpp internally.
* For this you additionally need a std::unordered_map<std::string, Type)>, mapping string values to your
* custom enum class type.
*
* For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html
*
* @param T The type you want to generate the translation for
* @param lookup_table the unordered map, mapping string values of your type to the internal representation
*
*/
#define DB_REGISTER_ENUM_CLASS(T, lookup_table, name) \
namespace pqxx { \
\
static_assert(db::is_enum_class<T>::value, "DB_REGISTER_ENUM_TYPE: 'T' must be an enum class"); \
static_assert(db::is_unordered_map_string<decltype(lookup_table)>::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' must be std::unordered_map<std::string, T>"); \
static_assert(db::is_map_key_type_same_as_enum<decltype(lookup_table), T>::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' content must be of type T"); \
\
\
template<> \
std::string const type_name<T>{name}; \
\
template<> \
struct nullness<T> : pqxx::no_null<T> {}; \
\
template<> \
struct string_traits<T> { \
\
static T from_string(std::string_view text) { \
\
/* turn the string_view into a string */ \
std::string str(text); \
\
/* make sure the enum value exists */ \
if (lookup_table.find(str) == lookup_table.end()) throw pqxx::conversion_error("Could not convert " + str + " to " + name); \
\
return lookup_table[str]; \
}; \
\
static zview to_buf(char *begin, char *end, T const &value) { \
std::string str = ""; \
\
for (auto& it : lookup_table) { \
if (it.second == value) { \
str = it.first; \
break; \
} \
} \
\
/* make sure we actually have enough space for the enum */ \
if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) { \
std::ostringstream str_str; \
str_str << "Buffer for "; \
str_str << name; \
str_str << " to small"; \
throw conversion_overrun(str_str.str()); \
}\
\
const char* c_str = str.c_str(); \
\
std::memcpy(begin, c_str, sizeof(char) * str.length()); \
\
return zview(begin, str.length()); \
}; \
\
static char *into_buf(char *begin, char *end, T const &value) { \
std::string str = ""; \
\
for (auto& it : lookup_table) { \
if (it.second == value) { \
str = it.first; \
break; \
} \
} \
\
/* make sure we actually have enough space for the enum */ \
if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) { \
std::ostringstream str_str; \
str_str << "Buffer for "; \
str_str << name; \
str_str << " to small"; \
throw conversion_overrun(str_str.str()); \
}\
\
const char* c_str = str.c_str(); \
\
std::memcpy(begin, c_str, sizeof(char) * str.length()); \
\
return begin; \
}; \
\
static std::size_t size_buffer(T const &value) noexcept { \
\
std::string str; \
str = ""; \
\
for (auto& it : lookup_table) { \
if (it.second == value) { \
str = it.first; \
break; \
} \
} \
\
return str.size() + 1; \
}; \
}; \
};
/**
* @brief Conversion extension for uuid_t for the pqxx library
*
* For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html
*
*/
namespace pqxx {
template<>
std::string const type_name<db::uuid_t>{"uuid_t"};
template<>
struct nullness<db::uuid_t> : pqxx::no_null<db::uuid_t> {};
template<>
struct string_traits<db::uuid_t> {
static db::uuid_t from_string(std::string_view text) {
if (text.size() != 36) {
throw pqxx::conversion_error("Invalid binary string size for uuid_t. Expected: 36; Actual: " + std::to_string(text.size()));
}
// turn the string_view into a string and remove the '-'
std::string form(text);
form.erase(std::remove(form.begin(), form.end(), '-'), form.end());
// then parse into numbers
std::istringstream high_stream(form.substr(0, 16));
std::istringstream low_stream(form.substr(16, 16));
uint64_t low, high;
high_stream >> std::hex >> high;
low_stream >> std::hex >> low;
return {high, low};
};
static zview to_buf(char *begin, char *end, db::uuid_t const &value) {
std::string str = db::to_string(value);
// make sure we actually have enough space for the UUID
if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) throw conversion_overrun("Buffer for UUID to small");
const char* c_str = str.c_str();
std::memcpy(begin, c_str, sizeof(char) * str.length());
return zview(begin, str.length());
};
static char *into_buf(char *begin, char *end, db::uuid_t const &value) {
std::string str = db::to_string(value);
// make sure we actually have enough space for the UUID
if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) throw conversion_overrun("Buffer for UUID to small");
const char* c_str = str.c_str();
std::memcpy(begin, c_str, sizeof(char) * str.length());
return begin;
};
constexpr static std::size_t size_buffer([[maybe_unused]] db::uuid_t const &value) noexcept { return sizeof(char) * 33; };
};
};
DB_REGISTER_ENUM_CLASS(db::JobStatusType, db::job_status_type_st, "job_status_type");
#endif

View file

@ -5,6 +5,7 @@ include_directories(../include)
include_directories(../include/actsim_agent)
add_subdirectory(actsim_agent)
add_subdirectory(cluster)
file(
GLOB proj_SRC
@ -23,7 +24,7 @@ target_link_libraries(
${PROJECT_NAME}
act-cluster-lib
${actsim_agent_lib}
-lpqxx -lpq
pqxx
)
# specify install targets

View file

@ -20,7 +20,8 @@ add_library(
target_link_libraries(
${actsim_agent_lib}
act-cluster-lib
-lpqxx -lpq -latomic
pqxx
-latomic
)
# specify install targets

View file

@ -0,0 +1,14 @@
include_directories($ENV{ACT_HOME}/include)
include_directories(../include)
file(
GLOB act_cluster_lib_SRC
"*.cpp"
)
add_library(
act-cluster-lib
STATIC
${act_cluster_lib_SRC}
)

View file

@ -0,0 +1,38 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "db_artifact.hpp"
namespace db {
DBArtifact::DBArtifact(
const db::uuid_t& id,
const db::uuid_t& source_pass,
const db::uuid_t& target_artifact
) {
this->id_ = id;
this->source_pass_ = source_pass;
this->target_artifact_ = target_artifact;
}
};

238
src/cluster/db_client.cpp Normal file
View file

@ -0,0 +1,238 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <iostream>
#include "util.h"
#include "db_client.hpp"
namespace db {
using namespace std;
string hostname;
bool connected = false;
Connection::Connection(
db_credentials_t& credentials,
std::function<void(pqxx::connection *c)> setup
) : setup_function(move(setup)) {
this->db_credentials = credentials;
}
Connection::Connection(db_credentials_t& credentials) {
this->db_credentials = credentials;
}
Connection::~Connection() {
if (c != nullptr && c->is_open()) c->close();
}
bool Connection::connect() {
bool connection_established = false;
for (int i = 0; i < MAX_CON_RETRIES; i++) {
try {
// create the connection object
this->c = new pqxx::connection(
"host=" + db_credentials.server + " "
"port=" + std::to_string(db_credentials.port) + " "
"user=" + db_credentials.uname + " "
"password=" + db_credentials.pwd + " "
"dbname=" + db_credentials.dbase
);
// make sure the database actually has the version we need
pqxx::work txn(*(this->c));
auto db_info = txn.exec1("SELECT * FROM info;");
txn.commit();
if (db_info["db_version"].as<int>() != this->db_credentials.version) {
this->disconnect();
cerr << "Error: Unsupported database version! Command expects ";
cerr << this->db_credentials.version;
cerr << ", server provides " << db_info["db_version"].as<int>() << "!" << endl;
return false;
}
if (this->setup_function != nullptr) {
// execute the initialization function
this->setup_function(c);
}
connection_established = true;
} catch (const exception &e) {
cerr << "Error: Could not connect to database:" << endl;
cerr << e.what() << endl;
}
}
return connection_established;
}
bool Connection::ensure_connected() {
if (c == nullptr || !c->is_open()) {
c->close();
return connect();
}
return true;
}
void Connection::disconnect() {
if (c != nullptr && c->is_open()) c->close();
}
bool Connection::prepare_statements(vector<pair<string, string>> statements) {
try {
for (auto statement : statements) {
this->c->prepare(statement.first, statement.second);
}
} catch (const exception &e) {
cerr << "Error: Could not prepare statements!" << endl;
return false;
}
return true;
}
bool Connection::prepare_statement(std::string name, std::string statement) {
try {
this->c->prepare(name, statement);
} catch (const exception &e) {
cerr << "Error: Could not prepare statements!" << endl;
return false;
}
return true;
}
bool Connection::unprepare_statement(std::string name) {
auto unprepare_statement_lambda = [] (pqxx::work* txn, std::string statement) {
txn->exec0("DEALLOCATE " + statement + ";");
};
std::function<void(pqxx::work*, std::string)> unprepare_statement_func = unprepare_statement_lambda;
return this->send_request(&unprepare_statement_func, name);
}
int Connection::find_job(std::string p_name, std::string *f_name) {
auto check_job_ids_lambda = [](pqxx::work *txn, int *ret, std::string p_name, std::string *f_name) {
pqxx::result res {txn->exec("SELECT id FROM jobs;")};
*ret = 0;
for (auto row : res) {
// check if the ID starts with the partial name we've been given
if (row["id"].as<std::string>().rfind(p_name, 0) == 0) {
// make sure we haven't found it before
if (*ret == 0) {
*ret = 1;
*f_name = row["id"].as<std::string>();
} else {
// guess the partial name is ambiguous
*ret = -1;
// we've already seen two, we don't need more
return;
}
}
}
};
std::function<void(pqxx::work*, int*, std::string, std::string*)> check_job_ids_func = check_job_ids_lambda;
DEBUG_PRINT("Sending request...");
int ret;
if (!this->send_request(&check_job_ids_func, &ret, p_name, f_name)) {
std::cerr << "Error: Could not fetch job IDs from database!" << std::endl;
return 1;
}
DEBUG_PRINT("Request complete.");
return ret;
}
JobStatusType Connection::get_job_status(std::string job) {
auto get_jstatus_lambda = [](pqxx::work *txn, std::string job, JobStatusType *status) {
try {
pqxx::row res {txn->exec1("SELECT job_status FROM jobs WHERE id='" + job + "';")};
*status = res["job_status"].as<JobStatusType>();
} catch (pqxx::unexpected_rows& e) {
std::cerr << "Error: Fetching job returned nothing or too many rows!" << std::endl;
*status = JobStatusType::UNKNOWN;
}
};
std::function<void(pqxx::work*, std::string, JobStatusType*)> get_jstatus_func = get_jstatus_lambda;
DEBUG_PRINT("Sending request...");
JobStatusType status;
if (!this->send_request(&get_jstatus_func, job, &status)) {
std::cerr << "Error: Could not fetch job status from database!" << std::endl;
return JobStatusType::UNKNOWN;
}
DEBUG_PRINT("Request complete.");
return status;
}
JobStatusType Connection::get_task_status(db::uuid_t task) {
auto get_task_status_lambda = [](pqxx::work *txn, db::uuid_t *task, db::JobStatusType *status){
auto res = txn->exec_params("SELECT part_status FROM artifacts WHERE id = $1 LIMIT 1;", *task);
if (res.size() < 1) {
*status = db::JobStatusType::UNKNOWN;
} else {
*status = res[0]["part_status"].as<db::JobStatusType>();
}
};
std::function<void(pqxx::work*, db::uuid_t*, db::JobStatusType*)> get_task_status_func = get_task_status_lambda;
db::JobStatusType status;
if (!this->send_request(&get_task_status_func, &task, &status)) {
std::cerr << "Error: Status for task " << db::to_string(task) << " could not be fetched, returning unknown." << std::endl;
return db::JobStatusType::UNKNOWN;
}
return status;
}
}