Compare commits

...

31 commits

Author SHA1 Message Date
9ffea99281 fix sign error 2025-02-02 22:54:13 +01:00
fa0ce2f321 new attempt for deadlock and token fault detection 2025-02-02 22:53:27 +01:00
28d095cb4e possible fix for deadlock weirdness 2025-02-02 22:33:21 +01:00
7505af9c43 test for weirder deadlocks 2025-02-02 22:24:04 +01:00
d5133e3eaa test for weird deadlocks 2025-02-02 22:18:38 +01:00
cd0742cbaa change from view to prepared statement 2025-02-02 20:28:40 +01:00
d3f88a8acd add missing deadlock detection 2025-02-02 17:08:11 +01:00
7dcadc1ca2 final performant version 2025-02-02 16:33:52 +01:00
5786b7bf22 another dbase optimization 2025-02-01 21:36:46 +01:00
dd377a4f0f print error log on failed simulation 2025-01-31 13:55:36 +01:00
009ee509d5 add skip if buffer still full 2025-01-31 01:08:38 +01:00
e51064771b final fixes 2025-01-30 23:57:27 +01:00
25b4684669 optimize dbase interaction 2025-01-28 10:58:58 +01:00
dbe9bf9a9e remove logging for faster simulation 2025-01-24 16:36:09 +01:00
b121d96b37 fix missing build directives 2025-01-24 13:06:48 +01:00
a66f5b8b7e fix pqxx include path 2025-01-24 12:51:25 +01:00
32b8f078c1 fix include for cluster part 2025-01-24 12:48:17 +01:00
9b1bd73df1 merge with cluster lib because 2025-01-24 12:44:45 +01:00
0a3a235575 implement remaining failure modes and optimize reference data pull 2025-01-08 16:53:16 +01:00
fe9eaeb8f6 disable space optimization for now for less traffic 2025-01-08 16:51:48 +01:00
3d704c7e73 streamline code and fix running out of file descriptors 2025-01-03 12:58:53 +01:00
a9aff14f82 add missing build file update 2025-01-03 12:56:51 +01:00
af9343ac49 add debug code 2025-01-03 12:56:21 +01:00
127074f85e upload fault flags, streamline code 2025-01-03 12:55:44 +01:00
7f8efe0e73 add pulling token info and fix non-working downloader 2025-01-03 12:53:59 +01:00
182be82ac2 fix weird build issues 2025-01-03 12:52:56 +01:00
5b3b3b9810 implement log parser 2025-01-03 12:52:34 +01:00
d4667506dd add log parser 2024-01-29 16:14:19 -05:00
711808eafb implement reference run handling in threading; actual output parsing is missing as of now 2024-01-29 15:38:55 -05:00
9ddcd8ea71 remove superfluous debug print 2024-01-29 11:57:32 -05:00
978c58268a add reference store to task interface 2024-01-29 11:55:35 -05:00
25 changed files with 2202 additions and 312 deletions

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "deps/libpqxx"]
path = deps/libpqxx
url = https://github.com/jtv/libpqxx

View file

@ -51,16 +51,14 @@ include_directories(./)
# Include shared act libraries
# We need this mostly for file elaboration ahead of cluster deployment
#add_library(ActLib SHARED IMPORTED)
#set_target_properties(ActLib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libact.a)
add_library(act-lib SHARED IMPORTED)
set_target_properties(act-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libact.a)
#add_library(ActCommon SHARED IMPORTED)
#set_target_properties(ActCommon PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libvlsilib.a)
# Include the cluster library
add_library(act-cluster-lib SHARED IMPORTED)
set_target_properties(act-cluster-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libact_cluster.so)
add_library(act-common-lib SHARED IMPORTED)
set_target_properties(act-common-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME}/lib/libvlsilib.a)
# Add pqxx
add_subdirectory(deps/libpqxx build-pqxx EXCLUDE_FROM_ALL)
# ATTENTION
# The main executable is defined in the CMakeLists.txt in the ./src folder.
@ -71,30 +69,12 @@ set_target_properties(act-cluster-lib PROPERTIES IMPORTED_LOCATION $ENV{ACT_HOME
# Set the output directory of executables
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/bin)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/lib)
set(CMAKE_INSTALL_PREFIX $ENV{ACT_HOME} CACHE PATH "installation path" FORCE)
add_subdirectory(src)
# Link the needed libraries into it
target_link_libraries(
${PROJECT_NAME}
act-cluster-lib
actsim-agent-lib
)
# Add the Postgresql library
target_link_libraries(
${PROJECT_NAME}
-lpqxx -lpq
)
# specify install targets
install(
TARGETS actsim-cluster-agent
DESTINATION bin
)
# We don't provide a library
#install(
# TARGETS dflowchp

1
deps/libpqxx vendored Submodule

@ -0,0 +1 @@
Subproject commit 6af956bb48167ed1e71cc0f0c2d8c8fdc99151a3

View file

@ -30,7 +30,7 @@
#define DATABASE_VERSION 3
#define DOWNLOAD_BUFFER 5
#define DOWNLOAD_BUFFER 200
int start_agent(db::db_credentials_t db_cred, size_t worker_processes);

View file

@ -38,13 +38,23 @@ class DBSimArtifact : public db::DBArtifact {
const db::uuid_t& source_pass,
const db::uuid_t& target_artifact,
const db::uuid_t& design,
const db::uuid_t& reference,
const db::uuid_t& source_config
);
) : DBArtifact(id, source_pass, target_artifact) {
this->design_ = design;
this->reference_ = reference;
this->source_config_ = source_config;
};
/**
* @brief The UUID of the design this simulation uses
*/
const db::uuid_t& design = design_;
/**
* @brief The UUID of the reference run this simulation uses
*/
const db::uuid_t& reference = reference_;
/**
* @brief The UUID of the simulator configuration
@ -53,6 +63,7 @@ class DBSimArtifact : public db::DBArtifact {
private:
db::uuid_t reference_;
db::uuid_t design_;
db::uuid_t source_config_;
};
@ -65,8 +76,9 @@ class DBSimConfigArtifact : public DBSimArtifact, public pl::SimConfigArtifact {
const db::uuid_t& source_pass,
const db::uuid_t& target_artifact,
const db::uuid_t& design,
const db::uuid_t& reference,
const db::uuid_t& source_config
) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {};
) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {};
};
class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact {
public:
@ -76,8 +88,9 @@ class DBSimOutputArtifact : public DBSimArtifact, public pl::SimOutputArtifact {
const db::uuid_t& source_pass,
const db::uuid_t& target_artifact,
const db::uuid_t& design,
const db::uuid_t& reference,
const db::uuid_t& source_config
) : DBSimArtifact(id, source_pass, target_artifact, design,source_config) {};
) : DBSimArtifact(id, source_pass, target_artifact, design, reference, source_config) {};
};

View file

@ -31,8 +31,6 @@
#include <cluster/db_client.hpp>
#include "task_interface.hpp"
#define NOTHING_AVAILABLE_SLEEP_TIME 500ms
class Downloader {
public:
@ -47,6 +45,7 @@ class Downloader {
void thread_run();
bool fetch_tasks(size_t n);
bool fetch_design(const db::uuid_t& id, std::string& design);
std::shared_ptr<pl::SimOutputArtifact> fetch_reference_run(const db::uuid_t& id);
void reopen_task(const db::uuid_t& id, bool halt);
std::unique_ptr<std::thread> downloader_thread;

View file

@ -0,0 +1,72 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __LOG_PARSER__
#define __LOG_PARSER__
#include <cluster/artifact.hpp>
#include <cstddef>
#include <cstdint>
#include <vector>
#include "agent_artifact.hpp"
/**
 * @brief Line-by-line parser for actsim simulation output.
 *
 * Fed one log/error line at a time; the private check_* helpers suggest it
 * flags fault conditions (token count, value/timing, coding, glitch) on the
 * wrapped output artifact. When constructed with a reference run, output
 * token timings can be compared against it. The actual behavior is defined
 * in the implementation file.
 */
class LogParser {
public:
/**
 * @brief Construct a parser that compares the run against a reference run.
 *
 * @param artifact Output artifact the results are recorded into (held by reference)
 * @param reference Reference run whose output token timings are compared against
 */
LogParser(std::unique_ptr<DBSimOutputArtifact>& artifact, std::shared_ptr<pl::SimOutputArtifact> reference);
/**
 * @brief Construct a parser without a reference run.
 *
 * @param artifact Output artifact the results are recorded into (held by reference)
 */
LogParser(std::unique_ptr<DBSimOutputArtifact>& artifact);
/** @brief Process one line of the simulator's standard output log. */
void parse_log(const std::string& line);
/** @brief Process one line of the simulator's error log. */
void parse_error(const std::string& line);
/** @brief Finish parsing after the last line has been fed in. */
void finalize();
/** @brief Check whether the run looks like a busy deadlock. */
bool check_busy_deadlock();
private:
// iterator over a run's recorded output token timings
using timing_it_t = std::vector<uint32_t>::const_iterator;
// per-line fault checks (see implementation for the exact log patterns matched)
void check_token_count(const std::string& line);
void check_value_timing_fault(const std::string& line);
void check_coding_fault(const std::string& line);
void check_glitch(const std::string& line);
// extract the simulation timestamp from a log line
uint32_t extract_timestamp(const std::string& line);
// record an output token observed in the log
void handle_output_token(const std::string& line);
// NOTE(review): reference to an externally owned unique_ptr — the owner must
// outlive this parser and must not reset the pointer mid-parse; confirm.
std::unique_ptr<DBSimOutputArtifact>& artifact;
// reference run for timing comparison (meaningful only when has_reference is set)
std::shared_ptr<pl::SimOutputArtifact> reference;
// whether a reference run was supplied at construction
bool has_reference;
// cursor into the reference run's output token timings
timing_it_t timing_it;
// end iterator of the reference run's output token timings
timing_it_t reference_ott_end;
// set once any failure condition has been detected
bool failure_mode = false;
// number of output tokens produced by the device under test
size_t dut_output_tokens_ = 0;
// difference in output token count versus the reference
size_t output_token_difference_ = 0;
};
#endif

View file

