Fixed several issues: memory leak, actsim binary not being called, error raised when pipes were merely non-blocking, tasks queued for upload even when they failed, debug prints

This commit is contained in:
Fabian Posch 2024-01-17 17:06:37 -05:00
parent 2a66beb6db
commit 10ee8e5960

View file

@ -23,10 +23,14 @@
**************************************************************************
*/
#define DEBUG
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <cstdlib>
#include "util.h"
#include "worker.hpp"
@ -39,10 +43,7 @@ void Worker::start() {
}
void Worker::cancel_current() {
std::cout << "[WORKER] Current simulation cancelled." << std::endl;
this->task_interrupted.store(true, std::memory_order_relaxed);
// set a condition variable and notify
// must not be blocking
}
void Worker::join() {
@ -56,6 +57,8 @@ void Worker::thread_run() {
// this blocks until either a new task is available or the program was closed
this->interface.wait_for_fresh();
DEBUG_PRINT("Worker thread woken up");
// so first we check if we should still be running
if (!this->interface.running()) break;
@ -67,21 +70,22 @@ void Worker::thread_run() {
// we need to make sure the queue wasn't emptied between waiting and getting new data
if (queue_empty) continue;
DEBUG_PRINT("Worker has dequeued new task");
// get the design this task uses; we'll need that later
auto design = task->design;
// everything is good, perform the given task
bool complete;
auto output = this->perform_task(task, complete);
// testing code
complete = !this->task_interrupted.load(std::memory_order_relaxed);
this->task_interrupted.store(false, std::memory_order_relaxed);
DEBUG_PRINT("Execution of task has ended");
// if the task was finished, push the task to be uploaded
// we need this since the task might have been interrupted
// halfway through
if (complete) {
DEBUG_PRINT("Task " + db::to_string(output->id) + " was completed and scheduled for upload");
this->interface.push_finished(std::move(output));
// if this succeeded, we can decrease the number of
@ -89,18 +93,20 @@ void Worker::thread_run() {
this->interface.decrement_design(design);
} else {
// if the task was not completed and we have reached the end of execution
// the tasks have to be reopened in the database; the download thread
// will handle everything remaining in the buffer, so this is where our
// unfinished tasks goes back into
if (!this->interface.running()) {
this->interface.push_fresh(std::move(task));
} else {
// there are two possible reasons the task was not finished
if (this->task_interrupted.load(std::memory_order_relaxed)) {
DEBUG_PRINT("Task was interrupted by external trigger");
// the only other reason this could have failed is that we got
// interrupted since the corresponding task was halted; in this case
// we got interrupted since, the current task was halted; in this case
// we only wanna decrease our reference counter
this->interface.decrement_design(task->design);
this->task_interrupted.store(false, std::memory_order_relaxed);
} else {
DEBUG_PRINT("Something went wrong during task execution");
// something else went wrong
// we have to stop this agent
this->interface.stop();
this->interface.push_fresh(std::move(task));
}
}
}
@ -108,14 +114,11 @@ void Worker::thread_run() {
inline std::unique_ptr<OutputType> Worker::pipe_error(bool& finished) {
std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl;
this->interface.stop();
finished = false;
return nullptr;
}
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {
std::cout << "[WORKER] Worker performed task. Please implement me!" << std::endl;
finished = true;
if (task->get_content().size() != 1) {
std::cerr << "Error: Simulation configuration in worker thread has more than one testcases to run!" << std::endl;
@ -219,57 +222,60 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// redirect stdout, stdin, and stderr
// errors in here will just kill the program since different process at this point
if (dup2(stdin_pipe[READ_END], STDIN_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl;
exit(1);
}
if (dup2(stdout_pipe[WRITE_END], STDOUT_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl;
exit(1);
}
if (dup2(stderr_pipe[WRITE_END], STDERR_FILENO) < 0) {
std::cerr << "Error: Pipe redirect failed." << std::endl;
std::cerr << "Error: Pipe redirect failed. " << strerror(errno) << std::endl;
exit(1);
}
// close all the initial file descriptors so actsim (which we're about to call)
// doesn't know what's going on
if (close(stdin_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
if (close(stdin_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
if (close(stdout_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
if (close(stdout_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
if (close(stderr_pipe[READ_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
if (close(stderr_pipe[WRITE_END]) < 0) {
std::cerr << "Error: Closing pipe end failed." << std::endl;
std::cerr << "Error: Closing pipe end failed. " << strerror(errno) << std::endl;
}
// I warned you about the syscall handling. A lot of error checks...
// build the execution vector
char bin[] = "${ACT_HOME}/bin/actsim";
char* const argv[] = {bin, design_char, top_proc_char};
auto bin_str = std::string(std::getenv("ACT_HOME")) + "/bin/actsim";
char* bin = new char[bin_str.length() + 1];
std::strcpy(bin, bin_str.c_str());
char* const argv[] = {bin, design_char, top_proc_char, (char*)0};
// and call actsim
execv(argv[0], argv);
// we shouldn't land here
std::cerr << "Error: Failed to execute actsim binary!" << std::endl;
std::cerr << "Error: Failed to execute actsim binary! " << strerror(errno) << std::endl;
exit(1);
} else if (pid == -1) {
@ -277,7 +283,6 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// This should stop the program gracefully, since we only had an issue in this
// thread. All the other threads should still be fine.
std::cerr << "Error: Worker thread was not able to spawn actsim process!" << std::endl;
this->interface.stop();
finished = false;
return nullptr;
@ -323,6 +328,12 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
bool stdout_closed = false, stderr_closed = false;
// while the process is still going, read what is coming from the child process
int nread;
#define RD_BUFSIZE 512
char buf[512];
// replace with actual read condition
while (true) {
@ -344,11 +355,6 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
}
}
// while the process is still going, read what is coming from the child process
int nread;
#define RD_BUFSIZE 512
char buf[512];
// read stdout
if (!stdout_closed) {
@ -356,13 +362,25 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
switch (nread) {
case -1:
// This could simply happen when the pipe is empty, ignore that
if (errno == EWOULDBLOCK) {
break;
}
// something went wrong reading from the pipe
std::cerr << "Error: Worker failed to read pipe from child process." << std::endl;
this->interface.stop();
std::cerr << "Error: Worker failed to read pipe from child process. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
case 0:
// child process has closed the pipe
// make sure any remaining output is added to the log
if (stdout_buf != "") {
result->add_log_output(stdout_buf);
}
DEBUG_PRINT("STDOUT was closed by child");
stdout_closed = true;
break;
@ -373,6 +391,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// while there is a new line in there, keep parsing into the log output
auto pos = stdout_buf.find('\n');
while (pos != std::string::npos) {
DEBUG_PRINT("Log output line was added");
result->add_log_output(stdout_buf.substr(0, pos));
if ((pos + 1) < stdout_buf.length()) {
@ -380,6 +399,8 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
} else {
stdout_buf = "";
}
pos = stdout_buf.find('\n');
}
break;
}
@ -391,13 +412,25 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
switch (nread) {
case -1:
// This could simply happen when the pipe is empty, ignore that
if (errno == EWOULDBLOCK) {
break;
}
// something went wrong reading from the pipe
std::cerr << "Error: Worker failed to read pipe from child process." << std::endl;
this->interface.stop();
std::cerr << "Error: Worker failed to read pipe from child process. " << strerror(errno) << std::endl;
finished = false;
return nullptr;
case 0:
// child process has closed the pipe
// make sure any remaining output is added to the log
if (stderr_buf != "") {
result->add_err_output(stderr_buf);
}
DEBUG_PRINT("STDERR was closed by child");
stderr_closed = true;
break;
@ -408,6 +441,7 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
// while there is a new line in there, keep parsing into the log output
auto pos = stderr_buf.find('\n');
while (pos != std::string::npos) {
DEBUG_PRINT("Error output line was added");
result->add_err_output(stderr_buf.substr(0, pos));
if ((pos + 1) < stderr_buf.length()) {
@ -415,6 +449,8 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
} else {
stderr_buf = "";
}
pos = stderr_buf.find('\n');
}
break;
}