/************************************************************************* * * This file is part of the ACT library * * Copyright (c) 2024 Fabian Posch * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. * ************************************************************************** */ #include #include #include #include #include "util.h" #include "worker.hpp" Worker::Worker(TaskInterface& interface) : interface(interface) {} void Worker::start() { DEBUG_PRINT("Starting worker thread..."); this->task_interrupted.store(false, std::memory_order_relaxed); this->worker_thread = std::make_unique([this] () { this->thread_run(); }); } void Worker::cancel_current() { std::cout << "[WORKER] Current simulation cancelled." << std::endl; this->task_interrupted.store(true, std::memory_order_relaxed); // set a condition variable and notify // must not be blocking } void Worker::join() { if (this->worker_thread == nullptr) return; this->worker_thread->join(); } void Worker::thread_run() { while (this->interface.running()) { // this blocks until either a new task is available or the program was closed this->interface.wait_for_fresh(); // so first we check if we should still be running if (!this->interface.running()) break; // we're still good to go! get a task from the fresh queue bool queue_empty; auto task = this->interface.pop_fresh(queue_empty); this->current_task.store(task->id, std::memory_order_relaxed); // we need to make sure the queue wasn't emptied between waiting and getting new data if (queue_empty) continue; // get the design this task uses; we'll need that later auto design = task->design; // everything is good, perform the given task bool complete; auto output = this->perform_task(task, complete); // testing code complete = !this->task_interrupted.load(std::memory_order_relaxed); this->task_interrupted.store(false, std::memory_order_relaxed); // if the task was finished, push the task to be uploaded // we need this since the task might have been interrupted // half way though if (complete) { this->interface.push_finished(std::move(output)); // if this succeeded, we can decrease the number of // tasks that require the design we needed for this task this->interface.decrement_design(design); } else { // if the task was not completed and we have reached the end of execution // the tasks have to be reopened in the database; the download thread // will handle everything remaining in the buffer, so this is where our // unfinished tasks goes back into if (!this->interface.running()) { this->interface.push_fresh(std::move(task)); } else { // the only other reason this could have failed is that we got // interrupted since the corresponding task was halted; in this case // we only wanna decrease our reference counter this->interface.decrement_design(task->design); } } } } inline std::unique_ptr Worker::pipe_error(bool& finished) { std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl; this->interface.stop(); finished = false; return nullptr; } std::unique_ptr Worker::perform_task(std::unique_ptr& task, bool& finished) { std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; finished = true; if (task->get_content().size() != 1) { std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl; finished = false; return std::make_unique(); } auto testcase = task->get_content().front(); // execv expects mutable char*, so we have to copy our stuff first auto design = this->interface.get_design(task->design); char *design_char = new char[design.length()+1]; std::strcpy(design_char, design.c_str()); char *top_proc_char = new char[testcase.top.length() + 1]; std::strcpy(top_proc_char, testcase.top.c_str()); // wait for either the executing process to finish or // for the condition variable indicating the process should be stopped std::unique_ptr result; // we will need pipes to communicate to actsim // these macros will just make our life easier #define READ_END 0 #define WRITE_END 1 int stdin_pipe[2]; int stdout_pipe[2]; int stderr_pipe[2]; // the pipe creation code is a lot of repetitive stuff since we need to check for any errors along the line // don't be scared about the pipe_error thing, that is just some uhhh... code cleanup // Pipe creation needs some error handling just in case if (pipe(stdin_pipe) < 0) { return pipe_error(finished); } if (pipe(stdout_pipe) < 0) { return pipe_error(finished); } if (pipe(stderr_pipe) < 0) { return pipe_error(finished); } // our side needs nonblocking access to the pipes if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) { return pipe_error(finished); } if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { return pipe_error(finished); } if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { return pipe_error(finished); } pid_t pid; // Time to have an identity crisis. // I'd like to see this as one of those comic tropes; person gets split into // a good and the bad self. Like the normal one and the evil clone. /* * Warning! * * Forking a multi-threaded program is *very dangerous!* The fork call will only clone this one * thread! This also means that if there is any shared resource with other threads, and they * were currently in a critical phase, you *will not gain access to them again!* * There are much smarter ways to do this! * * Additionally, dynamically allocated memory will be duplicated as well! Beware of memory leaks! * * The only reason we can do this here is as follows: * 1. There is no dynamic memory in this thread (everything is references) * 2. We are not accessing the threading interface in the child after this point * 3. All data used in the child after this point is local to this function * 4. As soon as something goes wrong in the child process, it terminates * 5. Even if some memory is duplicated, the process will eventually terminate * and the memory will not leak * 6. Signal handlers (like the one installed by the main thread) are process wide but will be * replaced when execv() loads the image of the new binary * * As you can see, you better be aware of the consequences of this function call. * * If you really need do to something like this, either really know what you're doing or use * things like std::async or std::future. * * Proceed with caution! * */ pid = fork(); // Which side are we? The normal or the evil clone? if (pid == 0) { // Oh no, we're the evil clone! // Just kidding, we're the child process // from now on we can only assume to have access to function local data // only this thread was duplicated by fork() // redirect stdout, stdin, and stderr // errors in here will just kill the program since different process at this point if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0) { std::cerr << "Error: Pipe redirect failed." << std::endl; exit(1); } if (dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0) { std::cerr << "Error: Pipe redirect failed." << std::endl; exit(1); } if (dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0) { std::cerr << "Error: Pipe redirect failed." << std::endl; exit(1); } // close all the initial file descriptors so actsim (which we're about to call) // doesn't know what's going on if (close(stdin_pipe[READ_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } if (close(stdin_pipe[WRITE_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } if (close(stdout_pipe[READ_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } if (close(stdout_pipe[WRITE_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } if (close(stderr_pipe[READ_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } if (close(stderr_pipe[WRITE_END]) < 0) { std::cerr << "Error: Closing pipe end failed." << std::endl; } // I warned you about the syscall handling. A lot of error checks... // build the execution vector char* const argv[] = {"${ACT_HOME}/bin/actsim", design_char, top_proc_char}; // and call actsim execv(argv[0], argv); // we shouldn't land here std::cerr << "Error: Failed to execute actsim binary!" << std::endl; exit(1); } else if (pid == -1) { // Uh-oh looks like something went wrong in the cloning process // This should stop the program gracefully, since we only had an issue in this // thread. All the other threads should still be fine. std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl; this->interface.stop(); finished = false; return nullptr; } else { // We're the good side (cue angelic music) // This is the main process. // since we are the parent, we still have access to everything // Close all the child process facing pipe ends // since this is the parent process, we have to do all the stuff we did before if (close(stdin_pipe[READ_END]) < 0) { return pipe_error(finished); } if (close(stdout_pipe[WRITE_END]) < 0) { return pipe_error(finished); } if (close(stderr_pipe[WRITE_END]) < 0) { return pipe_error(finished); } // create the output artifact result = std::make_unique( task->id, task->source_pass, task->target_artifact, task->design, task->source_config ); std::vector& commands = task->get_content()[0].commands; size_t command_n = 0; size_t last_pos = 0; std::string stdout_buf; std::string stderr_buf; // POSIX says a pipe has a buffer of at least 512 bytes size_t rem_pipe_capacity = 512; bool stdout_closed = false, stderr_closed = false; // replace with actual read condition while (true) { // if we haven't yet sent all commands, try to send one if (command_n < commands.size()) { std::string& cur_command = commands[command_n]; const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str(); size_t command_length = commands[command_n].length() + 1; // make sure we don't send more than the pipe can actually hold if (rem_pipe_capacity < command_length) { last_pos = last_pos + rem_pipe_capacity; rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity); } else { rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length); last_pos = 0; ++command_n; } } // while the process is still going, read what is coming from the child process int nread; #define RD_BUFSIZE 512 char buf[512]; // read stdout if (!stdout_closed) { nread = read(stdout_pipe[READ_END], buf, RD_BUFSIZE); switch (nread) { case -1: // something went wrong reading from the pipe std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; this->interface.stop(); return nullptr; case 0: // child process has closed the pipe stdout_closed = true; break; default: // something was read from the pipe, parse into log stdout_buf = stdout_buf + std::string(buf, buf + nread / sizeof buf[0]); // while there is a new line in there, keep parsing into the log output auto pos = stdout_buf.find('\n'); while (pos != std::string::npos) { result->add_log_output(stdout_buf.substr(0, pos)); if ((pos + 1) < stdout_buf.length()) { stdout_buf = stdout_buf.substr(pos+1, stdout_buf.length()); } else { stdout_buf = ""; } } break; } } // read stderr if (!stderr_closed) { nread = read(stderr_pipe[READ_END], buf, RD_BUFSIZE); switch (nread) { case -1: // something went wrong reading from the pipe std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; this->interface.stop(); return nullptr; case 0: // child process has closed the pipe stderr_closed = true; break; default: // something was read from the pipe, parse into log stderr_buf = stderr_buf + std::string(buf, buf + nread / sizeof buf[0]); // while there is a new line in there, keep parsing into the log output auto pos = stderr_buf.find('\n'); while (pos != std::string::npos) { result->add_err_output(stderr_buf.substr(0, pos)); if ((pos + 1) < stderr_buf.length()) { stderr_buf = stderr_buf.substr(pos+1, stderr_buf.length()); } else { stderr_buf = ""; } } break; } } // check if we need to interrupt the simulation if (this->task_interrupted.load(std::memory_order_relaxed)) { finished = false; kill(pid, SIGKILL); break; } // check if the process has ended ie. all pipes closed if (stdout_closed && stderr_closed) { finished = true; break; } } } delete design_char; delete top_proc_char; return result; }