@ -26,12 +26,15 @@
#ifndef __TASK_INTERFACE__
#define __TASK_INTERFACE__
#include <cstddef>
#include <queue>
#include <unordered_map>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
#include "agent_artifact.hpp"
#include <chrono>
/**
* If you want to use this interface for different types, you only need to change it here.
@ -40,6 +43,8 @@
using InputType = DBSimConfigArtifact;
using OutputType = DBSimOutputArtifact;
#define NOTHING_AVAILABLE_SLEEP_TIME 500ms
class TaskInterface {
public:
@ -48,33 +53,46 @@ class TaskInterface {
~TaskInterface();
void wait_for_fresh();
void wait_for_finished();
void wait_for_finished(size_t min_size);
void wait_for_buffer_consume();
void wait_for_cleanup_ready();
void wait_for_download_halt();
void wait_for_available();
void notify_cleanup_ready();
void notify_workers_program_halt();
void notify_download_halt();
void push_fresh(std::unique_ptr<InputType> task);
void push_fresh(std::vector<std::unique_ptr<InputType>>& tasks);
std::unique_ptr<InputType> pop_fresh(bool& empty);
void push_finished(std::unique_ptr<OutputType> task);
std::unique_ptr<OutputType> pop_finished(bool& empty);
std::vector<std::unique_ptr<OutputType>> pop_finished(bool& empty);
size_t get_buffer_space();
/*
* Store a design entry locally
*/
bool increment_design(const db::uuid_t& id);
void decrement_design(const db::uuid_t& id);
std::string get_design(const db::uuid_t& id);
void store_design(const db::uuid_t&, std::string& design);
/*
* Store a reference run locally
*/
bool increment_reference(const db::uuid_t& id);
void decrement_reference(const db::uuid_t& id);
std::shared_ptr<pl::SimOutputArtifact> get_reference(const db::uuid_t& id);
void store_reference(const db::uuid_t&, std::shared_ptr<pl::SimOutputArtifact> reference_run);
bool running() { return this->running_.load(std::memory_order_relaxed); };
bool is_stop_immediate() { return this->immediate_stop.load(std::memory_order_relaxed); };
void stop();
void stop_immediately() { this->immediate_stop.store(true, std::memory_order_relaxed); };
bool fresh_queue_empty();
bool finished_queue_empty();
bool finished_queue_empty(size_t min_size);
private:
@ -87,6 +105,10 @@ class TaskInterface {
volatile std::atomic_bool immediate_stop;
std::unordered_map<db::uuid_t, std::pair<size_t, std::string>> designs;
std::unordered_map<db::uuid_t, std::pair<size_t, std::shared_ptr<pl::SimOutputArtifact>>> references;
bool worker_waiting = false;
bool downloader_waiting = false;
////// Mutexes //////
@ -95,6 +117,9 @@ class TaskInterface {
// design map access
std::mutex designs_mutex;
// reference map access
std::mutex references_mutex;
// notify upload thread that the finished queue is ready for cleanup
std::atomic_bool cleanup_ready;

View file

@ -28,9 +28,12 @@
#include <cluster/db_types.hpp>
#include <cluster/db_client.hpp>
#include <memory>
#include <thread>
#include "task_interface.hpp"
std::string build_fault_flags(const std::unique_ptr<OutputType>& task);
class Uploader {
public:
@ -43,7 +46,7 @@ class Uploader {
private:
void thread_run();
bool upload_task(std::unique_ptr<OutputType> task);
bool upload_task(std::vector<std::unique_ptr<OutputType>>& tasks);
std::unique_ptr<std::thread> uploader_thread;
std::unique_ptr<db::Connection> conn;

View file

@ -48,8 +48,6 @@ class Worker {
void thread_run();
std::unique_ptr<OutputType> perform_task(std::unique_ptr<InputType>& task, bool& finished);
std::unique_ptr<OutputType> pipe_error(bool& finished);
std::unique_ptr<std::thread> worker_thread;
std::atomic<db::uuid_t> current_task;

View file

@ -0,0 +1,397 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __PIPELINE_ARTIFACT_H__
#define __PIPELINE_ARTIFACT_H__
#include <act/act.h>
#include <cstdint>
#include <vector>
#include <unordered_map>
#include <string>
#include <memory>
namespace pl {
/**
 * @brief Representation of different artifact types
 */
enum class ArtifactType {UNKNOWN, ACT, SIM_CONFIG, SIGLIST, SIM_OUTPUT};

// mapping between the string representation (as stored externally) and the enum
static std::unordered_map<std::string, ArtifactType> art_type_str = {
    {"unknown", ArtifactType::UNKNOWN},
    {"act", ArtifactType::ACT},
    {"testcases", ArtifactType::SIM_CONFIG},
    {"sig_list", ArtifactType::SIGLIST},
    {"sim_output", ArtifactType::SIM_OUTPUT}
};

/**
 * @brief Convert an artifact type to its string representation.
 *
 * Reverse lookup in art_type_str; values without a mapping yield "unknown".
 *
 * @param value The artifact type to convert
 * @return std::string The string form of the artifact type
 */
inline std::string to_string(ArtifactType const &value) {
    for (const auto& [name, type] : art_type_str) {
        if (type == value) {
            return name;
        }
    }
    return "unknown";
};

/**
 * @brief Stream an artifact type as its string representation.
 */
inline std::ostream& operator<<(std::ostream& os, ArtifactType const &rhs) {
    return os << to_string(rhs);
};
/** List of artifact names as well as types */
typedef std::vector<std::pair<std::string, ArtifactType>> artifact_list;
/**
 * @brief An artifact is a data point which can be consumed or produced by a pipeline module
 */
class Artifact {
public:
    /**
     * @brief Construct a new blank Artifact
     */
    Artifact() = default;

    /**
     * @brief Virtual destructor.
     *
     * Artifact is a polymorphic base (virtual get_type(), pure virtual
     * get_size()) and derived artifacts are handled through pointers;
     * deleting a derived artifact through an Artifact* without a virtual
     * destructor is undefined behavior, so one is provided here.
     */
    virtual ~Artifact() = default;

    /**
     * @brief Get the type of the artifact the object is holding.
     *
     * Substitute for getting the object type
     *
     * @return ArtifactType
     */
    virtual ArtifactType get_type() { return ArtifactType::UNKNOWN; };

    /**
     * @brief Get the content of the artifact
     *
     * Declared but intentionally not defined here; concrete artifact types
     * provide their own (non-template) get_content overloads.
     *
     * @tparam T Content type the artifact is holding
     * @return T Content of the artifact
     */
    template<typename T>
    T get_content();

    /**
     * @brief Get the number of elements in this artifact
     *
     * This is mostly relevant when uploading an artifact to the database cluster.
     *
     * @return long The number of elements in this artifact
     */
    virtual long get_size() = 0;

    /**
     * @brief Inform a tool whether this artifact can be handled by multiple nodes
     *
     * By default, one artifact should only be handled once. However, there might be
     * situations where multiple nodes can handle a single artifact. This could for example
     * be a situation where a simulation for a design has a coverage model for constrained
     * random testing. In this case we want to cover everything as fast as possible and
     * simulation can be distributed.
     *
     * NOTE(review): this member is not virtual, so a derived class that declares
     * its own parallelizable() (e.g. SimConfigArtifact) hides it instead of
     * overriding it — calls through an Artifact pointer always return false.
     * Confirm this is intended before relying on it polymorphically.
     *
     * @return true The artifact can be handled by multiple nodes
     * @return false The artifact cannot be handled by multiple nodes
     */
    bool parallelizable() { return false; };
};
/**
* @brief ACT design artifact
*/
class ActArtifact: public Artifact {
public:
/**
* @brief Construct a new ACT design artifact
*
* @param design The design object the artifact should point to
*/
ActArtifact(std::shared_ptr<Act> design) { this->design = design; };
/**
* @brief Get the content of the artifact
*
* @return std::shared_ptr<Act> Pointer to the artifact's design file.
*/
std::shared_ptr<Act> get_content() { return design; };
/**
* @brief Returns the type of this artifact, which is ACT.
*
* @return ArtifactType Will return the ACT artifact type.
*/
ArtifactType get_type() override { return ArtifactType::ACT; };
/**
* @brief Get the number of elements in this artifact, which is 1 per definition
*
* @return long
*/
long get_size() override { return 1; };
private:
std::shared_ptr<Act> design;
};
/**
 * @brief Struct to capture one testcase.
 */
struct testcase_t {
    /** Commands to be executed for this testcase */
    std::vector<std::string> commands;
    /** Top process for the simulation */
    std::string top;
    /** This simulation may be run by multiple nodes.
     *  Defaulted to false so a partially filled testcase never carries an
     *  indeterminate flag (the other members already default-construct empty). */
    bool parallelizable = false;
};
/**
* @brief Artifact containing one or more configurations for actsim
*/
class SimConfigArtifact: public Artifact {
public:
SimConfigArtifact() = default;
SimConfigArtifact(bool has_reference) { this->has_reference_ = has_reference; };
/**
* @brief Get the content of the artifact
*
* @return std::vector<testcase_t> Vector of all generated testcase structures
*/
std::vector<testcase_t>& get_content() { return testcases; };
/**
* @brief Add a testcase to the artifact
*
* @param testcase
*/
inline void add_testcase(testcase_t testcase) { this->testcases.emplace_back(testcase); };
/**
* @brief Returns the type of this artifact, which is SIM_CONFIG.
*
* @return ArtifactType Will return the SIM_CONFIG artifact type.
*/
ArtifactType get_type() override { return ArtifactType::SIM_CONFIG; };
/**
* @brief Get the number of testcases stored in this artifact
*
* @return long The size of the testcase vector
*/
long get_size() override { return testcases.size(); };
const bool& has_reference = has_reference_;
/**
* @brief Can this simulation be handled by multiple nodes
*
* By default, one artifact should only be handled once. However, there might be
* situations where multiple nodes can handle a single artifact. This could for example
* be a situation where a simulation for a design has a coverage model for constrained
* random testing. In this case we want to cover everything as fast as possible and
* simulation can be distributed.
*
* @return true The simulation can be handled by multiple nodes
* @return false The simulation cannot be handled by multiple nodes
*/
bool parallelizable() {
if (this->testcases.size() != 1) return false;
return this->testcases.front().parallelizable;
};
private:
bool has_reference_ = false;
std::vector<testcase_t> testcases;
};
/** Reference to a signal in an ACT design */
typedef std::string signal_t;

/**
 * @brief Artifact containing a list of signals
 */
class SignalListArtifact: public Artifact {
public:
    /**
     * @brief Get the content of the artifact
     *
     * Returns a reference instead of a copy — consistent with
     * SimConfigArtifact::get_content() and avoids duplicating the whole
     * signal vector on every access. Callers that want their own copy can
     * still assign the result to a value.
     *
     * @return std::vector<signal_t>& Vector of all captured signals
     */
    std::vector<signal_t>& get_content() { return signals; };
    /**
     * @brief Add a signal to the artifact
     *
     * @param signal The signal to append to the list
     */
    inline void add_signal(signal_t signal) { this->signals.emplace_back(signal); };
    /**
     * @brief Returns the type of this artifact, which is SIGLIST.
     *
     * @return ArtifactType Will return the SIGLIST artifact type.
     */
    ArtifactType get_type() override { return ArtifactType::SIGLIST; };
    /**
     * @brief Get the number of signals stored in this artifact
     *
     * @return long The size of the signals vector
     */
    long get_size() override { return signals.size(); };
private:
    std::vector<signal_t> signals;
};
/**
 * @brief Artifact containing simulator output from actsim
 */
class SimOutputArtifact: public Artifact {
public:
    /// Create an empty output artifact.
    SimOutputArtifact() {};
    /**
     * @brief Create an output artifact from complete simulator logs.
     *
     * @param sim_log Lines of the simulator's standard output
     * @param error_log Lines of the simulator's error output
     */
    SimOutputArtifact(const std::vector<std::string>& sim_log, const std::vector<std::string>& error_log) {
        this->sim_log = sim_log;
        this->sim_err_log = error_log;
        has_content_ = true;
    };
    /**
     * @brief Get the content of the artifact
     *
     * @return Pair of references to the standard output log and the error log
     */
    inline std::pair<std::vector<std::string>&, std::vector<std::string>&> get_content() { return {sim_log, sim_err_log}; };
    /** @brief Access the recorded output token timings. */
    inline std::vector<uint32_t>& get_output_token_timings() { return output_token_timings; };
    /** @brief Replace the recorded output token timings. */
    inline void set_output_token_timings(std::vector<uint32_t> timings) { this->output_token_timings = timings; };
    /**
     * @brief Add a log line to the artifact
     *
     * @param log_line
     */
    inline void add_log_output(std::string log_line) { this->sim_log.emplace_back(log_line); has_content_ = true; };
    /**
     * @brief Add a error log line to the artifact
     *
     * @param log_line
     */
    inline void add_err_output(std::string log_line) { this->sim_err_log.emplace_back(log_line); has_content_ = true; };
    /**
     * @brief Drop both logs while remembering their combined size.
     *
     * After this call get_size() reports the saved size_ snapshot.
     */
    inline void clear_logs() {
        // save the size
        size_ = this->sim_err_log.size() + this->sim_log.size();
        this->sim_err_log.clear();
        this->sim_log.clear();
        has_content_ = false;
    };
    /** @brief Append one output token timing. */
    inline void add_output_token_timing(uint32_t timing) { this->output_token_timings.emplace_back(timing); };
    /**
     * @brief Returns the type of this artifact, which is SIM_OUTPUT.
     *
     * @return ArtifactType Will return the SIM_OUTPUT artifact type.
     */
    inline ArtifactType get_type() override { return ArtifactType::SIM_OUTPUT; };
    /**
     * @brief Get the number of log lines stored in this artifact
     *
     * NOTE(review): while content is present this counts only sim_log lines,
     * but the snapshot taken by clear_logs() counts sim_log plus sim_err_log —
     * the reported size can therefore jump after clearing; confirm intended.
     *
     * @return long The size of the log lines vector
     */
    inline long get_size() override { if (has_content_) {return sim_log.size();} else {return size_;} };
    /** @brief Override the stored size; ignored while live content exists. */
    inline void set_size(long size) { if (!has_content_) this->size_ = size; };
    /** @brief Re-snapshot the size from the current standard output log. */
    inline void fix_size() { this->size_ = sim_log.size();};
    // read-only views of the fault flags (use the setters below to modify)
    const bool& fault_timing_deviation = fault_timing_deviation_;
    const bool& fault_value = fault_value_;
    const bool& fault_coding = fault_coding_;
    const bool& fault_glitch = fault_glitch_;
    const bool& fault_deadlock = fault_deadlock_;
    const bool& fault_token_count = fault_token_count_;
    const int& output_tokens = output_tokens_;
    inline void set_fault_timing_deviation (bool fault_timing_deviation) { this->fault_timing_deviation_ = fault_timing_deviation; };
    inline void set_fault_value (bool fault_value) { this->fault_value_ = fault_value; };
    inline void set_fault_coding (bool fault_coding) { this->fault_coding_ = fault_coding; };
    inline void set_fault_glitch (bool fault_glitch) { this->fault_glitch_ = fault_glitch; };
    inline void set_fault_deadlock (bool fault_deadlock) { this->fault_deadlock_ = fault_deadlock; };
    inline void set_fault_token_count (bool fault_token_count) { this->fault_token_count_ = fault_token_count; };
    inline void set_output_tokens(int output_tokens) { this->output_tokens_ = output_tokens; };
private:
    std::vector<std::string> sim_log;
    std::vector<std::string> sim_err_log;
    std::vector<uint32_t> output_token_timings;
    bool fault_timing_deviation_ = false;
    bool fault_value_ = false;
    bool fault_coding_ = false;
    bool fault_glitch_ = false;
    bool fault_deadlock_ = false;
    bool fault_token_count_ = false;
    // initialized to 0: this value is exposed through the public const
    // reference output_tokens and was previously readable uninitialized
    int output_tokens_ = 0;
    long size_ = 0;
    bool has_content_ = false;
};
};
#endif

View file

@ -0,0 +1,96 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_ARTIFACT__
#define __DB_ARTIFACT__
/**
* @brief Data used for a partial artifact from the database
*
* Don't let this confuse you. This is the additional data one would get when
* pulling a (partial) artifact from the database.
*
* Database information is intentionally kept out of the artifact library, since this
* is not needed for local artifact processing (especially in the deploy tool).
*
* Since an agent might have special requirements in regards to additional information,
* no specific implementation is provided by this library.
*
* Combinations of specific artifact types should therefore be implemented locally.
* This is just the minimal cross-section of what an agent will have to deal with.
*
*/
#include "db_types.hpp"
namespace db {
class DBArtifact {
public:
/**
 * @brief Construct a new Artifact with a database UUID attached to it.
 *
 * This should be used when downloading the artifact from the database.
 * This ID is actually required when an update in the database is required.
 *
 * Defined out of line in the accompanying implementation file.
 *
 * @param id UUID of the artifact this belongs to in the database
 * @param source_pass The pass in the database which is responsible for the generation of this (partial) artifact
 * @param target_artifact The UUID of the artifact this part belongs to
 */
DBArtifact(const db::uuid_t& id, const db::uuid_t& source_pass, const db::uuid_t& target_artifact);
/**
 * @brief The ID of this object in the database
 *
 * The ID should be auto-generated when this task is fetched from the database.
 * It is generated automatically by said database.
 *
 * The new partial artifact must be generated immediately to indicate someone
 * working on this.
 *
 */
const db::uuid_t& id = id_;
/**
 * @brief The ID of the pass in the database which created this object
 */
const db::uuid_t& source_pass = source_pass_;
/**
 * @brief The artifact, which this partial artifact is a part of
 */
const db::uuid_t& target_artifact = target_artifact_;
protected:
// Backing storage for the public read-only reference members above.
// NOTE(review): with reference members the implicit copy constructor binds the
// copy's references to the *source* object's fields, and copy assignment is
// implicitly deleted — confirm DBArtifact objects are never copied.
db::uuid_t id_;
db::uuid_t source_pass_;
db::uuid_t target_artifact_;
};
};
#endif

View file

@ -0,0 +1,157 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_CLIENT_H__
#define __DB_CLIENT_H__
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <pqxx/pqxx>
#include "db_types.hpp"
namespace db {
using namespace std;
// some database constants
#define MAX_CON_RETRIES 5
class Connection {
public:
    /**
     * @brief Create a connection wrapper with a session setup callback.
     *
     * @param credentials Database credentials used when connecting
     * @param setup_function Invoked with the live pqxx connection (e.g. to
     *        prepare statements) via setup()
     */
    Connection(
        db_credentials_t& credentials,
        std::function<void(pqxx::connection *c)> setup_function
    );
    /**
     * @brief Create a connection wrapper without a session setup callback.
     *
     * @param credentials Database credentials used when connecting
     */
    Connection(
        db_credentials_t& credentials
    );
    ~Connection();
    /** @brief Non-owning access to the underlying pqxx connection. */
    pqxx::connection* get_connection() { return c; };
    /** @brief Run the stored session setup callback on the current connection. */
    void setup() {
        setup_function(this->c);
    }
    bool connect();
    void disconnect();
    /** @brief Ensure the connection is alive, reconnecting if necessary. */
    bool ensure_connected();
    int find_job(std::string p_name, std::string *f_name);
    JobStatusType get_job_status(std::string job);
    JobStatusType get_task_status(db::uuid_t);
    bool prepare_statements(vector<pair<string, string>> statements);
    bool prepare_statement(std::string name, std::string statement);
    bool unprepare_statement(std::string name);
    /**
     * @brief Open a transaction on the current connection.
     *
     * Retries as long as the connection merely breaks mid-operation. If the
     * connection cannot be re-established at all, success is set to false and
     * pqxx::broken_connection is thrown. (The previous version never set
     * success to true — the assignment sat after the return statement — and on
     * an unrecoverable connection it still dereferenced the dead connection
     * and retried forever.)
     *
     * @param success Set to true when a transaction was opened, false otherwise
     * @return pqxx::work The opened transaction
     * @throws pqxx::broken_connection When the database stays unreachable
     */
    pqxx::work open_transaction(bool& success) {
        for (;;) {
            if (!ensure_connected()) {
                cerr << "Could not contact database. Broken database connection." << endl;
                success = false;
                throw pqxx::broken_connection();
            }
            try {
                success = true;
                return pqxx::work(*c);
            } catch (const pqxx::broken_connection& e) {
                cerr << "Pipe broke during database operation... Retrying..." << endl;
                cerr << e.what() << endl;
                // loop around to gracefully retry the connection
            }
        }
    }
    // Send request takes an arbitrary request method to the database and wraps error handling
    // around it. It takes a function with an arbitrary set of arguments and the arguments to give
    // that function and returns whatever the function returns.
    // Careful with typing here! C++ doesn't quite do the strictest typing once you go deep like this!
    // The function is defined here as it can be accessed from outside this library and is templated.
    // This means it must be accessible to the compiler at all times. Failing this will cause a linker error.
    template<typename... Args>
    bool send_request(std::function<void(pqxx::work*, Args...)> *db_function, Args... args) {
        for (;;) {
            if (!ensure_connected()) {
                cerr << "Could not contact database. Broken database connection." << endl;
                return false;
            }
            try {
                pqxx::work txn(*c);
                try {
                    // we assume the lambda takes a pointer to a transaction object as first argument
                    // and whatever else was given to us as the rest
                    (*db_function)(&txn, args...);
                    // commit the task to the database
                    txn.commit();
                    return true;
                } catch (const exception& e) {
                    // if something happened during the transaction, we roll it back
                    txn.abort();
                    cerr << "Could not complete database operation. Error in execution." << endl;
                    cerr << e.what() << endl;
                    return false;
                }
            } catch (const pqxx::broken_connection& e) {
                // the connection broke while opening the transaction
                cerr << "Pipe broke during database operation... Retrying..." << endl;
                cerr << e.what() << endl;
                // loop around to gracefully retry the connection
            }
        }
    };
private:
    db_credentials_t db_credentials;
    pqxx::connection *c;
    std::function<void(pqxx::connection *c)> setup_function;
    bool connected;
};
};
#endif

View file

@ -0,0 +1,349 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#ifndef __DB_TYPES_HPP__
#define __DB_TYPES_HPP__
#include <iomanip>
#include <pqxx/pqxx>
namespace db {
// credentials for database access; the *_override flags record whether the
// corresponding field was explicitly set (e.g. via CLI) rather than defaulted
struct db_credentials_t {
    std::string server = "";
    bool server_override = false;
    int port = 0;
    bool port_override = false;
    std::string uname = "";
    bool uname_override = false;
    std::string pwd = "";
    bool pwd_override = false;
    std::string dbase = "";
    bool dbase_override = false;
    // expected database schema version; defaulted to 0 so it is never read
    // uninitialized (it was the only member without an initializer)
    int version = 0;
};
// create UUID type to store the database's 128-bit UUID type
struct uuid_t {
    // both halves default to zero so a default-constructed uuid_t is the nil
    // UUID instead of carrying indeterminate values (uuid_t is used as a
    // hash-map key and compared against integer sentinels)
    uint64_t uuid_upper = 0, uuid_lower = 0;
    // assigning a plain integer produces a UUID with a zero upper half
    uuid_t& operator=(uint64_t other) {
        this->uuid_lower = other;
        this->uuid_upper = 0;
        return *this;
    }
};
// override comparison operator to make our life easier
inline bool operator==(uuid_t const & lhs, uuid_t const & rhs) {
    return lhs.uuid_upper == rhs.uuid_upper && lhs.uuid_lower == rhs.uuid_lower;
}
// comparing against a plain int is meant for small sentinel values: the UUID is
// only equal when its upper half is zero as well (previously the upper half was
// ignored, so e.g. {upper=1, lower=42} compared equal to 42)
inline bool operator==(uuid_t const & lhs, int const & rhs) {
    return rhs >= 0 && lhs.uuid_upper == 0 && lhs.uuid_lower == static_cast<uint64_t>(rhs);
}
inline bool operator==(int const & lhs, uuid_t const & rhs) {
    return rhs == lhs;
}
// the != overloads are the exact negations of the == overloads above
inline bool operator!=(uuid_t const & lhs, uuid_t const & rhs) {
    return !(lhs == rhs);
}
inline bool operator!=(uuid_t const & lhs, int const & rhs) {
    return !(lhs == rhs);
}
inline bool operator!=(int const & lhs, uuid_t const & rhs) {
    return !(rhs == lhs);
}
// easier conversion to string: 32 lowercase hex digits, upper half first
inline std::string to_string(const uuid_t& value) {
    std::ostringstream uuid_s;
    uuid_s << std::hex;
    uuid_s << std::setfill('0') << std::setw(16) << value.uuid_upper;
    uuid_s << std::setfill('0') << std::setw(16) << value.uuid_lower;
    return uuid_s.str();
}
// stream operations
inline std::ostream& operator<<(std::ostream& os, const uuid_t& value) {
    os << to_string(value);
    return os;
};
/**
* @brief Representation of the current status of a job.
*
*/
enum class JobStatusType {NOT_STARTED, IN_PROGRESS, FINISHED, HALTED, UNKNOWN};
// String mapping for the job status enum, mirroring the database's
// textual representation.
// 'inline' instead of 'static': a static map in a header gives every
// translation unit its own private copy; an inline variable (C++17,
// which this file already requires) shares a single definition.
inline std::unordered_map<std::string, JobStatusType> job_status_type_st {
    {"not_started", JobStatusType::NOT_STARTED},
    {"in_progress", JobStatusType::IN_PROGRESS},
    {"finished", JobStatusType::FINISHED},
    {"halted", JobStatusType::HALTED},
    {"unknown", JobStatusType::UNKNOWN}
};
// Reverse lookup: enum value -> database string.
// Falls back to "unknown" for values missing from the table.
inline std::string to_string(JobStatusType const &value) {
    std::string str = "unknown";
    for (auto& it : job_status_type_st) {
        if (it.second == value) {
            str = it.first;
            break;
        }
    }
    return str;
}
inline std::ostream& operator<<(std::ostream& os, JobStatusType const &rhs) {
    os << to_string(rhs);
    return os;
}
// macro for registering enum classes for PQXX
// Detects scoped enumerations: an enum type whose values do not
// implicitly convert to int (i.e. an 'enum class').
template<typename T>
struct is_enum_class
    : std::bool_constant<std::is_enum_v<T> && !std::is_convertible_v<T, int>> {};
// Detects std::unordered_map<std::string, V> for any mapped type V.
template<typename T>
struct is_unordered_map_string : std::false_type {};
template<typename T>
struct is_unordered_map_string<std::unordered_map<std::string, T>> : std::true_type {};
// Checks that Map is keyed by std::string and maps to exactly Enum.
template<typename Map, typename Enum>
struct is_map_key_type_same_as_enum
    : std::bool_constant<std::is_same_v<typename Map::key_type, std::string> &&
                         std::is_same_v<typename Map::mapped_type, Enum>> {};
};
// hash operator to be able use uuid_t as a hashmap key
template<>
struct std::hash<db::uuid_t> {
    // Combine the two 64-bit halves hash_combine-style (Boost's mixing
    // constant). A plain XOR of the two hashes — as used previously —
    // makes hash({a,b}) == hash({b,a}) and maps every {x,x} to the same
    // bucket; the additive constant and shifts break that symmetry.
    std::size_t operator()(const db::uuid_t& k) const {
        const std::size_t h1 = std::hash<uint64_t>{}(k.uuid_upper);
        const std::size_t h2 = std::hash<uint64_t>{}(k.uuid_lower);
        return h1 ^ (h2 + 0x9e3779b97f4a7c15ULL + (h1 << 6) + (h1 >> 2));
    }
};
/**
* @brief Generate type conversion for enum class macros in the libpqxx library
*
* Calling this macro will generate the code necessary for converting enum class types in yaml-cpp internally.
* For this you additionally need a std::unordered_map<std::string, Type)>, mapping string values to your
* custom enum class type.
*
* For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html
*
* @param T The type you want to generate the translation for
* @param lookup_table the unordered map, mapping string values of your type to the internal representation
*
*/
#define DB_REGISTER_ENUM_CLASS(T, lookup_table, name) \
namespace pqxx { \
    \
    static_assert(db::is_enum_class<T>::value, "DB_REGISTER_ENUM_TYPE: 'T' must be an enum class"); \
    static_assert(db::is_unordered_map_string<decltype(lookup_table)>::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' must be std::unordered_map<std::string, T>"); \
    static_assert(db::is_map_key_type_same_as_enum<decltype(lookup_table), T>::value, "DB_REGISTER_ENUM_TYPE: 'lookup_table' content must be of type T"); \
    \
    /* 'inline': explicit specializations of variable templates are not */ \
    /* implicitly inline, and this macro expands in a header (ODR). */ \
    template<> \
    inline std::string const type_name<T>{name}; \
    \
    template<> \
    struct nullness<T> : pqxx::no_null<T> {}; \
    \
    template<> \
    struct string_traits<T> { \
    \
        /* Reverse lookup: enum value -> database string ("" if unmapped). */ \
        /* Shared by to_buf/into_buf/size_buffer (was triplicated before). */ \
        static std::string enum_text(T const &value) { \
            for (auto const &entry : lookup_table) { \
                if (entry.second == value) return entry.first; \
            } \
            return std::string(); \
        } \
    \
        /* Parse the database string; throws conversion_error on unknown values. */ \
        static T from_string(std::string_view text) { \
            /* turn the string_view into a string */ \
            std::string str(text); \
            /* single map lookup (previously find() followed by operator[]) */ \
            auto entry = lookup_table.find(str); \
            if (entry == lookup_table.end()) throw pqxx::conversion_error("Could not convert " + str + " to " + name); \
            return entry->second; \
        } \
    \
        /* Write the value into [begin, end) and return a zero-terminated view. */ \
        static zview to_buf(char *begin, char *end, T const &value) { \
            char *past_end = into_buf(begin, end, value); \
            /* past_end points one past the terminating zero */ \
            return zview(begin, past_end - begin - 1); \
        } \
    \
        /* Write the text plus terminating zero and return the position right */ \
        /* behind the terminator, as the pqxx string_traits contract requires */ \
        /* (the previous version returned 'begin' and never wrote the NUL). */ \
        static char *into_buf(char *begin, char *end, T const &value) { \
            std::string str = enum_text(value); \
    \
            /* make sure we actually have enough space for the enum */ \
            if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) { \
                throw conversion_overrun(std::string("Buffer for ") + name + " too small"); \
            } \
    \
            std::memcpy(begin, str.c_str(), str.size()); \
            begin[str.size()] = '\0'; \
            return begin + str.size() + 1; \
        } \
    \
        /* Worst-case buffer size: the string plus its terminating zero. */ \
        static std::size_t size_buffer(T const &value) noexcept { \
            return enum_text(value).size() + 1; \
        } \
    }; \
};
/**
* @brief Conversion extension for uuid_t for the pqxx library
*
* For more information see https://libpqxx.readthedocs.io/en/7.4.1/a01360.html
*
*/
namespace pqxx {
    // 'inline': explicit specializations of variable templates are not
    // implicitly inline; without it this header violates the ODR when
    // included in more than one translation unit.
    template<>
    inline std::string const type_name<db::uuid_t>{"uuid_t"};
    // uuid_t columns are treated as never-null
    template<>
    struct nullness<db::uuid_t> : pqxx::no_null<db::uuid_t> {};
    template<>
    struct string_traits<db::uuid_t> {
        // Parse the canonical 36-character dashed form into two 64-bit
        // halves (upper first).
        static db::uuid_t from_string(std::string_view text) {
            if (text.size() != 36) {
                throw pqxx::conversion_error("Invalid binary string size for uuid_t. Expected: 36; Actual: " + std::to_string(text.size()));
            }
            // turn the string_view into a string and remove the '-'
            std::string form(text);
            form.erase(std::remove(form.begin(), form.end(), '-'), form.end());
            // 36 characters minus the four dashes must leave 32 hex digits
            if (form.size() != 32) {
                throw pqxx::conversion_error("Malformed uuid_t: " + std::string(text));
            }
            // then parse into numbers
            std::istringstream high_stream(form.substr(0, 16));
            std::istringstream low_stream(form.substr(16, 16));
            uint64_t low = 0, high = 0;
            // detect parse failure instead of silently returning
            // indeterminate values (previously unchecked); note this does
            // not reject partial-hex garbage that still parses a prefix
            if (!(high_stream >> std::hex >> high) || !(low_stream >> std::hex >> low)) {
                throw pqxx::conversion_error("Malformed uuid_t: " + std::string(text));
            }
            return {high, low};
        }
        // Write the value into [begin, end) and return a zero-terminated view.
        static zview to_buf(char *begin, char *end, db::uuid_t const &value) {
            char *past_end = into_buf(begin, end, value);
            // past_end points one past the terminating zero
            return zview(begin, past_end - begin - 1);
        }
        // Write the 32 hex digits plus terminating zero and return the
        // position right behind the terminator, as the pqxx string_traits
        // contract requires (the previous version returned 'begin' and
        // never wrote the NUL).
        static char *into_buf(char *begin, char *end, db::uuid_t const &value) {
            std::string str = db::to_string(value);
            // make sure we actually have enough space for the UUID
            if (std::distance(begin, end) < static_cast<signed long>(str.size() + 1)) throw conversion_overrun("Buffer for UUID too small");
            std::memcpy(begin, str.c_str(), str.size());
            begin[str.size()] = '\0';
            return begin + str.size() + 1;
        }
        // 32 hex digits plus the terminating zero
        constexpr static std::size_t size_buffer([[maybe_unused]] db::uuid_t const &value) noexcept { return sizeof(char) * 33; }
    };
};
DB_REGISTER_ENUM_CLASS(db::JobStatusType, db::job_status_type_st, "job_status_type");
#endif

View file

@ -3,8 +3,10 @@
include_directories($ENV{ACT_HOME}/include)
include_directories(../include)
include_directories(../include/actsim_agent)
include_directories(../../deps/libpqxx/include)
add_subdirectory(actsim_agent)
add_subdirectory(cluster)
file(
GLOB proj_SRC
@ -17,3 +19,17 @@ add_executable(
${PROJECT_NAME}
${proj_SRC}
)
# Link the needed libraries into it
target_link_libraries(
${PROJECT_NAME}
act-cluster-lib
${actsim_agent_lib}
pqxx
)
# specify install targets
install(
TARGETS actsim-cluster-agent
DESTINATION bin
)

View file

@ -1,21 +1,32 @@
include_directories(../../include)
include_directories(../../include/db_lib)
include_directories(../../include/actsim_agent)
include_directories($ENV{ACT_HOME}/include)
include_directories(../../deps/libpqxx/include)
file(
GLOB actsim_agent_SRC
"*.cpp"
)
set(actsim_agent_lib actsim_agent)
set(actsim_agent_lib ${actsim_agent_lib} PARENT_SCOPE)
add_library(
actsim-agent-lib
${actsim_agent_lib}
SHARED
${actsim_agent_SRC}
)
target_link_libraries(
actsim-agent-lib
${actsim_agent_lib}
act-cluster-lib
-lpqxx -lpq -latomic
)
pqxx
-latomic
)
# specify install targets
install(
TARGETS ${actsim_agent_lib}
DESTINATION lib
)

View file

@ -24,7 +24,6 @@
*/
#include <iostream>
#include <thread>
#include <csignal>
#include <atomic>
#include <cluster/db_types.hpp>

View file

@ -23,13 +23,25 @@
**************************************************************************
*/
#include "../../include/cluster/db_client.hpp"
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <cluster/artifact.hpp>
#include <filesystem>
#include <cstdio>
#include <cstdlib>
#include <pqxx/pqxx>
#include <functional>
#include <chrono>
#include <vector>
#include "actsim_agent.hpp"
#include "agent_artifact.hpp"
#include "cluster/db_types.hpp"
#include "pqxx/internal/statement_parameters.hxx"
#include "pqxx/prepared_statement.hxx"
#include "task_interface.hpp"
#include "util.h"
#include "downloader.hpp"
@ -51,8 +63,6 @@ void Downloader::join() {
void Downloader::thread_run() {
using namespace std::chrono_literals;
// connect to the database
if (!this->conn->connect()) {
std::cerr << "Error: Upload thread could not connect to the database!" << std::endl;
@ -61,6 +71,31 @@ void Downloader::thread_run() {
return;
}
this->conn->prepare_statements({
{"mark_tasks", "UPDATE sim_configs SET part_status = 'finished' WHERE id = $1;"},
{"fetch_tasks",
"SELECT "
" ap.design_file AS design, "
" ap.top_proc, "
" ap.outputs AS target_artifact, "
" ap.id AS source_pass, "
" sc.id AS source_config, "
" sc.sim_commands, "
" sc.has_reference AS reference, "
" sc.part_status "
"FROM "
" sim_configs sc "
"JOIN "
" actsim_passes ap ON sc.artifact = ap.sim_configs "
"LEFT JOIN "
" sim_outputs so ON sc.has_reference = so.sim_config "
"WHERE "
" sc.part_status = 'not_started' "
" AND (sc.has_reference IS NULL OR so.part_status = 'finished') "
"LIMIT $1;" }
}
);
while (this->interface.running()) {
// wait until there is more space in the buffer to fill
@ -69,11 +104,13 @@ void Downloader::thread_run() {
// make sure we weren't woken up because the program closed
if (!this->interface.running()) break;
if (this->interface.get_buffer_space() < DOWNLOAD_BUFFER * 0.8) continue;
// if the download buffer is not full, fetch some more tasks
if (!this->fetch_tasks(this->interface.get_buffer_space())) {
// we can sleep for a certain amount of time, nothing to do
DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
// DEBUG_PRINT("Going to sleep. Checking for more tasks in a bit...");
this->interface.wait_for_available();
}
}
@ -98,156 +135,42 @@ void Downloader::thread_run() {
bool Downloader::fetch_tasks(size_t n) {
DEBUG_PRINT("Downloader fetching " + std::to_string(n) + " tasks...");
for (size_t i = 0; i < n; ++i) {
// fetch a new task from the database
auto fetch_task_lambda = [](
pqxx::work *txn,
bool *task_avail,
pl::testcase_t *testcase,
db::uuid_t *target_artifact,
db::uuid_t *source_pass,
db::uuid_t *design,
db::uuid_t *source_config,
db::uuid_t *id
) {
// Fetch a new task by a couple rules:
// 1.) It has not been started yet or the output row doesn't exist
// 2.) Passes that are already in progress are preferred
// 3.) New passes are started in the order they were added to the database
// 4.) Passes are only started if all their dependencies are fulfilled
auto res = txn->exec(
"SELECT "
" ap.design_file AS design, "
" ap.top_proc, "
" ap.outputs AS target_artifact, "
" ap.id AS source_pass, "
" sc.id AS source_config, "
" sc.sim_commands, "
" so.id AS id "
"FROM "
" actsim_passes ap "
"JOIN "
" sim_configs sc ON ap.sim_configs = sc.artifact "
"LEFT JOIN "
" sim_outputs so ON sc.id = so.sim_config "
"JOIN "
" jobs j ON ap.job = j.id "
"WHERE "
" (so.id IS NULL OR so.part_status = 'not_started') "
" AND ap.id IN ( "
" SELECT ap_dep.id "
" FROM actsim_passes ap_dep "
" JOIN passes p ON ap_dep.id = p.id "
" WHERE NOT EXISTS ( "
" SELECT 1 "
" FROM passes dep "
" WHERE dep.id = ANY(p.depends_on) "
" AND dep.pass_status != 'finished' "
" ) "
" ) "
"ORDER BY "
" ap.pass_status = 'in_progress' DESC, "
" j.time_added ASC "
"LIMIT 1;"
);
bool txn_opened;
auto txn = this->conn->open_transaction(txn_opened);
pqxx::result res;
// seems like there is nothing to do right now
if (res.size() < 1) {
*task_avail = false;
return;
}
try {
res = txn.exec(pqxx::prepped("fetch_tasks"), n);
auto row = res[0];
// we got something back, sort the data
*task_avail = true;
*target_artifact = row["target_artifact"].as<db::uuid_t>();
*source_pass = row["source_pass"].as<db::uuid_t>();
*design = row["design"].as<db::uuid_t>();
*source_config = row["source_config"].as<db::uuid_t>();
std::vector<std::string> commands;
auto arr = row["sim_commands"].as_array();
std::pair<pqxx::array_parser::juncture, std::string> elem;
do {
elem = arr.get_next();
if (elem.first == pqxx::array_parser::juncture::string_value) {
commands.push_back(elem.second);
}
} while (elem.first != pqxx::array_parser::juncture::done);
*testcase = {
commands,
row["top_proc"].as<std::string>(),
false
};
if (row["id"].is_null()) {
// create a new sim output row in the database
pqxx::row new_row = txn->exec_params1(
"INSERT INTO sim_outputs (artifact, source_pass, sim_config) "
" VALUES ($1, $2, $3) "
" RETURNING id;",
*target_artifact,
*source_pass,
*source_config
);
// and get the auto-generated ID
*id = new_row["id"].as<db::uuid_t>();
} else {
// get the ID
*id = row["id"].as<db::uuid_t>();
// and since the sim output row already exists, update its status
txn->exec_params0(
"UPDATE sim_outputs SET part_status = 'in_progress' WHERE id = $1;",
*id
);
}
};
std::function<void(pqxx::work*, bool*, pl::testcase_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*, db::uuid_t*)> fetch_task_func = fetch_task_lambda;
bool task_avail;
pl::testcase_t testcase;
db::uuid_t target_artifact;
db::uuid_t source_pass;
db::uuid_t design;
db::uuid_t source_config;
db::uuid_t id;
if (!this->conn->send_request(
&fetch_task_func,
&task_avail,
&testcase,
&target_artifact,
&source_pass,
&design,
&source_config,
&id
)) {
// if we lost connection, there's nothing else we can really do
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
this->interface.stop();
this->interface.stop_immediately();
for (auto&& row : res) {
txn.exec(pqxx::prepped("mark_tasks"), {row["source_config"].as<db::uuid_t>()});
}
// seems like there are no tasks to do right now; tell the calling function
// that there is still space in the buffer and we should just wait a while
if (!task_avail) {
DEBUG_PRINT("Buffer could not be filled - no more tasks available in the database!");
return false;
}
txn.commit();
} catch (const std::exception& e) {
DEBUG_PRINT("Fetched task with id " + db::to_string(id) + ", stemming from pass " + db::to_string(source_pass) + ", outputting to artifact " + db::to_string(target_artifact));
DEBUG_PRINT("Design used is " + db::to_string(design) + ", simulation config " + db::to_string(source_config));
auto task = std::make_unique<DBSimConfigArtifact>(id, source_pass, target_artifact, design, source_config);
task->add_testcase(testcase);
// if something happened during the transaction, we roll it back
txn.abort();
std::cerr << "Error: Lost connection while trying to fetch more tasks! Aborting!" << std::endl;
std::cerr << e.what() << std::endl;
this->interface.stop();
this->interface.stop_immediately();
return false;
}
// seems like there is nothing to do right now
if (res.size() < 1) {
return false;
}
for (auto row : res) {
auto target_artifact = row["target_artifact"].as<db::uuid_t>();
auto source_pass = row["source_pass"].as<db::uuid_t>();
auto design = row["design"].as<db::uuid_t>();
auto source_config = row["source_config"].as<db::uuid_t>();
// see if we already have the design locally; if not, load it
if (!this->interface.increment_design(design)) {
@ -257,30 +180,62 @@ bool Downloader::fetch_tasks(size_t n) {
// if we could not load the design, reopen it in the database
if (!this->fetch_design(design, design_path)) {
std::cerr << "Error: Could not load design for task " << task->id << ", reopening it." << std::endl;
this->reopen_task(task->id, true);
std::cerr << "Error: Could not load design for task " << source_config << ", reopening it." << std::endl;
this->reopen_task(source_config, true);
continue;
}
this->interface.store_design(design, design_path);
}
// push the task to the list of open tasks
this->interface.push_fresh(std::move(task));
}
db::uuid_t reference;
reference = 0;
if (!row["reference"].is_null()) {
reference = row["reference"].as<db::uuid_t>();
// see if we already have the reference run locally; if not, load it
if (!this->interface.increment_reference(reference)) {
DEBUG_PRINT("Fetching new reference run with ID " + db::to_string(reference));
std::shared_ptr<pl::SimOutputArtifact> reference_run;
// if we could not load the reference run, reopen the task in the database
if ((reference_run = this->fetch_reference_run(reference)) == nullptr) {
std::cerr << "Error: Could not load reference run " << reference << "!" << std::endl;
// this->reopen_task(task->id, true);
continue;
}
this->interface.store_reference(reference, reference_run);
}
}
auto com_arr = row["sim_commands"].as_sql_array<std::string>();
std::vector<std::string> commands(com_arr.cbegin(), com_arr.cend());
pl::testcase_t testcase = {
commands,
row["top_proc"].as<std::string>(),
false
};
auto task = std::make_unique<DBSimConfigArtifact>(db::uuid_t(), source_pass, target_artifact, design, reference, source_config);
task->add_testcase(testcase);
this->interface.push_fresh(std::move(task));
}
return true;
}
bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
design = "test design";
DEBUG_PRINT("Loading design with ID " + db::to_string(id) + " from database.");
auto fetch_design_lambda = [](pqxx::work *txn, const db::uuid_t *design_id, std::string *design, bool *found) {
try {
auto res = txn->exec_params1("SELECT content FROM act_files WHERE artifact = $1;", *design_id);
auto res = txn->exec("SELECT content FROM act_files WHERE artifact = $1;", *design_id)[0];
// load design into string variable
*design = res["content"].as<std::string>();
*found = true;
@ -336,15 +291,70 @@ bool Downloader::fetch_design(const db::uuid_t& id, std::string& design) {
return true;
}
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
std::shared_ptr<pl::SimOutputArtifact> Downloader::fetch_reference_run(const db::uuid_t& id) {
// open up the status of this partial output in the database again
auto task_reopen_lambda = [](pqxx::work *txn, const db::uuid_t *task, db::JobStatusType *status) {
txn->exec_params0("UPDATE sim_outputs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", *task, *status);
DEBUG_PRINT("Loading reference run with ID " + db::to_string(id) + " from database.");
auto fetch_design_lambda = [id](
pqxx::work *txn,
std::vector<uint32_t> *output_token_timings,
long *output_tokens,
long *log_size,
bool *found
) {
try {
auto res = txn->exec("SELECT output_tokens, output_token_timings, log_size FROM sim_outputs WHERE sim_config = $1;", id)[0];
// load the output token timings
auto arr_ott = res["output_token_timings"].as_sql_array<uint32_t>();
*output_token_timings = std::vector<uint32_t>(arr_ott.cbegin(), arr_ott.cend());
*output_tokens = res["output_tokens"].as<long>();
*log_size = res["log_size"].as<long>();
*found = true;
} catch (pqxx::unexpected_rows& e) {
std::cerr << "Error: Failed to fetch reference run " << id << ": " << e.what() << std::endl;
*found = false;
}
};
std::function<void(pqxx::work*, const db::uuid_t*, db::JobStatusType*)> task_reopen_func = task_reopen_lambda;
std::function<void(
pqxx::work *txn,
std::vector<uint32_t> *output_token_timings,
long *output_tokens,
long *log_size,
bool *found
)> fetch_design_func = fetch_design_lambda;
std::vector<uint32_t> output_token_timings;
long output_tokens;
long log_size;
bool ref_run_found;
if (!this->conn->send_request(&fetch_design_func, &output_token_timings, &output_tokens, &log_size, &ref_run_found)) {
// if we lost connection, there's nothing else we can really do
std::cerr << "Error: Lost connection while trying fetch a design! Aborting!" << std::endl;
this->interface.stop();
this->interface.stop_immediately();
return nullptr;
}
if (!ref_run_found) {
return nullptr;
}
auto reference = std::make_shared<pl::SimOutputArtifact>();
reference->set_output_token_timings(output_token_timings);
reference->set_output_tokens(output_tokens);
reference->set_size(log_size);
return reference;
}
void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
DEBUG_PRINT("Reopening task with ID " + db::to_string(id));
db::JobStatusType status;
@ -355,7 +365,14 @@ void Downloader::reopen_task(const db::uuid_t& id, bool halt) {
status = db::JobStatusType::NOT_STARTED;
}
if (!this->conn->send_request(&task_reopen_func, &id, &status)) {
// open up the status of this partial output in the database again
auto task_reopen_lambda = [id, status](pqxx::work *txn) {
txn->exec("UPDATE sim_configs SET part_status = $2 WHERE id = $1 AND part_status = 'in_progress';", {id, status});
};
std::function<void(pqxx::work*)> task_reopen_func = task_reopen_lambda;
if (!this->conn->send_request(&task_reopen_func)) {
// if we lost connection, there's nothing else we can really do
std::cerr << "Error: Lost connection while trying to reopen task " << id << "! Database might be compromised! Aborting!" << std::endl;
this->interface.stop();

View file

@ -0,0 +1,275 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include "../../include/cluster/artifact.hpp"
#include <regex>
#include "util.h"
#include "log_parser.hpp"
#include <cstdint>
#include <vector>
#include <string>
// Construct a parser that checks the simulation log against a reference run.
// NOTE(review): iterators into reference->get_output_token_timings() are
// cached below; this assumes the getter returns a reference to storage owned
// by 'reference' (a by-value return would leave dangling iterators) —
// confirm against pl::SimOutputArtifact.
LogParser::LogParser(std::unique_ptr<DBSimOutputArtifact>& artifact, std::shared_ptr<pl::SimOutputArtifact> reference) : artifact(artifact) {
    this->reference = reference;
    this->has_reference = true;
    // get the output token timing iterator
    this->timing_it = reference->get_output_token_timings().begin();
    this->reference_ott_end = reference->get_output_token_timings().end();
}
// Construct a parser without a reference run: only self-contained checks
// (token bookkeeping, coding faults, glitches) can be performed.
// timing_it / reference_ott_end stay default-constructed and must never be
// dereferenced; all uses are guarded by has_reference.
LogParser::LogParser(std::unique_ptr<DBSimOutputArtifact>& artifact) : artifact(artifact) {
    this->has_reference = false;
};
// Run every per-line fault check on one stdout log line, then append the
// line to the artifact's stored log output.
void LogParser::parse_log(const std::string& line) {
    // DEBUG_PRINT("Parsing log line " + line);
    // check for output tokens
    check_token_count(line);
    // check for excl high constraint violations
    check_coding_fault(line);
    // check for value fault
    check_value_timing_fault(line);
    // check for glitch
    check_glitch(line);
    this->artifact->add_log_output(line);
}
// Record one stderr line on the artifact. No fault checks run here:
// actsim actually outputs everything on stdout; only the warnings in
// the beginning are on stderr.
void LogParser::parse_error([[maybe_unused]]const std::string& line) {
    // DEBUG_PRINT("Parsing error line " + line);
    this->artifact->add_err_output(line);
}
// Run end-of-log checks that can only be decided once the whole log has
// been seen: deadlock detection (with a reference) and token count faults.
void LogParser::finalize() {
    this->artifact->fix_size();
    this->artifact->clear_logs();
    /*
    So the only way to do this cleanly is to make sure that either
    - the model has not sent all tokens yet -> deadlock
    - or the model has sent everything but there is one missing -> token fault
    This has the consequence that we cannot inject close to the end of the test!
    This also means that we don't really know if a deadlock occurred without
    a reference run to go off.
    Only that there was a potential token count difference.
    */
    if (has_reference) {
        // number of output tokens the reference run produced
        const auto ref_token_count = reference->get_output_token_timings().size();
        // written as 'dut + 1 < count' instead of 'dut < count - 1':
        // size() is unsigned, so 'size() - 1' underflowed to SIZE_MAX for
        // a reference with zero timings and reported a bogus deadlock
        const unsigned long long dut_tokens =
            static_cast<unsigned long long>(dut_output_tokens_);
        // model has not sent all tokens yet
        if (dut_tokens + 1 < static_cast<unsigned long long>(ref_token_count)) {
            // a deadlock must have occured
            artifact->set_fault_deadlock(true);
            failure_mode = true;
            DEBUG_PRINT("Deadlock detected during finalization (compared to reference)");
            DEBUG_PRINT("Reference had " +
                std::to_string(reference->output_tokens) +
                " tokens, task had " +
                std::to_string(dut_output_tokens_) +
                " + " + std::to_string(output_token_difference_));
        // model has sent all tokens but DUT has not sent all of them
        } else if (output_token_difference_ != 0) {
            // a token amount error has occurred
            artifact->set_fault_token_count(true);
            failure_mode = true;
            DEBUG_PRINT("Token count mismatch detected during finalization (compared to reference)");
        }
        // if there is no failure condition,
        // we don't need to save the log
        //if (!failure_mode) {
        //    artifact->clear_logs();
        //}
    } else {
        // if token difference != 0: token count error
        if (output_token_difference_ != 0) {
            artifact->set_fault_token_count(true);
            artifact->set_output_tokens(dut_output_tokens_);
            DEBUG_PRINT("Token count mismatch detected during finalization.");
        }
    }
}
// Track the model-vs-DUT token balance. The difference counter must be
// back at zero when the log ends: every "Model response received" line
// should be matched by a "DUT response received" line.
inline void LogParser::check_token_count(const std::string& line) {
    const bool saw_model =
        line.find("Model response received") != std::string::npos;
    const bool saw_dut =
        line.find("DUT response received") != std::string::npos;
    if (saw_model) {
        ++output_token_difference_;
    }
    if (saw_dut) {
        --output_token_difference_;
        ++dut_output_tokens_;
    }
}
// Look for the scoreboard's per-token verdict. Either verdict marks one
// output token (and is timing-checked); "TEST FAILED" additionally flags
// a value fault on the artifact.
inline void LogParser::check_value_timing_fault(const std::string& line) {
    if (line.find("TEST FAILED") != std::string::npos) {
        // scoreboard reported a wrong value
        artifact->set_fault_value(true);
        failure_mode = true;
        DEBUG_PRINT("Value error detected");
        handle_output_token(line);
        return;
    }
    // a passing token still needs its timing checked
    if (line.find("TEST SUCCESS") != std::string::npos) {
        // DEBUG_PRINT("Successful output token detected");
        handle_output_token(line);
    }
}
// Record one scoreboard output token: store its timestamp on the artifact
// and, when a reference run is present, compare it against the next
// expected reference timing (advancing the reference iterator).
inline void LogParser::handle_output_token(const std::string& line) {
    // add the timing to the artifact
    auto timing = extract_timestamp(line);
    artifact->add_output_token_timing(timing);
    // check if there is also a timing error
    if (has_reference) {
        // make sure there is still a token to compare to left
        if (timing_it == reference_ott_end) {
            // there is a mismatch in tokens
            artifact->set_fault_token_count(true);
            failure_mode = true;
            DEBUG_PRINT("Tried to compare token timing but no reference token left.");
            return;
        }
        // check if the timings align
        if (timing != *timing_it) {
            // timings don't line up!
            artifact->set_fault_timing_deviation(true);
            failure_mode = true;
            DEBUG_PRINT("Token timing does not line up.");
        }
        // increment the iterator
        ++timing_it;
    }
}
// actsim reports a violated excl-hi constraint as a warning on stdout;
// treat any such line as a coding fault on the artifact.
inline void LogParser::check_coding_fault(const std::string& line) {
    if (line.find("WARNING: excl-hi constraint in") == std::string::npos) {
        return;
    }
    artifact->set_fault_coding(true);
    failure_mode = true;
    DEBUG_PRINT("Excl-hi constraint violated");
}
// Heuristic busy-deadlock check: once our log grows beyond three times the
// reference run's size we assume the design is spinning without progress.
// Returns true when the simulation should be killed.
bool LogParser::check_busy_deadlock() {
    // without a reference run there is no baseline to compare against
    if (!has_reference) {
        return false;
    }
    const auto limit = reference->get_size() * 3;
    if (artifact->get_size() <= limit) {
        return false;
    }
    failure_mode = true;
    DEBUG_PRINT("Busy deadlock detected, reference size is " +
        std::to_string(reference->get_size()) +
        ", ours is " + std::to_string(artifact->get_size())
    );
    this->artifact->set_fault_deadlock(true);
    std::cout << "Killing busy deadlock" << std::endl;
    return true;
}
// actsim flags weak interference on a channel as a warning on stdout;
// treat any such line as a glitch fault on the artifact.
inline void LogParser::check_glitch(const std::string& line) {
    if (line.find("WARNING: weak-interference on") == std::string::npos) {
        return;
    }
    artifact->set_fault_glitch(true);
    failure_mode = true;
    DEBUG_PRINT("Glitch in interface detected");
}
// Extract the leading "[ <digits> ]" timestamp from a log line,
// e.g. "[  1234 ] ...". Returns 0 when the line has no timestamp prefix.
inline uint32_t LogParser::extract_timestamp(const std::string& line) {
    // static: compiling a std::regex is expensive; build it once instead
    // of on every log line
    static const std::regex pattern(R"(^\[\s*(\d+)\s*\])");
    std::smatch match;
    if (!std::regex_search(line, match, pattern)) {
        // no timestamp on this line
        return 0;
    }
    // stoul, not stoi: timestamps are unsigned and values above INT_MAX
    // made stoi throw std::out_of_range even though they fit in uint32_t
    return static_cast<uint32_t>(std::stoul(match[1].str()));
}

View file

@ -23,7 +23,12 @@
**************************************************************************
*/
#include "util.h"
#include <cstddef>
#include <cstdio>
#include <thread>
#include <utility>
#include <vector>
#include "task_interface.hpp"
TaskInterface::TaskInterface(size_t buffer_size) {
@ -41,6 +46,8 @@ TaskInterface::~TaskInterface() {
void TaskInterface::push_fresh(std::unique_ptr<InputType> task) {
DEBUG_PRINT("New task in the queue!");
// lock the queue and insert into it
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
this->fresh_queue.push(std::move(task));
@ -49,14 +56,30 @@ void TaskInterface::push_fresh(std::unique_ptr<InputType> task) {
this->fresh_queue_empty_condition.notify_one();
}
void TaskInterface::push_fresh(std::vector<std::unique_ptr<InputType>>& tasks) {
DEBUG_PRINT("New task in the queue!");
// lock the queue and insert into it
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
while (!tasks.empty()) {
this->fresh_queue.push(std::move(tasks.front()));
tasks.erase(tasks.begin());
}
// we put one task in there so we notify one worker thread
this->fresh_queue_empty_condition.notify_one();
}
bool TaskInterface::fresh_queue_empty() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
return this->fresh_queue.empty();
}
bool TaskInterface::finished_queue_empty() {
bool TaskInterface::finished_queue_empty(size_t min_size) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
return this->finished_queue.empty();
return this->finished_queue.size() < min_size;
}
std::unique_ptr<InputType> TaskInterface::pop_fresh(bool& empty) {
@ -84,15 +107,21 @@ std::unique_ptr<InputType> TaskInterface::pop_fresh(bool& empty) {
void TaskInterface::wait_for_fresh() {
std::unique_lock<std::mutex> lock (this->fresh_queue_empty_mutex);
worker_waiting = true;
finished_queue_empty_condition.notify_all();
// we will be notified either when there is new data or the program has been stopped
this->fresh_queue_empty_condition.wait(lock, [&] { return !this->fresh_queue_empty() || !running(); });
DEBUG_PRINT("Worker released from block");
worker_waiting = false;
}
void TaskInterface::wait_for_finished() {
void TaskInterface::wait_for_finished(size_t min_size) {
std::unique_lock<std::mutex> lock (this->finished_queue_empty_mutex);
// we will be notified either when there is new data or the program has been stopped
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty() || !running(); });
this->finished_queue_empty_condition.wait(lock, [&] { return !this->finished_queue_empty(min_size) || !running() || (worker_waiting && downloader_waiting);});
}
void TaskInterface::wait_for_buffer_consume() {
@ -102,6 +131,18 @@ void TaskInterface::wait_for_buffer_consume() {
this->fresh_queue_full_condition.wait(lock, [&] { return this->get_buffer_space() > 0 || !running(); });
}
// Back off when the downloader found nothing new to fetch.
//
// Raises downloader_waiting and wakes waiters on the finished-queue condition
// so wait_for_finished()'s predicate can observe an idle downloader (deadlock /
// end-of-work detection), then sleeps for a fixed interval before the caller
// retries. The flag must be set BEFORE notifying, hence the statement order.
void TaskInterface::wait_for_available() {
downloader_waiting = true;
finished_queue_empty_condition.notify_all();
using namespace std::chrono_literals;
std::this_thread::sleep_for(NOTHING_AVAILABLE_SLEEP_TIME);
downloader_waiting = false;
}
void TaskInterface::push_finished(std::unique_ptr<OutputType> task) {
std::lock_guard<std::mutex> lock(this->finished_queue_mutex);
this->finished_queue.push(std::move(task));
@ -110,21 +151,27 @@ void TaskInterface::push_finished(std::unique_ptr<OutputType> task) {
this->finished_queue_empty_condition.notify_one();
}
std::unique_ptr<OutputType> TaskInterface::pop_finished(bool& empty) {
std::vector<std::unique_ptr<OutputType>> TaskInterface::pop_finished(bool& empty) {
std::vector<std::unique_ptr<OutputType>> tasks;
// we first need exclusive access to the queue
std::lock_guard<std::mutex> lock (this->finished_queue_mutex);
// we have to make sure the queue wasn't emptied since the last empty call was made
// or at least inform the calling thread that the queue is currently empty
std::unique_ptr<OutputType> task;
if (!(empty = this->finished_queue.empty())) {
task = std::move(this->finished_queue.front());
this->finished_queue.pop();
// check if there is a minimum amount of tasks in the queue
if (this->finished_queue.empty()) {
empty = true;
return tasks;
}
return task;
empty = false;
while (!this->finished_queue.empty()) {
tasks.emplace_back(std::move(this->finished_queue.front()));
this->finished_queue.pop();
}
return tasks;
}
bool TaskInterface::increment_design(const db::uuid_t& id) {
@ -165,16 +212,16 @@ void TaskInterface::decrement_design(const db::uuid_t& id) {
// if the reference counter hit 0, erase the design entry from the list
// of available designs
if (design_entry.first == 0) {
DEBUG_PRINT("Reference counter has hit 0. Deleting temp file from disk...");
// if (design_entry.first == 0) {
// DEBUG_PRINT("Reference counter has hit 0. Deleting temp file from disk...");
// delete the temporary file from disk
DEBUG_PRINT("Deleting design file from disk.");
std::remove(design_entry.second.c_str());
// // delete the temporary file from disk
// DEBUG_PRINT("Deleting design file from disk.");
// std::remove(design_entry.second.c_str());
DEBUG_PRINT("Erasing design from store.");
this->designs.erase(id);
}
// DEBUG_PRINT("Erasing design from store.");
// this->designs.erase(id);
// }
}
std::string TaskInterface::get_design(const db::uuid_t& id) {
@ -192,7 +239,7 @@ std::string TaskInterface::get_design(const db::uuid_t& id) {
void TaskInterface::store_design(const db::uuid_t& id, std::string& design) {
std::lock_guard<std::mutex> lock (this->designs_mutex);
DEBUG_PRINT("Henlo Storing new design with ID " + db::to_string(id));
DEBUG_PRINT("Storing new design with ID " + db::to_string(id));
// make sure the design isn't already in the list of design entries
// if it is, just increment its reference counter
@ -206,6 +253,80 @@ void TaskInterface::store_design(const db::uuid_t& id, std::string& design) {
this->designs[id] = {1, design};
}
bool TaskInterface::increment_reference(const db::uuid_t& id) {
    // Bump the usage counter of a cached reference run.
    //
    // @param id  UUID of the reference run
    // @return true when the run is cached (counter incremented),
    //         false when it is unknown and the caller must store it first
    std::lock_guard<std::mutex> lock(this->references_mutex);
    DEBUG_PRINT("Looking for reference run with ID " + db::to_string(id));
    // single map lookup instead of the previous find() + operator[] pair
    auto entry = this->references.find(id);
    if (entry == this->references.end()) {
        DEBUG_PRINT("Reference run not found.");
        return false;
    }
    // run is cached -- increment its reference counter
    ++entry->second.first;
    DEBUG_PRINT("Reference run found. Incrementing reference counter. New counter is " + std::to_string(entry->second.first));
    return true;
}
void TaskInterface::decrement_reference(const db::uuid_t& id) {
    // Drop one usage of a cached reference run; no-op if the run is unknown.
    //
    // @param id  UUID of the reference run
    std::lock_guard<std::mutex> lock(this->references_mutex);
    DEBUG_PRINT("Looking to decrement reference run with ID " + db::to_string(id));
    // single map lookup instead of the previous find() + operator[] pair
    auto entry = this->references.find(id);
    if (entry == this->references.end()) {
        DEBUG_PRINT("Could not find reference run. Not decrementing.");
        return;
    }
    --entry->second.first;
    DEBUG_PRINT("Reference run found. Decrementing reference counter. New counter is " + std::to_string(entry->second.first));
    // Eviction at count 0 is deliberately disabled (kept cached to reduce
    // traffic) -- see the retained original code below.
    // NOTE(review): with eviction off, the counter can go negative if
    // decrements outnumber increments; confirm callers keep these balanced.
    // if (reference_run.first == 0) {
    //     DEBUG_PRINT("Reference counter has hit 0. Erasing reference run from map...");
    //     this->references.erase(id);
    // }
}
std::shared_ptr<pl::SimOutputArtifact> TaskInterface::get_reference(const db::uuid_t& id) {
    // Fetch the cached reference simulation output for a comparison run.
    //
    // @param id  UUID of the reference run
    // @return the cached artifact, or an empty artifact (after logging) if the
    //         entry vanished -- which should be impossible while eviction is
    //         disabled in decrement_reference()
    std::lock_guard<std::mutex> lock(this->references_mutex);
    // single map lookup instead of the previous find() + operator[] pair
    auto entry = this->references.find(id);
    if (entry == this->references.end()) {
        std::cerr << "Error: Reference run was somehow deleted before it could reach the execution stage. This should really never happen!" << std::endl;
        return std::make_shared<pl::SimOutputArtifact>();
    }
    return entry->second.second;
}
void TaskInterface::store_reference(const db::uuid_t& id, std::shared_ptr<pl::SimOutputArtifact> reference) {
    // Cache a reference run, or bump its counter when it is already cached.
    //
    // @param id         UUID of the reference run
    // @param reference  parsed simulation output to cache
    std::lock_guard<std::mutex> lock(this->references_mutex);
    DEBUG_PRINT("Storing new reference with ID " + db::to_string(id));
    // single map lookup instead of the previous find() + operator[] pair
    auto entry = this->references.find(id);
    if (entry != this->references.end()) {
        DEBUG_PRINT("Reference run is already in here, incrementing reference counter instead.");
        ++entry->second.first;
        return;
    }
    // otherwise, create a new entry with an initial count of 1
    this->references[id] = {1, reference};
}
size_t TaskInterface::get_buffer_space() {
std::lock_guard<std::mutex> lock(this->fresh_queue_mutex);
return this->buffer_size - this->fresh_queue.size();

View file

@ -23,9 +23,14 @@
**************************************************************************
*/
#include <cstddef>
#include <iostream>
#include <memory>
#include <pqxx/pqxx>
#include <functional>
#include <cluster/db_types.hpp>
#include <sstream>
#include "task_interface.hpp"
#include "util.h"
#include "uploader.hpp"
@ -53,21 +58,24 @@ void Uploader::thread_run() {
this->interface.stop();
return;
}
size_t min_size = 1;
while (this->interface.running()) {
// this blocks until either a new task is available for upload or the
// program was halted
this->interface.wait_for_finished();
this->interface.wait_for_finished(min_size);
DEBUG_PRINT("Uploader was worken up");
DEBUG_PRINT("Uploader was woken up");
// so first we check if we should still be running
if (!this->interface.running()) break;
// we're still good to go! get a task from the fresh queue
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
auto tasks = this->interface.pop_finished(queue_empty);
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
@ -75,7 +83,7 @@ void Uploader::thread_run() {
DEBUG_PRINT("Uploader dequeued new task");
// everything is good, upload the given task
bool success = this->upload_task(std::move(task));
bool success = this->upload_task(tasks);
// Uh oh, seems like we lost database connection! Close the program.
if (!success) {
@ -85,6 +93,7 @@ void Uploader::thread_run() {
}
DEBUG_PRINT("Task successfully uploaded");
min_size = 200;
}
DEBUG_PRINT("Uploader is starting the shutdown procedure, waiting for cleanup ready");
@ -96,50 +105,63 @@ void Uploader::thread_run() {
DEBUG_PRINT("Uploader has received go for cleanup");
// upload all the remaining tasks
while (!this->interface.finished_queue_empty()) {
bool queue_empty;
auto task = this->interface.pop_finished(queue_empty);
bool queue_empty;
auto tasks = this->interface.pop_finished(queue_empty);
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
DEBUG_PRINT("Uploading finished task");
if (!this->upload_task(std::move(task))) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
// in case there are ever multiple upload threads,
// the same issues apply as before
if (!queue_empty) {
DEBUG_PRINT("Uploading finished task");
if (!this->upload_task(tasks)) {
std::cerr << "Error: Lost database connection for uploading tasks during cleanup. Database integrity might be compromised." << std::endl;
}
}
DEBUG_PRINT("Uploader is done");
}
bool Uploader::upload_task(std::unique_ptr<OutputType> task) {
bool Uploader::upload_task(std::vector<std::unique_ptr<OutputType>>& tasks) {
// make sure any task that is uploaded isn't halted in the database
auto task_upload_lambda = [](
pqxx::work *txn,
const db::uuid_t *target,
std::vector<std::string> *sim_log,
std::vector<std::string> *sim_error
) {
txn->exec_params0(
"UPDATE sim_outputs SET sim_log = $1, error_log = $2, part_status = 'finished' WHERE id = $3 AND part_status != 'halted';",
*sim_log,
*sim_error,
*target
);
auto upload_results_lambda = [] (pqxx::work* txn, std::vector<std::unique_ptr<OutputType>>* tasks) {
pqxx::stream_to stream = pqxx::stream_to::table(*txn, {"sim_outputs"}, {"artifact","source_pass", "sim_config", "output_tokens", "output_token_timings", "fault_flags", "log_size", "part_status"});
for (auto&& task : (*tasks)) {
stream.write_values(task->target_artifact, task->source_pass, task->source_config, task->output_tokens, task->get_output_token_timings(), build_fault_flags(task), task->get_size(), "finished");
}
stream.complete();
};
std::function<void(
pqxx::work*,
const db::uuid_t*,
std::vector<std::string>*,
std::vector<std::string>*
)> task_upload_func = task_upload_lambda;
std::function<void(pqxx::work*, std::vector<std::unique_ptr<OutputType>>*)> task_upload_func = upload_results_lambda;
DEBUG_PRINT("Updating task " + db::to_string(task->id));
DEBUG_PRINT("Updating tasks");
return this->conn->send_request(&task_upload_func, &(task->id), &(task->get_content().first), &(task->get_content().second));
return this->conn->send_request(&task_upload_func, &tasks);
}
std::string build_fault_flags(const std::unique_ptr<OutputType>& task) {
    // Serialize the task's fault booleans as a 6-character bit string,
    // most significant bit first:
    //   5: token count   4: deadlock   3: glitch
    //   2: coding        1: value     0: timing
    std::string flags;
    flags.reserve(6);
    flags.push_back(task->fault_token_count ? '1' : '0');
    flags.push_back(task->fault_deadlock ? '1' : '0');
    flags.push_back(task->fault_glitch ? '1' : '0');
    flags.push_back(task->fault_coding ? '1' : '0');
    flags.push_back(task->fault_value ? '1' : '0');
    flags.push_back(task->fault_timing_deviation ? '1' : '0');
    return flags;
}

View file

@ -23,13 +23,17 @@
**************************************************************************
*/
#include <cstddef>
#include <cstring>
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <cstdlib>
#include "util.h"
#include "log_parser.hpp"
#include "worker.hpp"
Worker::Worker(TaskInterface& interface) : interface(interface) {}
@ -73,6 +77,9 @@ void Worker::thread_run() {
// get the design this task uses; we'll need that later
auto design = task->design;
// get the reference as well; here it's not yet important if the test actually has one
auto reference = task->reference;
// everything is good, perform the given task
bool complete;
auto output = this->perform_task(task, complete);
@ -89,6 +96,11 @@ void Worker::thread_run() {
// if this succeeded, we can decrease the number of
// tasks that require the design we needed for this task
this->interface.decrement_design(design);
// in case this run was compared to a reference, handle that ref counter too
if (reference != 0) {
this->interface.decrement_reference(reference);
}
} else {
// there are two possible reasons the task was not finished
@ -98,6 +110,11 @@ void Worker::thread_run() {
// we got interrupted since, the current task was halted; in this case
// we only wanna decrease our reference counter
this->interface.decrement_design(task->design);
if (reference != 0) {
this->interface.decrement_reference(reference);
}
this->task_interrupted.store(false, std::memory_order_relaxed);
} else {
DEBUG_PRINT("Something went wrong during task execution");
@ -110,24 +127,19 @@ void Worker::thread_run() {
}
}
inline std::unique_ptr<OutputType> Worker::pipe_error(bool& finished) {
std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl;
finished = false;
return nullptr;
}
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {
if (task->get_content().size() != 1) {
std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl;
finished = false;
return std::make_unique<OutputType>(task->id, task->source_pass, task->target_artifact, task->design, task->source_config);
return std::make_unique<OutputType>(task->id, task->source_pass, task->target_artifact, task->design, task->reference, task->source_config);
}
auto testcase = task->get_content().front();
// execv expects mutable char*, so we have to copy our stuff first
auto design = this->interface.get_design(task->design);
design = "testbench.act";
char *design_char = new char[design.length()+1];
std::strcpy(design_char, design.c_str());
@ -152,26 +164,41 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// Pipe creation needs some error handling just in case
if (pipe(stdin_pipe) < 0) {
return pipe_error(finished);
std::cerr << "Error: Pipe creation failed for stdin pipe. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
if (pipe(stdout_pipe) < 0) {
return pipe_error(finished);
std::cerr << "Error: Pipe creation failed for stdout pipe. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
if (pipe(stderr_pipe) < 0) {
return pipe_error(finished);
std::cerr << "Error: Pipe creation failed for stderr pipe. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
// our side needs nonblocking access to the pipes
if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
std::cerr << "Error: Could not set stdin pipe to nonblocking. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
std::cerr << "Error: Could not set stdout pipe to nonblocking. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
std::cerr << "Error: Could not set stderr pipe to nonblocking. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
DEBUG_PRINT("Starting simulator...");
pid_t pid;
@ -266,8 +293,12 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
auto bin_str = std::string(std::getenv("ACT_HOME")) + "/bin/actsim";
char* bin = new char[bin_str.length() + 1];
std::strcpy(bin, bin_str.c_str());
std::string arg_str = "-m";
char* arg = new char[arg_str.length() + 1];
std::strcpy(arg, arg_str.c_str());
char* const argv[] = {bin, design_char, top_proc_char, (char*)0};
char* const argv[] = {bin, arg, design_char, top_proc_char, (char*)0};
// and call actsim
execv(argv[0], argv);
@ -292,28 +323,33 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// Close all the child process facing pipe ends
// since this is the parent process, we have to do all the stuff we did before
if (close(stdin_pipe[READ_END]) < 0) {
return pipe_error(finished);
if (close(stdin_pipe[READ_END]) < 0 ||
close(stdout_pipe[WRITE_END]) < 0 ||
close(stderr_pipe[WRITE_END]) < 0
) {
std::cerr << "Error: Could not close parent facing pipe ends. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
}
if (close(stdout_pipe[WRITE_END]) < 0) {
return pipe_error(finished);
}
if (close(stderr_pipe[WRITE_END]) < 0) {
return pipe_error(finished);
}
// create the output artifact
result = std::make_unique<OutputType>(
task->id,
task->source_pass,
task->target_artifact,
task->design,
task->reference,
task->source_config
);
// create the output parser
std::unique_ptr<LogParser> parser;
if (task->reference == 0) {
parser = std::make_unique<LogParser>(result);
} else {
parser = std::make_unique<LogParser>(result, this->interface.get_reference(task->reference));
}
std::vector<std::string>& commands = task->get_content()[0].commands;
size_t command_n = 0;
size_t last_pos = 0;
@ -339,15 +375,15 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
if (command_n < commands.size()) {
std::string& cur_command = commands[command_n];
const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str();
size_t command_length = commands[command_n].length() + 1;
auto remaining_command = cur_command.substr(last_pos, cur_command.length()) + "\n";
size_t command_length = remaining_command.length();
// make sure we don't send more than the pipe can actually hold
if (rem_pipe_capacity < command_length) {
last_pos = last_pos + rem_pipe_capacity;
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity);
rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), rem_pipe_capacity);
} else {
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length);
rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), command_length);
last_pos = 0;
++command_n;
}
@ -375,7 +411,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// make sure any remaining output is added to the log
if (stdout_buf != "") {
result->add_log_output(stdout_buf);
parser->parse_log(stdout_buf);
}
DEBUG_PRINT("STDOUT was closed by child");
@ -390,7 +426,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
auto pos = stdout_buf.find('\n');
while (pos != std::string::npos) {
DEBUG_PRINT("Log output line was added");
result->add_log_output(stdout_buf.substr(0, pos));
parser->parse_log(stdout_buf.substr(0, pos));
if ((pos + 1) < stdout_buf.length()) {
stdout_buf = stdout_buf.substr(pos+1, stdout_buf.length());
@ -425,7 +461,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// make sure any remaining output is added to the log
if (stderr_buf != "") {
result->add_err_output(stderr_buf);
parser->parse_error(stderr_buf);
}
DEBUG_PRINT("STDERR was closed by child");
@ -440,7 +476,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
auto pos = stderr_buf.find('\n');
while (pos != std::string::npos) {
DEBUG_PRINT("Error output line was added");
result->add_err_output(stderr_buf.substr(0, pos));
parser->parse_error(stderr_buf.substr(0, pos));
if ((pos + 1) < stderr_buf.length()) {
stderr_buf = stderr_buf.substr(pos+1, stderr_buf.length());
@ -465,10 +501,38 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// check if the process has ended ie. all pipes closed
if (stdout_closed && stderr_closed) {
finished = true;
int exit_code;
waitpid(pid, &exit_code, 0);
if (exit_code != 0) {
std::cerr << "SIMULATION EXITED ABNORMALLY!" << std::endl;
for (auto&& line : result->get_content().second) {
std::cerr << line << std::endl;
}
}
break;
}
// check if we need to abort due to a busy deadlock
if (parser->check_busy_deadlock()) {
finished = true;
kill(pid, SIGKILL);
break;
std::cout << "Killing deadlocked sim" << std::endl;
}
}
parser->finalize();
}
// Close all the remaining pipes
if (close(stdin_pipe[WRITE_END]) < 0 ||
close(stdout_pipe[READ_END]) < 0 ||
close(stderr_pipe[READ_END]) < 0
) {
std::cerr << "Error: Could not close child facing pipe ends. " << strerror(errno) << std::endl;
}
delete[] design_char;

View file

@ -0,0 +1,33 @@
# Build configuration for the ACT cluster static library.

# Headers: installed ACT tree, project cluster includes, bundled libpqxx.
include_directories($ENV{ACT_HOME}/include)
include_directories(../../include/cluster)
include_directories(../../deps/libpqxx/include)
# Collect every .cpp in this directory as library sources.
# NOTE(review): GLOB does not pick up newly added files without re-running
# CMake; listing sources explicitly is the usual recommendation.
file(
GLOB act_cluster_lib_SRC
"*.cpp"
)
add_library(
act-cluster-lib
STATIC
${act_cluster_lib_SRC}
)
# Link against the core ACT libraries and the PostgreSQL C++ client.
target_link_libraries(
act-cluster-lib
act-lib
act-common-lib
pqxx
)
# Install the public cluster headers...
install(
DIRECTORY ${CMAKE_SOURCE_DIR}/include/cluster
DESTINATION include/cluster
FILES_MATCHING PATTERN "*.h*"
)
# ...and the built library itself.
install(
TARGETS act-cluster-lib
LIBRARY DESTINATION lib
)

View file

@ -1,9 +1,7 @@
/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -23,15 +21,18 @@
**************************************************************************
*/
#include "agent_artifact.hpp"
#include "db_artifact.hpp"
DBSimArtifact::DBSimArtifact(
namespace db {
DBArtifact::DBArtifact(
const db::uuid_t& id,
const db::uuid_t& source_pass,
const db::uuid_t& target_artifact,
const db::uuid_t& design,
const db::uuid_t& source_config
) : DBArtifact(id, source_pass, target_artifact) {
this->design_ = design;
this->source_config_ = source_config;
const db::uuid_t& target_artifact
) {
this->id_ = id;
this->source_pass_ = source_pass;
this->target_artifact_ = target_artifact;
}
};

238
src/cluster/db_client.cpp Normal file
View file

@ -0,0 +1,238 @@
/*************************************************************************
*
* Copyright (c) 2023 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <iostream>
#include "util.h"
#include "db_client.hpp"
namespace db {
// Pull std names into this translation unit (db_client.cpp only).
using namespace std;
// File-scope connection bookkeeping.
// NOTE(review): neither variable is referenced in the code visible here --
// presumably used elsewhere in this file; confirm before removing.
string hostname;
bool connected = false;
/// Construct a connection wrapper with a post-connect setup callback.
/// @param credentials  server host, port, user, password, dbname and
///                     expected schema version
/// @param setup        invoked with the live pqxx::connection after a
///                     successful connect() (e.g. to prepare statements)
Connection::Connection(
db_credentials_t& credentials,
std::function<void(pqxx::connection *c)> setup
) : setup_function(move(setup)) {
this->db_credentials = credentials;
}
/// Construct a connection wrapper without a setup callback.
/// @param credentials  server host, port, user, password, dbname and
///                     expected schema version
Connection::Connection(db_credentials_t& credentials) {
this->db_credentials = credentials;
}
/// Close the underlying connection if it is still open.
/// NOTE(review): the heap-allocated pqxx::connection is closed but never
/// deleted here -- looks like a leak at shutdown; confirm ownership before
/// changing.
Connection::~Connection() {
if (c != nullptr && c->is_open()) c->close();
}
bool Connection::connect() {
    // Establish a database connection, retrying up to MAX_CON_RETRIES times.
    //
    // @return true once a connection with the expected schema version is up
    //         (and the optional setup callback has run); false after all
    //         retries failed or on a schema version mismatch.
    for (int i = 0; i < MAX_CON_RETRIES; i++) {
        try {
            // create the connection object
            this->c = new pqxx::connection(
                "host=" + db_credentials.server + " "
                "port=" + std::to_string(db_credentials.port) + " "
                "user=" + db_credentials.uname + " "
                "password=" + db_credentials.pwd + " "
                "dbname=" + db_credentials.dbase
            );
            // make sure the database actually has the version we need
            pqxx::work txn(*(this->c));
            auto db_info = txn.exec1("SELECT * FROM info;");
            txn.commit();
            if (db_info["db_version"].as<int>() != this->db_credentials.version) {
                this->disconnect();
                cerr << "Error: Unsupported database version! Command expects ";
                cerr << this->db_credentials.version;
                cerr << ", server provides " << db_info["db_version"].as<int>() << "!" << endl;
                return false;
            }
            if (this->setup_function != nullptr) {
                // execute the initialization function
                this->setup_function(c);
            }
            // bug fix: return as soon as the connection is up -- the old loop
            // kept reconnecting for all MAX_CON_RETRIES iterations even after
            // success, leaking every previously created connection object
            return true;
        } catch (const exception &e) {
            cerr << "Error: Could not connect to database:" << endl;
            cerr << e.what() << endl;
            // release any partially established connection before retrying,
            // instead of leaking it on the next `new`
            delete this->c;
            this->c = nullptr;
        }
    }
    // all retries exhausted
    return false;
}
bool Connection::ensure_connected() {
    // Make sure a live connection exists, reconnecting when necessary.
    //
    // @return true if a usable connection exists afterwards
    if (c == nullptr || !c->is_open()) {
        // bug fix: only close an existing (stale) connection object -- the
        // old code unconditionally called c->close(), dereferencing nullptr
        // when no connection had been created yet
        if (c != nullptr) c->close();
        return connect();
    }
    return true;
}
void Connection::disconnect() {
    // Close the connection if one is currently open; safe to call repeatedly.
    if (c && c->is_open()) {
        c->close();
    }
}
bool Connection::prepare_statements(vector<pair<string, string>> statements) {
    // Register a batch of prepared statements (name -> SQL) on the connection.
    //
    // @param statements  list of {statement name, SQL text} pairs
    // @return false as soon as any preparation fails, true otherwise
    try {
        // iterate by const reference instead of copying each name/SQL pair
        for (const auto& [name, sql] : statements) {
            this->c->prepare(name, sql);
        }
    } catch (const exception &e) {
        // bug fix: include the driver's reason (e.what()) in the diagnostic,
        // consistent with prepare_statement()
        cerr << "Error: Could not prepare statements! " << e.what() << endl;
        return false;
    }
    return true;
}
bool Connection::prepare_statement(std::string name, std::string statement) {
    // Register a single prepared statement under the given name.
    // Reports the driver's failure reason on stderr.
    try {
        this->c->prepare(name, statement);
        return true;
    } catch (const exception &e) {
        cerr << "Error: Could not prepare statements! " << e.what() << endl;
        return false;
    }
}
/// Deallocate a previously prepared statement on the server side.
/// @param name  statement name as passed to prepare_statement()
/// @return true when the DEALLOCATE request went through
/// NOTE(review): the name is concatenated into the SQL string unescaped;
/// acceptable for internal, fixed statement names -- confirm it is never
/// user-supplied.
bool Connection::unprepare_statement(std::string name) {
auto unprepare_statement_lambda = [] (pqxx::work* txn, std::string statement) {
txn->exec0("DEALLOCATE " + statement + ";");
};
std::function<void(pqxx::work*, std::string)> unprepare_statement_func = unprepare_statement_lambda;
return this->send_request(&unprepare_statement_func, name);
}
int Connection::find_job(std::string p_name, std::string *f_name) {
    // Resolve a partial job ID against the jobs table.
    //
    // Result codes: 0 -> no match, 1 -> exactly one match (*f_name receives
    // the full ID), -1 -> the prefix is ambiguous.
    // NOTE(review): a failed database request also returns 1 with *f_name
    // untouched -- confirm callers handle that case.
    auto scan_job_ids = [](pqxx::work *txn, int *ret, std::string p_name, std::string *f_name) {
        pqxx::result res {txn->exec("SELECT id FROM jobs;")};
        *ret = 0;
        for (auto row : res) {
            auto full_id = row["id"].as<std::string>();
            // skip IDs that do not start with the given partial name
            if (full_id.rfind(p_name, 0) != 0) continue;
            if (*ret != 0) {
                // second hit: the partial name is ambiguous, stop scanning
                *ret = -1;
                return;
            }
            // first hit: remember the full ID
            *ret = 1;
            *f_name = full_id;
        }
    };
    std::function<void(pqxx::work*, int*, std::string, std::string*)> check_job_ids_func = scan_job_ids;
    DEBUG_PRINT("Sending request...");
    int ret;
    if (!this->send_request(&check_job_ids_func, &ret, p_name, f_name)) {
        std::cerr << "Error: Could not fetch job IDs from database!" << std::endl;
        return 1;
    }
    DEBUG_PRINT("Request complete.");
    return ret;
}
JobStatusType Connection::get_job_status(std::string job) {
    // Fetch the overall status of a job by its full ID.
    // Returns UNKNOWN when the row is missing/ambiguous or the request fails.
    // NOTE(review): the job ID is concatenated into the SQL unescaped;
    // confirm it is validated upstream.
    auto fetch_status = [](pqxx::work *txn, std::string job, JobStatusType *out) {
        try {
            pqxx::row row {txn->exec1("SELECT job_status FROM jobs WHERE id='" + job + "';")};
            *out = row["job_status"].as<JobStatusType>();
        } catch (pqxx::unexpected_rows& e) {
            std::cerr << "Error: Fetching job returned nothing or too many rows!" << std::endl;
            *out = JobStatusType::UNKNOWN;
        }
    };
    std::function<void(pqxx::work*, std::string, JobStatusType*)> get_jstatus_func = fetch_status;
    DEBUG_PRINT("Sending request...");
    JobStatusType status;
    if (!this->send_request(&get_jstatus_func, job, &status)) {
        std::cerr << "Error: Could not fetch job status from database!" << std::endl;
        return JobStatusType::UNKNOWN;
    }
    DEBUG_PRINT("Request complete.");
    return status;
}
JobStatusType Connection::get_task_status(db::uuid_t task) {
    // Query the per-part status of a single task artifact via a
    // parameterized statement; UNKNOWN when the task cannot be found
    // or the request fails.
    auto fetch_part_status = [](pqxx::work *txn, db::uuid_t *task, db::JobStatusType *status) {
        auto rows = txn->exec_params("SELECT part_status FROM artifacts WHERE id = $1 LIMIT 1;", *task);
        *status = rows.empty()
            ? db::JobStatusType::UNKNOWN
            : rows[0]["part_status"].as<db::JobStatusType>();
    };
    std::function<void(pqxx::work*, db::uuid_t*, db::JobStatusType*)> get_task_status_func = fetch_part_status;
    db::JobStatusType status;
    if (this->send_request(&get_task_status_func, &task, &status)) {
        return status;
    }
    std::cerr << "Error: Status for task " << db::to_string(task) << " could not be fetched, returning unknown." << std::endl;
    return db::JobStatusType::UNKNOWN;
}
}