diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index 58d266e..be72cb8 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -23,10 +23,14 @@ ************************************************************************** */ +#define DEBUG + #include #include #include #include +#include +#include #include "util.h" #include "worker.hpp" @@ -39,10 +43,7 @@ void Worker::start() { } void Worker::cancel_current() { - std::cout << "[WORKER] Current simulation cancelled." << std::endl; this->task_interrupted.store(true, std::memory_order_relaxed); - // set a condition variable and notify - // must not be blocking } void Worker::join() { @@ -56,6 +57,8 @@ void Worker::thread_run() { // this blocks until either a new task is available or the program was closed this->interface.wait_for_fresh(); + DEBUG_PRINT("Worker thread woken up"); + // so first we check if we should still be running if (!this->interface.running()) break; @@ -67,21 +70,22 @@ void Worker::thread_run() { // we need to make sure the queue wasn't emptied between waiting and getting new data if (queue_empty) continue; + DEBUG_PRINT("Worker has dequeued new task"); + // get the design this task uses; we'll need that later auto design = task->design; // everything is good, perform the given task bool complete; auto output = this->perform_task(task, complete); - - // testing code - complete = !this->task_interrupted.load(std::memory_order_relaxed); - this->task_interrupted.store(false, std::memory_order_relaxed); + DEBUG_PRINT("Execution of task has ended"); + // if the task was finished, push the task to be uploaded // we need this since the task might have been interrupted // half way though if (complete) { + DEBUG_PRINT("Task " + db::to_string(output->id) + " was completed and scheduled for upload"); this->interface.push_finished(std::move(output)); // if this succeeded, we can decrease the number of @@ -89,18 +93,20 @@ void Worker::thread_run() { this->interface.decrement_design(design); } else { - // if the task was not completed and we have reached the end of execution - // the tasks have to be reopened in the database; the download thread - // will handle everything remaining in the buffer, so this is where our - // unfinished tasks goes back into - if (!this->interface.running()) { - this->interface.push_fresh(std::move(task)); - } else { + // there are two possible reasons the task was not finished + if (this->task_interrupted.load(std::memory_order_relaxed)) { + DEBUG_PRINT("Task was interrupted by external trigger"); - // the only other reason this could have failed is that we got - // interrupted since the corresponding task was halted; in this case + // we got interrupted since, the current task was halted; in this case // we only wanna decrease our reference counter this->interface.decrement_design(task->design); + this->task_interrupted.store(false, std::memory_order_relaxed); + } else { + DEBUG_PRINT("Something went wrong during task execution"); + // something else went wrong + // we have to stop this agent + this->interface.stop(); + this->interface.push_fresh(std::move(task)); } } } @@ -108,14 +114,11 @@ void Worker::thread_run() { inline std::unique_ptr Worker::pipe_error(bool& finished) { std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl; - this->interface.stop(); finished = false; return nullptr; } std::unique_ptr Worker::perform_task(std::unique_ptr& task, bool& finished) { - std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl; - finished = true; if (task->get_content().size() != 1) { std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl; @@ -219,57 +222,60 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // redirect stdout, stdin, and stderr // errors in here will just kill the program since different process at this point if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0) { - std::cerr << "Error: Pipe redirect failed." << std::endl; + std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl; exit(1); } if (dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0) { - std::cerr << "Error: Pipe redirect failed." << std::endl; + std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl; exit(1); } if (dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0) { - std::cerr << "Error: Pipe redirect failed." << std::endl; + std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl; exit(1); } // close all the initial file descriptors so actsim (which we're about to call) // doesn't know what's going on if (close(stdin_pipe[READ_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } if (close(stdin_pipe[WRITE_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } if (close(stdout_pipe[READ_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } if (close(stdout_pipe[WRITE_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } if (close(stderr_pipe[READ_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } if (close(stderr_pipe[WRITE_END]) < 0) { - std::cerr << "Error: Closing pipe end failed." << std::endl; + std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl; } // I warned you about the syscall handling. A lot of error checks... // build the execution vector - char bin[] = "${ACT_HOME}/bin/actsim"; - char* const argv[] = {bin, design_char, top_proc_char}; + auto bin_str = std::string(std::getenv("ACT_HOME")) + "/bin/actsim"; + char* bin = new char[bin_str.length() + 1]; + std::strcpy(bin, bin_str.c_str()); + + char* const argv[] = {bin, design_char, top_proc_char, (char*)0}; // and call actsim execv(argv[0], argv); // we shouldn't land here - std::cerr << "Error: Failed to execute actsim binary!" << std::endl; + std::cerr << "Error: Failed to execute actsim binary! " << strerror(errno) << std::endl; exit(1); } else if (pid == -1) { @@ -277,7 +283,6 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // This should stop the program gracefully, since we only had an issue in this // thread. All the other threads should still be fine. std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl; - this->interface.stop(); finished = false; return nullptr; @@ -323,6 +328,12 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas bool stdout_closed = false, stderr_closed = false; + // while the process is still going, read what is coming from the child process + int nread; + + #define RD_BUFSIZE 512 + char buf[512]; + // replace with actual read condition while (true) { @@ -344,11 +355,6 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas } } - // while the process is still going, read what is coming from the child process - int nread; - - #define RD_BUFSIZE 512 - char buf[512]; // read stdout if (!stdout_closed) { @@ -356,13 +362,25 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas switch (nread) { case -1: + // This could simply happen when the pipe is empty, ignore that + if (errno == EWOULDBLOCK) { + break; + } + // something went wrong reading from the pipe - std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; - this->interface.stop(); + std::cerr << "Error: Worker failed to read pipe from child process. " << strerror(errno) << std::endl; + finished = false; return nullptr; case 0: // child process has closed the pipe + + // make sure any remaining output is added to the log + if (stdout_buf != "") { + result->add_log_output(stdout_buf); + } + + DEBUG_PRINT("STDOUT was closed by child"); stdout_closed = true; break; @@ -373,6 +391,7 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // while there is a new line in there, keep parsing into the log output auto pos = stdout_buf.find('\n'); while (pos != std::string::npos) { + DEBUG_PRINT("Log output line was added"); result->add_log_output(stdout_buf.substr(0, pos)); if ((pos + 1) < stdout_buf.length()) { @@ -380,6 +399,8 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas } else { stdout_buf = ""; } + + pos = stdout_buf.find('\n'); } break; } @@ -391,13 +412,25 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas switch (nread) { case -1: + // This could simply happen when the pipe is empty, ignore that + if (errno == EWOULDBLOCK) { + break; + } + // something went wrong reading from the pipe - std::cerr << "Error: Worker failed to read pipe from child process." << std::endl; - this->interface.stop(); + std::cerr << "Error: Worker failed to read pipe from child process. " << strerror(errno) << std::endl; + finished = false; return nullptr; case 0: // child process has closed the pipe + + // make sure any remaining output is added to the log + if (stderr_buf != "") { + result->add_err_output(stderr_buf); + } + + DEBUG_PRINT("STDERR was closed by child"); stderr_closed = true; break; @@ -408,6 +441,7 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // while there is a new line in there, keep parsing into the log output auto pos = stderr_buf.find('\n'); while (pos != std::string::npos) { + DEBUG_PRINT("Error output line was added"); result->add_err_output(stderr_buf.substr(0, pos)); if ((pos + 1) < stderr_buf.length()) { @@ -415,6 +449,8 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas } else { stderr_buf = ""; } + + pos = stderr_buf.find('\n'); } break; }