actsim-cluster-agent/src/actsim_agent/worker.cpp

543 lines
20 KiB
C++

/*************************************************************************
*
* This file is part of the ACT library
*
* Copyright (c) 2024 Fabian Posch
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*
**************************************************************************
*/
#include <cstddef>
#include <cstring>
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <cstdlib>
#include "util.h"
#include "log_parser.hpp"
#include "worker.hpp"
/**
 * Construct a worker that talks to the rest of the agent through the given
 * task interface. No thread is started here; see Worker::start().
 *
 * @param interface task interface used to initialise the `interface` member
 */
Worker::Worker(TaskInterface& interface)
    : interface(interface)
{
}
void Worker::start() {
DEBUG_PRINT("Starting worker thread...");
this->task_interrupted.store(false, std::memory_order_relaxed);
this->worker_thread = std::make_unique<std::thread>([this] () { this->thread_run(); });
}
void Worker::cancel_current() {
this->task_interrupted.store(true, std::memory_order_relaxed);
}
/**
 * Wait for the worker thread to terminate.
 *
 * Does nothing if the thread was never started or has already been joined.
 * (std::thread::join() throws std::system_error when called on a thread that
 * is not joinable, so calling this method twice used to be an error.)
 */
void Worker::join() {
    if (this->worker_thread == nullptr) return;
    if (!this->worker_thread->joinable()) return;

    this->worker_thread->join();
}
/**
 * Main loop of the worker thread.
 *
 * Repeatedly waits for a fresh task, executes it via perform_task() and then
 * either
 *  - hands the result to the upload queue (task completed),
 *  - drops the task (it was interrupted through cancel_current()), or
 *  - stops the agent and puts the task back into the fresh queue
 *    (anything else went wrong).
 *
 * In the first two cases the usage counters of the design and (if present)
 * the reference used by the task are decremented.
 *
 * The loop ends as soon as the task interface reports that it is no longer
 * running.
 */
void Worker::thread_run() {

    while (this->interface.running()) {

        // this blocks until either a new task is available or the program was closed
        this->interface.wait_for_fresh();

        DEBUG_PRINT("Worker thread woken up");

        // so first we check if we should still be running
        if (!this->interface.running()) break;

        // we're still good to go! get a task from the fresh queue
        bool queue_empty;
        auto task = this->interface.pop_fresh(queue_empty);

        // we need to make sure the queue wasn't emptied between waiting and getting new data;
        // this has to happen BEFORE the task is touched, since there is no task to look at
        // when the queue turned out to be empty
        if (queue_empty || !task) continue;

        this->current_task.store(task->id, std::memory_order_relaxed);

        DEBUG_PRINT("Worker has dequeued new task");

        // remember the design and reference this task uses; the counters have to be
        // updated after the task itself may already have been handed on
        // (a reference of 0 means the task is not compared against a reference run)
        auto design = task->design;
        auto reference = task->reference;

        // everything is good, perform the given task
        bool complete;
        auto output = this->perform_task(task, complete);

        DEBUG_PRINT("Execution of task has ended");

        // if the task was finished, push the task to be uploaded
        // we need this check since the task might have been interrupted half way through
        if (complete) {

            DEBUG_PRINT("Task " + db::to_string(output->id) + " was completed and scheduled for upload");
            this->interface.push_finished(std::move(output));

            // one task fewer requires the design (and reference) we needed for this task
            this->interface.decrement_design(design);

            if (reference != 0) {
                this->interface.decrement_reference(reference);
            }

            continue;
        }

        // there are two possible reasons the task was not finished
        if (this->task_interrupted.load(std::memory_order_relaxed)) {

            DEBUG_PRINT("Task was interrupted by external trigger");

            // the current task was halted on request; the task is dropped,
            // so all that is left to do is to release design and reference
            this->interface.decrement_design(design);

            if (reference != 0) {
                this->interface.decrement_reference(reference);
            }

            // NOTE(review): the flag is only cleared here. A cancel request that arrives
            // after a task has already completed appears to stay pending and would hit
            // the next task - confirm whether callers can trigger that.
            this->task_interrupted.store(false, std::memory_order_relaxed);

        } else {

            DEBUG_PRINT("Something went wrong during task execution");

            // something else went wrong: we have to stop this agent
            // and hand the task back so it is not lost
            this->interface.stop();
            this->interface.push_fresh(std::move(task));
        }
    }
}
// Indices of the read and the write end in the descriptor pair filled in by pipe()
#define READ_END 0
#define WRITE_END 1

