implement actsim call

This commit is contained in:
Fabian Posch 2024-01-15 16:55:37 -05:00
parent 6f7e729b33
commit 4ce580f272
2 changed files with 285 additions and 25 deletions

View file

@ -47,7 +47,8 @@ class Worker {
void thread_run();
std::unique_ptr<OutputType> perform_task(std::unique_ptr<InputType>& task, bool& finished);
void run_actsim();
std::unique_ptr<OutputType> pipe_error(bool& finished);
std::unique_ptr<std::thread> worker_thread;
std::atomic<db::uuid_t> current_task;

View file

@ -25,6 +25,8 @@
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include "util.h"
#include "worker.hpp"
@ -104,6 +106,13 @@ void Worker::thread_run() {
}
}
inline std::unique_ptr<OutputType> Worker::pipe_error(bool& finished) {
std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl;
this->interface.stop();
finished = false;
return nullptr;
}
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {
std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl;
finished = true;
@ -124,44 +133,133 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
char *top_proc_char = new char[testcase.top.length() + 1];
std::strcpy(top_proc_char, testcase.top.c_str());
auto commands = testcase.commands;
std::cout << "[WORKER] Here's the copied design string: " << design_char << std::endl;
std::cout << "[WORKER] Here's the copied top string: " << top_proc_char << std::endl;
usleep(10000000);
return std::make_unique<OutputType>();
// wait for either the executing process to finish or
// for the condition variable indicating the process should be stopped
std::unique_ptr<OutputType> result;
// we will need pipes to communicate to actsim
// these macros will just make our life easier
#define READ_END 0
#define WRITE_END 1
int stdin_pipe[2];
int stdout_pipe[2];
int stderr_pipe[2];
pipe(stdin_pipe);
pipe(stdout_pipe);
pipe(stderr_pipe);
// the pipe creation code is a lot of repetitive stuff since we need to check for any errors along the line
// don't be scared about the pipe_error thing, that is just some uhhh... code cleanup
// Pipe creation needs some error handling just in case
if (pipe(stdin_pipe) < 0) {
return pipe_error(finished);
}
if (pipe(stdout_pipe) < 0) {
return pipe_error(finished);
}
if (pipe(stderr_pipe) < 0) {
return pipe_error(finished);
}
// our side needs nonblocking access to the pipes
if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
}
if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
}
if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
return pipe_error(finished);
}
pid_t pid;
// Time to have an identity crisis.
// I'd like to see this as one of those comic tropes; person gets split into
// a good and the bad self. Like the normal one and the evil clone.
/*
* Warning!
*
* Forking a multi-threaded program is *very dangerous!* The fork call will only clone this one
* thread! This also means that if there is any shared resource with other threads, and they
* were currently in a critical phase, you *will not gain access to them again!*
* There are much smarter ways to do this!
*
* Additionally, dynamically allocated memory will be duplicated as well! Beware of memory leaks!
*
* The only reason we can do this here is as follows:
* 1. There is no dynamic memory in this thread (everything is references)
* 2. We are not accessing the threading interface in the child after this point
* 3. All data used in the child after this point is local to this function
* 4. As soon as something goes wrong in the child process, it terminates
* 5. Even if some memory is duplicated, the process will eventually terminate
* and the memory will not leak
* 6. Signal handlers (like the one installed by the main thread) are process wide but will be
* replaced when execv() loads the image of the new binary
*
* As you can see, you better be aware of the consequences of this function call.
*
* If you really need do to something like this, either really know what you're doing or use
* things like std::async or std::future.
*
* Proceed with caution!
*
*/
pid = fork();
// Which side are we? The normal or the evil clone?
if (!fork()) {
if (pid == 0) {
// Oh no, we're the evil clone!
// Just kidding, we're the child process
// from now on we can only assume to have access to function local data
// only this thread was duplicated by fork()
// redirect stdout, stdin, and stderr
dup2(stdin_pipe[0], STDIN_FILENO);
dup2(stdout_pipe[1], STDOUT_FILENO);
dup2(stderr_pipe[1], STDERR_FILENO);
// errors in here will just kill the program since different process at this point
if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
exit(1);
}
if (dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
exit(1);
}
if (dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
exit(1);
}
// close all the initial file descriptors so actsim (which we're about to call)
// doesn't know what's going on
close(stdin_pipe[0]);
close(stdin_pipe[1]);
close(stdout_pipe[0]);
close(stdout_pipe[1]);
close(stderr_pipe[0]);
close(stderr_pipe[1]);
if (close(stdin_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
if (close(stdin_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
if (close(stdout_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
if (close(stdout_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
if (close(stderr_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
if (close(stderr_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
}
// I warned you about the syscall handling. A lot of error checks...
// build the execution vector
char* const argv[] = {"${ACT_HOME}/bin/actsim", design_char, top_proc_char};
@ -170,16 +268,177 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
execv(argv[0], argv);
// we shouldn't land here
std::cerr << "Error: Failed to execute actsim binary!" << std::endl;
exit(1);
} else if (pid == -1) {
// Uh-oh looks like something went wrong in the cloning process
// This should stop the program gracefully, since we only had an issue in this
// thread. All the other threads should still be fine.
std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl;
this->interface.stop();
finished = false;
return nullptr;
} else {
// stuff for the main process
// We're the good side (cue angelic music)
// This is the main process.
// since we are the parent, we still have access to everything
// Close all the child process facing pipe ends
// since this is the parent process, we have to do all the stuff we did before
if (close(stdin_pipe[READ_END]) < 0) {
return pipe_error(finished);
}
if (close(stdout_pipe[WRITE_END]) < 0) {
return pipe_error(finished);
}
if (close(stderr_pipe[WRITE_END]) < 0) {
return pipe_error(finished);
}
// create the output artifact
result = std::make_unique<OutputType>(
task->id,
task->source_pass,
task->target_artifact,
task->design,
task->source_config
);
std::vector<std::string>& commands = task->get_content()[0].commands;
size_t command_n = 0;
size_t last_pos = 0;
std::string stdout_buf;
std::string stderr_buf;
// POSIX says a pipe has a buffer of at least 512 bytes
size_t rem_pipe_capacity = 512;
bool stdout_closed = false, stderr_closed = false;
// replace with actual read condition
while (true) {
// if we haven't yet sent all commands, try to send one
if (command_n < commands.size()) {
std::string& cur_command = commands[command_n];
const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str();
size_t command_length = commands[command_n].length() + 1;
// make sure we don't send more than the pipe can actually hold
if (rem_pipe_capacity < command_length) {
last_pos = last_pos + rem_pipe_capacity;
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity);
} else {
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length);
last_pos = 0;
++command_n;
}
}
// while the process is still going, read what is coming from the child process
int nread;
#define RD_BUFSIZE 512
char buf[512];
// read stdout
if (!stdout_closed) {
nread = read(stdout_pipe[READ_END], buf, RD_BUFSIZE);
switch (nread) {
case -1:
// something went wrong reading from the pipe
std::cerr << "Error: Worker failed to read pipe from child process." << std::endl;
this->interface.stop();
return nullptr;
case 0:
// child process has closed the pipe
stdout_closed = true;
break;
default:
// something was read from the pipe, parse into log
stdout_buf = stdout_buf + std::string(buf, buf + nread / sizeof buf[0]);
// while there is a new line in there, keep parsing into the log output
auto pos = stdout_buf.find('\n');
while (pos != std::string::npos) {
result->add_log_output(stdout_buf.substr(0, pos));
if ((pos + 1) < stdout_buf.length()) {
stdout_buf = stdout_buf.substr(pos+1, stdout_buf.length());
} else {
stdout_buf = "";
}
}
break;
}
}
// read stderr
if (!stderr_closed) {
nread = read(stderr_pipe[READ_END], buf, RD_BUFSIZE);
switch (nread) {
case -1:
// something went wrong reading from the pipe
std::cerr << "Error: Worker failed to read pipe from child process." << std::endl;
this->interface.stop();
return nullptr;
case 0:
// child process has closed the pipe
stderr_closed = true;
break;
default:
// something was read from the pipe, parse into log
stderr_buf = stderr_buf + std::string(buf, buf + nread / sizeof buf[0]);
// while there is a new line in there, keep parsing into the log output
auto pos = stderr_buf.find('\n');
while (pos != std::string::npos) {
result->add_err_output(stderr_buf.substr(0, pos));
if ((pos + 1) < stderr_buf.length()) {
stderr_buf = stderr_buf.substr(pos+1, stderr_buf.length());
} else {
stderr_buf = "";
}
}
break;
}
}
// check if we need to interrupt the simulation
if (this->task_interrupted.load(std::memory_order_relaxed)) {
finished = false;
kill(pid, SIGKILL);
break;
}
// check if the process has ended ie. all pipes closed
if (stdout_closed && stderr_closed) {
finished = true;
break;
}
}
}
delete design_char;
delete top_proc_char;
}
void Worker::run_actsim() {
return std::move(result);
}