From 3d704c7e73fa9ea204a196b503ff0ff2995822c3 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Fri, 3 Jan 2025 12:58:53 +0100 Subject: [PATCH] streamline code and fix running out of file descriptors --- include/actsim_agent/worker.hpp | 2 - src/actsim_agent/worker.cpp | 78 +++++++++++++++++++++------------ 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/include/actsim_agent/worker.hpp b/include/actsim_agent/worker.hpp index 5d17488..13c3f75 100644 --- a/include/actsim_agent/worker.hpp +++ b/include/actsim_agent/worker.hpp @@ -48,8 +48,6 @@ class Worker { void thread_run(); std::unique_ptr perform_task(std::unique_ptr& task, bool& finished); - std::unique_ptr pipe_error(bool& finished); - std::unique_ptr worker_thread; std::atomic current_task; diff --git a/src/actsim_agent/worker.cpp b/src/actsim_agent/worker.cpp index 63f6302..a6b3b52 100644 --- a/src/actsim_agent/worker.cpp +++ b/src/actsim_agent/worker.cpp @@ -23,6 +23,8 @@ ************************************************************************** */ +#include +#include #include #include #include @@ -124,12 +126,6 @@ void Worker::thread_run() { } } -inline std::unique_ptr Worker::pipe_error(bool& finished) { - std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl; - finished = false; - return nullptr; -} - std::unique_ptr Worker::perform_task(std::unique_ptr& task, bool& finished) { if (task->get_content().size() != 1) { @@ -166,26 +162,41 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // Pipe creation needs some error handling just in case if (pipe(stdin_pipe) < 0) { - return pipe_error(finished); + std::cerr << "Error: Pipe creation failed for stdin pipe. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } if (pipe(stdout_pipe) < 0) { - return pipe_error(finished); + std::cerr << "Error: Pipe creation failed for stdout pipe. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } if (pipe(stderr_pipe) < 0) { - return pipe_error(finished); + std::cerr << "Error: Pipe creation failed for stderr pipe. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } // our side needs nonblocking access to the pipes if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) { - return pipe_error(finished); + std::cerr << "Error: Could not set stdin pipe to nonblocking. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { - return pipe_error(finished); + std::cerr << "Error: Could not set stdout pipe to nonblocking. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) { - return pipe_error(finished); + std::cerr << "Error: Could not set stderr pipe to nonblocking. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } + + DEBUG_PRINT("Starting simulator..."); + pid_t pid; @@ -306,19 +317,15 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas // Close all the child process facing pipe ends // since this is the parent process, we have to do all the stuff we did before - if (close(stdin_pipe[READ_END]) < 0) { - return pipe_error(finished); + if (close(stdin_pipe[READ_END]) < 0 || + close(stdout_pipe[WRITE_END]) < 0 || + close(stderr_pipe[WRITE_END]) < 0 + ) { + std::cerr << "Error: Could not close parent facing pipe ends. " << strerror(errno) << std::endl; + finished = false; + return nullptr; } - if (close(stdout_pipe[WRITE_END]) < 0) { - return pipe_error(finished); - } - - if (close(stderr_pipe[WRITE_END]) < 0) { - return pipe_error(finished); - } - - // create the output artifact result = std::make_unique( task->id, @@ -362,15 +369,15 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas if (command_n < commands.size()) { std::string& cur_command = commands[command_n]; - const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str(); - size_t command_length = commands[command_n].length() + 1; + auto remaining_command = cur_command.substr(last_pos, cur_command.length()) + "\n"; + size_t command_length = remaining_command.length(); // make sure we don't send more than the pipe can actually hold if (rem_pipe_capacity < command_length) { last_pos = last_pos + rem_pipe_capacity; - rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity); + rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), rem_pipe_capacity); } else { - rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length); + rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), command_length); last_pos = 0; ++command_n; } @@ -490,8 +497,25 @@ std::unique_ptr Worker::perform_task(std::unique_ptr& tas finished = true; break; } + + // check if we need to abort due to a busy deadlock + if (parser->check_busy_deadlock()) { + finished = true; + kill(pid, SIGKILL); + break; + } } + parser->finalize(); + + } + + // Close all the remaining pipes + if (close(stdin_pipe[WRITE_END]) < 0 || + close(stdout_pipe[READ_END]) < 0 || + close(stderr_pipe[READ_END]) < 0 + ) { + std::cerr << "Error: Could not close child facing pipe ends. " << strerror(errno) << std::endl; } delete[] design_char;