diff --git a/include/actsim_agent/worker.hpp b/include/actsim_agent/worker.hpp index 0f7187c..5d17488 100644 --- a/include/actsim_agent/worker.hpp +++ b/include/actsim_agent/worker.hpp @@ -47,7 +47,8 @@ class Worker { void thread_run(); std::unique_ptr perform_task(std::unique_ptr& task, bool& finished); - void run_actsim(); + + std::unique_ptr pipe_error(bool& finished); std::unique_ptr worker_thread; std::atomic current_task; diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index bb7ed22..4fa7449 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -25,6 +25,8 @@ #include #include +#include +#include #include "util.h" #include "worker.hpp" @@ -104,6 +106,13 @@ void Worker::thread_run() { } } +inline std::unique_ptr Worker::pipe_error(bool& finished) { + std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl; + this->interface.stop(); + finished = false; + return nullptr; +} + std::unique_ptr Worker::perform_task(std::unique_ptr& task, bool& finished) { std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; finished = true; @@ -124,44 +133,133 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas char *top_proc_char = new char[testcase.top.length() + 1]; std::strcpy(top_proc_char, testcase.top.c_str()); - auto commands = testcase.commands; - - std::cout << "[WORKER] Here's the copied design string: " << design_char << std::endl; - std::cout << "[WORKER] Here's the copied top string: " << top_proc_char << std::endl; - usleep(10000000); - - return std::make_unique(); // wait for either the executing process to finish or // for the condition variable indicating the process should be stopped + std::unique_ptr result; + + // we will need pipes to communicate to actsim + // these macros will just make our life easier + #define READ_END 0 + #define WRITE_END 1 + int stdin_pipe[2]; int stdout_pipe[2]; int stderr_pipe[2]; - pipe(stdin_pipe); - pipe(stdout_pipe); - pipe(stderr_pipe); + // the pipe creation code is a lot of repetitive stuff since we need to check for any errors along the line + // don't be scared about the pipe_error thing, that is just some uhhh... code cleanup + + // Pipe creation needs some error handling just in case + if (pipe(stdin_pipe) < 0) { + return pipe_error(finished); + } + if (pipe(stdout_pipe) < 0) { + return pipe_error(finished); + } + if (pipe(stderr_pipe) < 0) { + return pipe_error(finished); + } + + // our side needs nonblocking access to the pipes + if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) { + return pipe_error(finished); + } + if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { + return pipe_error(finished); + } + if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { + return pipe_error(finished); + } + + pid_t pid; + // Time to have an identity crisis. // I'd like to see this as one of those comic tropes; person gets split into // a good and the bad self. Like the normal one and the evil clone. + + /* + * Warning! + * + * Forking a multi-threaded program is *very dangerous!* The fork call will only clone this one + * thread! This also means that if there is any shared resource with other threads, and they + * were currently in a critical phase, you *will not gain access to them again!* + * There are much smarter ways to do this! + * + * Additionally, dynamically allocated memory will be duplicated as well! Beware of memory leaks! + * + * The only reason we can do this here is as follows: + * 1. There is no dynamic memory in this thread (everything is references) + * 2. We are not accessing the threading interface in the child after this point + * 3. All data used in the child after this point is local to this function + * 4. As soon as something goes wrong in the child process, it terminates + * 5. Even if some memory is duplicated, the process will eventually terminate + * and the memory will not leak + * 6. Signal handlers (like the one installed by the main thread) are process wide but will be + * replaced when execv() loads the image of the new binary + * + * As you can see, you better be aware of the consequences of this function call. + * + * If you really need do to something like this, either really know what you're doing or use + * things like std::async or std::future. + * + * Proceed with caution! + * + */ + pid = fork(); + // Which side are we? The normal or the evil clone? - if (!fork()) { + if (pid == 0) { // Oh no, we're the evil clone! // Just kidding, we're the child process + // from now on we can only assume to have access to function local data + // only this thread was duplicated by fork() + // redirect stdout, stdin, and stderr - dup2(stdin_pipe[0], STDIN_FILENO); - dup2(stdout_pipe[1], STDOUT_FILENO); - dup2(stderr_pipe[1], STDERR_FILENO); + // errors in here will just kill the program since different process at this point + if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0) { + std::cerr << "Error: Pipe redirect failed." << std::endl; + exit(1); + } + + if (dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0) { + std::cerr << "Error: Pipe redirect failed." << std::endl; + exit(1); + } + + if (dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0) { + std::cerr << "Error: Pipe redirect failed." << std::endl; + exit(1); + } // close all the initial file descriptors so actsim (which we're about to call) // doesn't know what's going on - close(stdin_pipe[0]); - close(stdin_pipe[1]); - close(stdout_pipe[0]); - close(stdout_pipe[1]); - close(stderr_pipe[0]); - close(stderr_pipe[1]); + if (close(stdin_pipe[READ_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + if (close(stdin_pipe[WRITE_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + if (close(stdout_pipe[READ_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + if (close(stdout_pipe[WRITE_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + if (close(stderr_pipe[READ_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + if (close(stderr_pipe[WRITE_END]) < 0) { + std::cerr << "Error: Closing pipe end failed." << std::endl; + } + + // I warned you about the syscall handling. A lot of error checks... // build the execution vector char* const argv[] = {"${ACT_HOME}/bin/actsim", design_char, top_proc_char}; @@ -170,16 +268,177 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas execv(argv[0], argv); // we shouldn't land here + std::cerr << "Error: Failed to execute actsim binary!" << std::endl; + exit(1); + + } else if (pid == -1) { + // Uh-oh looks like something went wrong in the cloning process + // This should stop the program gracefully, since we only had an issue in this + // thread. All the other threads should still be fine. + std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl; + this->interface.stop(); + finished = false; + return nullptr; } else { - // stuff for the main process + // We're the good side (cue angelic music) + // This is the main process. + + // since we are the parent, we still have access to everything + + // Close all the child process facing pipe ends + // since this is the parent process, we have to do all the stuff we did before + if (close(stdin_pipe[READ_END]) < 0) { + return pipe_error(finished); + } + + if (close(stdout_pipe[WRITE_END]) < 0) { + return pipe_error(finished); + } + + if (close(stderr_pipe[WRITE_END]) < 0) { + return pipe_error(finished); + } + + + // create the output artifact + result = std::make_unique( + task->id, + task->source_pass, + task->target_artifact, + task->design, + task->source_config + ); + + std::vector& commands = task->get_content()[0].commands; + size_t command_n = 0; + size_t last_pos = 0; + + std::string stdout_buf; + std::string stderr_buf; + + // POSIX says a pipe has a buffer of at least 512 bytes + size_t rem_pipe_capacity = 512; + + bool stdout_closed = false, stderr_closed = false; + + // replace with actual read condition + while (true) { + + // if we haven't yet sent all commands, try to send one + if (command_n < commands.size()) { + std::string& cur_command = commands[command_n]; + + const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str(); + size_t command_length = commands[command_n].length() + 1; + + // make sure we don't send more than the pipe can actually hold + if (rem_pipe_capacity < command_length) { + last_pos = last_pos + rem_pipe_capacity; + rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity); + } else { + rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length); + last_pos = 0; + ++command_n; + } + } + + // while the process is still going, read what is coming from the child process + int nread; + + #define RD_BUFSIZE 512 + char buf[512]; + + // read stdout + if (!stdout_closed) { + nread = read(stdout_pipe[READ_END], buf, RD_BUFSIZE); + + switch (nread) { + case -1: + // something went wrong reading from the pipe + std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; + this->interface.stop(); + return nullptr; + + case 0: + // child process has closed the pipe + stdout_closed = true; + break; + + default: + // something was read from the pipe, parse into log + stdout_buf = stdout_buf + std::string(buf, buf + nread / sizeof buf[0]); + + // while there is a new line in there, keep parsing into the log output + auto pos = stdout_buf.find('\n'); + while (pos != std::string::npos) { + result->add_log_output(stdout_buf.substr(0, pos)); + + if ((pos + 1) < stdout_buf.length()) { + stdout_buf = stdout_buf.substr(pos+1, stdout_buf.length()); + } else { + stdout_buf = ""; + } + } + break; + } + } + + // read stderr + if (!stderr_closed) { + nread = read(stderr_pipe[READ_END], buf, RD_BUFSIZE); + + switch (nread) { + case -1: + // something went wrong reading from the pipe + std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; + this->interface.stop(); + return nullptr; + + case 0: + // child process has closed the pipe + stderr_closed = true; + break; + + default: + // something was read from the pipe, parse into log + stderr_buf = stderr_buf + std::string(buf, buf + nread / sizeof buf[0]); + + // while there is a new line in there, keep parsing into the log output + auto pos = stderr_buf.find('\n'); + while (pos != std::string::npos) { + result->add_err_output(stderr_buf.substr(0, pos)); + + if ((pos + 1) < stderr_buf.length()) { + stderr_buf = stderr_buf.substr(pos+1, stderr_buf.length()); + } else { + stderr_buf = ""; + } + } + break; + } + } + + + // check if we need to interrupt the simulation + if (this->task_interrupted.load(std::memory_order_relaxed)) { + finished = false; + kill(pid, SIGKILL); + break; + } + + // check if the process has ended ie. all pipes closed + if (stdout_closed && stderr_closed) { + finished = true; + break; + } + } + } delete design_char; delete top_proc_char; -} - -void Worker::run_actsim() { + return std::move(result); }