// Number of bytes requested from a child output pipe per read() call
#define RD_BUFSIZE 512

namespace {

/** Outcome of a single non-blocking read attempt on one of the child's output pipes. */
enum class PipeRead {
    open,   // pipe is still open (there may or may not have been data)
    closed, // the other side closed the pipe, no more data will arrive
    failed  // read() reported an error that is not just "try again"
};

/**
 * Close one pipe end unless it is already marked as closed (-1) and mark it
 * as closed afterwards so it is never closed twice.
 *
 * @return false if close() reported an error (errno is left as set by close())
 */
bool close_pipe_end(int& fd) {
    if (fd < 0) return true;

    const int rc = close(fd);
    fd = -1;
    return rc == 0;
}

/** Mark a descriptor close-on-exec while keeping its other descriptor flags. */
bool set_cloexec(int fd) {
    const int flags = fcntl(fd, F_GETFD);
    return flags >= 0 && fcntl(fd, F_SETFD, flags | FD_CLOEXEC) >= 0;
}

/**
 * Block until the given child process has terminated and collect it, so it
 * does not stay around as a zombie.
 *
 * @return the raw wait status as reported by waitpid(); 0 if the child could
 *         not be waited for
 */
int reap_child(pid_t pid) {
    int status = 0;

    while (waitpid(pid, &status, 0) < 0) {
        // interrupted by a signal: just try again
        if (errno == EINTR) continue;

        std::cerr << "Error: Could not wait for actsim process. " << strerror(errno) << std::endl;
        return 0;
    }

    return status;
}

/**
 * Perform one non-blocking read on a child output pipe and hand every
 * complete line (without the trailing newline) to on_line.
 *
 * Data after the last newline is kept in `pending` until the rest of the line
 * arrives. When the pipe is closed, a non-empty remainder is handed to
 * on_line as the last line.
 *
 * @param fd      read end of the pipe; expected to be in non-blocking mode
 * @param pending buffer holding the incomplete last line between calls
 * @param on_line callable taking a `const std::string&`
 */
template <typename LineHandler>
PipeRead drain_pipe(int fd, std::string& pending, LineHandler&& on_line) {
    char buf[RD_BUFSIZE];
    const ssize_t nread = read(fd, buf, sizeof buf);

    if (nread < 0) {
        // an empty pipe or an interrupting signal is nothing to worry about
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return PipeRead::open;
        }

        std::cerr << "Error: Worker failed to read pipe from child process. " << strerror(errno) << std::endl;
        return PipeRead::failed;
    }

    if (nread == 0) {
        // make sure any remaining output is not lost
        if (!pending.empty()) {
            on_line(pending);
        }
        return PipeRead::closed;
    }

    pending.append(buf, static_cast<std::string::size_type>(nread));

    // hand over all complete lines, keep the (possibly empty) remainder
    std::string::size_type line_start = 0;
    std::string::size_type newline = pending.find('\n', line_start);

    while (newline != std::string::npos) {
        on_line(pending.substr(line_start, newline - line_start));
        line_start = newline + 1;
        newline = pending.find('\n', line_start);
    }

    pending.erase(0, line_start);

    return PipeRead::open;
}

} // namespace

/**
 * Run the single testcase contained in the given task through actsim.
 *
 * actsim is started as a child process with its stdin, stdout and stderr
 * connected to pipes. The commands of the testcase are fed to its stdin,
 * everything it prints is passed line by line to a LogParser which fills the
 * returned output artifact.
 *
 * The simulation ends when
 *  - the child has closed both stdout and stderr (finished = true),
 *  - the parser reports a busy deadlock, in which case the child is killed
 *    (finished = true), or
 *  - the task is interrupted through cancel_current(), in which case the
 *    child is killed as well (finished = false).
 *
 * @param task      task to execute; must contain exactly one testcase
 * @param finished  set to true if the simulation ran to an end and the result
 *                  should be uploaded, false otherwise
 * @return the output artifact; nullptr if setting up or talking to the child
 *         process failed (finished is false in that case). If the task does
 *         not contain exactly one testcase, an empty artifact is returned and
 *         finished is false.
 */
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {

    finished = false;

    if (task->get_content().size() != 1) {
        std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl;
        return std::make_unique<OutputType>(task->id, task->source_pass, task->target_artifact, task->design, task->reference, task->source_config);
    }

    auto testcase = task->get_content().front();

    // NOTE(review): whatever get_design() returns is immediately replaced by a fixed
    // file name. This is kept exactly as it was - confirm that this is intended.
    auto design = this->interface.get_design(task->design);
    design = "testbench.act";

    // execv expects mutable char*, so everything handed to it is copied into strings
    // owned by this function. All of this is prepared BEFORE forking, so the child
    // does not have to allocate memory, and it is released automatically on every
    // way out of this function.
    std::string design_arg(design.c_str());
    std::string top_arg(testcase.top.c_str());
    std::string mode_arg("-m");

    const char* act_home = std::getenv("ACT_HOME");
    std::string bin_arg;
    if (act_home != nullptr) {
        bin_arg = std::string(act_home) + "/bin/actsim";
    }

    char* const argv[] = {&bin_arg[0], &mode_arg[0], &design_arg[0], &top_arg[0], nullptr};

    // we will need pipes to communicate to actsim; -1 marks an end that is not open
    int stdin_pipe[2] = {-1, -1};
    int stdout_pipe[2] = {-1, -1};
    int stderr_pipe[2] = {-1, -1};

    int* const pipe_ends[] = {
        &stdin_pipe[READ_END], &stdin_pipe[WRITE_END],
        &stdout_pipe[READ_END], &stdout_pipe[WRITE_END],
        &stderr_pipe[READ_END], &stderr_pipe[WRITE_END]
    };

    // closes whatever is still open; returns false if any close() failed
    auto close_all_pipes = [&pipe_ends]() -> bool {
        bool ok = true;
        for (int* end : pipe_ends) {
            ok = close_pipe_end(*end) && ok;
        }
        return ok;
    };

    // common way out for everything that can fail while setting up the pipes
    auto fail_setup = [&](const char* what) -> std::unique_ptr<OutputType> {
        std::cerr << what << strerror(errno) << std::endl;
        close_all_pipes();
        finished = false;
        return nullptr;
    };

    if (pipe(stdin_pipe) < 0) {
        return fail_setup("Error: Pipe creation failed for stdin pipe. ");
    }

    if (pipe(stdout_pipe) < 0) {
        return fail_setup("Error: Pipe creation failed for stdout pipe. ");
    }

    if (pipe(stderr_pipe) < 0) {
        return fail_setup("Error: Pipe creation failed for stderr pipe. ");
    }

    // None of these descriptors may survive an exec: a child forked by another thread
    // inherits them as well, and as long as it holds one of the write ends we would
    // never see our own child closing its output.
    // (The descriptors our child gets through dup2() do not inherit this flag.)
    // There is still a small window between pipe() and this call; closing it entirely
    // would require pipe2(), which is not available everywhere.
    for (int* end : pipe_ends) {
        if (!set_cloexec(*end)) {
            return fail_setup("Error: Could not mark pipe as close-on-exec. ");
        }
    }

    // our side needs nonblocking access to the pipes
    if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) {
        return fail_setup("Error: Could not set stdin pipe to nonblocking. ");
    }

    if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
        return fail_setup("Error: Could not set stdout pipe to nonblocking. ");
    }

    if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
        return fail_setup("Error: Could not set stderr pipe to nonblocking. ");
    }

    DEBUG_PRINT("Starting simulator...");

    /*
     * Warning!
     *
     * Forking a multi-threaded program is *very dangerous!* The fork call will only clone this one
     * thread! This also means that if there is any shared resource with other threads, and they
     * were currently in a critical phase, the child *will not gain access to them again!*
     *
     * The only reason we can do this here is as follows:
     * 1. Everything the child needs has been prepared before the fork; it does not allocate memory
     * 2. The child does not access the threading interface
     * 3. All data used in the child is local to this function
     * 4. As soon as something goes wrong in the child process, it terminates through _exit(),
     *    which neither runs exit handlers nor destructors of objects shared with the parent
     * 5. Even if some memory is duplicated, the process will eventually terminate
     *    and the memory will not leak
     * 6. Signal handlers (like the one installed by the main thread) are process wide but will be
     *    replaced when execv() loads the image of the new binary
     *
     * If you really need do to something like this, really know what you're doing.
     */
    const pid_t pid = fork();

    if (pid == 0) {

        // This is the child process.
        // From now on we can only assume to have access to function local data.

        // redirect stdin, stdout, and stderr
        // errors in here just end the child; the parent notices through the closed pipes
        if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0 ||
            dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0 ||
            dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0
        ) {
            std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl;
            _exit(1);
        }

        // close all the initial file descriptors so actsim (which we're about to call)
        // only sees its three standard streams
        for (int* end : pipe_ends) {
            if (!close_pipe_end(*end)) {
                std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
            }
        }

        // without ACT_HOME there is no binary to call; this ends up in the error log
        // of the task just like a failing execv() does
        if (act_home == nullptr) {
            std::cerr << "Error: ACT_HOME is not set, cannot locate the actsim binary!" << std::endl;
            _exit(1);
        }

        // and call actsim
        execv(argv[0], argv);

        // we only land here if execv() failed
        std::cerr << "Error: Failed to execute actsim binary! " << strerror(errno) << std::endl;
        _exit(1);
    }

    if (pid == -1) {

        // Something went wrong while forking.
        // This should stop the program gracefully, since we only had an issue in this
        // thread. All the other threads should still be fine.
        std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl;
        close_all_pipes();
        finished = false;
        return nullptr;
    }

    // This is the parent process.

    // Close all the child process facing pipe ends; without this we would never
    // see the end of the child's output
    bool child_ends_closed = close_pipe_end(stdin_pipe[READ_END]);
    child_ends_closed = close_pipe_end(stdout_pipe[WRITE_END]) && child_ends_closed;
    child_ends_closed = close_pipe_end(stderr_pipe[WRITE_END]) && child_ends_closed;

    if (!child_ends_closed) {
        std::cerr << "Error: Could not close parent facing pipe ends. " << strerror(errno) << std::endl;

        // don't leave the simulator running unattended
        kill(pid, SIGKILL);
        reap_child(pid);
        close_all_pipes();

        finished = false;
        return nullptr;
    }

    // create the output artifact
    std::unique_ptr<OutputType> result = std::make_unique<OutputType>(
        task->id,
        task->source_pass,
        task->target_artifact,
        task->design,
        task->reference,
        task->source_config
    );

    // the parser lives in its own scope so it is gone before the result is handed on
    {
        // create the output parser
        std::unique_ptr<LogParser> parser;

        if (task->reference == 0) {
            parser = std::make_unique<LogParser>(result);
        } else {
            parser = std::make_unique<LogParser>(result, this->interface.get_reference(task->reference));
        }

        const std::vector<std::string>& commands = testcase.commands;
        size_t command_n = 0;       // index of the next command to be sent
        std::string pending_input;  // part of the current command (incl. newline) not yet written

        std::string stdout_buf;
        std::string stderr_buf;
        bool stdout_closed = false;
        bool stderr_closed = false;

        bool io_failed = false;     // reading from the child failed
        bool kill_child = false;    // the child has to be terminated by us

        // NOTE(review): this loop polls the non-blocking pipes without ever sleeping.
        // NOTE(review): writing to the child after it exited raises SIGPIPE unless the
        // process ignores or handles that signal - confirm that this is taken care of.
        while (true) {

            // if we haven't yet sent all commands, try to send some more
            if (pending_input.empty() && command_n < commands.size()) {
                pending_input = commands[command_n] + "\n";
                ++command_n;
            }

            if (!pending_input.empty()) {
                const ssize_t written = write(stdin_pipe[WRITE_END], pending_input.data(), pending_input.size());

                if (written > 0) {
                    // only drop what the pipe actually accepted, the rest is sent next round
                    pending_input.erase(0, static_cast<std::string::size_type>(written));
                } else if (written < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
                    // a full pipe simply means "try again later"; anything else means
                    // the child will not take any more commands, so stop trying
                    std::cerr << "Error: Worker failed to write command to child process. " << strerror(errno) << std::endl;
                    pending_input.clear();
                    command_n = commands.size();
                }
            }

            // read stdout
            if (!stdout_closed) {
                const PipeRead state = drain_pipe(stdout_pipe[READ_END], stdout_buf, [&parser](const std::string& line) {
                    DEBUG_PRINT("Log output line was added");
                    parser->parse_log(line);
                });

                if (state == PipeRead::failed) {
                    io_failed = true;
                    break;
                }

                if (state == PipeRead::closed) {
                    DEBUG_PRINT("STDOUT was closed by child");
                    stdout_closed = true;
                }
            }

            // read stderr
            if (!stderr_closed) {
                const PipeRead state = drain_pipe(stderr_pipe[READ_END], stderr_buf, [&parser](const std::string& line) {
                    DEBUG_PRINT("Error output line was added");
                    parser->parse_error(line);
                });

                if (state == PipeRead::failed) {
                    io_failed = true;
                    break;
                }

                if (state == PipeRead::closed) {
                    DEBUG_PRINT("STDERR was closed by child");
                    stderr_closed = true;
                }
            }

            // check if we need to interrupt the simulation
            if (this->task_interrupted.load(std::memory_order_relaxed)) {
                finished = false;
                kill_child = true;
                break;
            }

            // check if the process has ended ie. all pipes closed
            if (stdout_closed && stderr_closed) {
                finished = true;

                const int exit_code = reap_child(pid);

                if (exit_code != 0) {
                    std::cerr << "SIMULATION EXITED ABNORMALLY!" << std::endl;
                    for (auto&& line : result->get_content().second) {
                        std::cerr << line << std::endl;
                    }
                }

                break;
            }

            // check if we need to abort due to a busy deadlock
            if (parser->check_busy_deadlock()) {
                std::cout << "Killing deadlocked sim" << std::endl;
                finished = true;
                kill_child = true;
                break;
            }
        }

        if (io_failed) {
            // we lost contact to the simulator; make sure it does not keep running
            kill(pid, SIGKILL);
            reap_child(pid);
            close_all_pipes();

            finished = false;
            return nullptr;
        }

        if (kill_child) {
            kill(pid, SIGKILL);
            // collect the killed process, otherwise it stays around as a zombie
            reap_child(pid);
        }

        parser->finalize();
    }

    // Close all the remaining pipes
    if (!close_all_pipes()) {
        std::cerr << "Error: Could not close child facing pipe ends. " << strerror(errno) << std::endl;
    }

    return result;
}