streamline code and fix running out of file descriptors
This commit is contained in:
parent
a9aff14f82
commit
3d704c7e73
2 changed files with 51 additions and 29 deletions
|
|
@ -48,8 +48,6 @@ class Worker {
|
|||
void thread_run();
|
||||
std::unique_ptr<OutputType> perform_task(std::unique_ptr<InputType>& task, bool& finished);
|
||||
|
||||
std::unique_ptr<OutputType> pipe_error(bool& finished);
|
||||
|
||||
std::unique_ptr<std::thread> worker_thread;
|
||||
std::atomic<db::uuid_t> current_task;
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@
|
|||
**************************************************************************
|
||||
*/
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
|
|
@ -124,12 +126,6 @@ void Worker::thread_run() {
|
|||
}
|
||||
}
|
||||
|
||||
inline std::unique_ptr<OutputType> Worker::pipe_error(bool& finished) {
|
||||
std::cerr << "Error: Pipe creation failed. No actsim process can be spawned." << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& task, bool& finished) {
|
||||
|
||||
if (task->get_content().size() != 1) {
|
||||
|
|
@ -166,26 +162,41 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
|
||||
// Pipe creation needs some error handling just in case
|
||||
if (pipe(stdin_pipe) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Pipe creation failed for stdin pipe. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
if (pipe(stdout_pipe) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Pipe creation failed for stdout pipe. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
if (pipe(stderr_pipe) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Pipe creation failed for stderr pipe. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// our side needs nonblocking access to the pipes
|
||||
if (fcntl(stdin_pipe[WRITE_END], F_SETFL, O_NONBLOCK) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Could not set stdin pipe to nonblocking. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
if (fcntl(stdout_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Could not set stdout pipe to nonblocking. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
if (fcntl(stderr_pipe[READ_END], F_SETFL, O_NONBLOCK) < 0) {
|
||||
return pipe_error(finished);
|
||||
std::cerr << "Error: Could not set stderr pipe to nonblocking. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
DEBUG_PRINT("Starting simulator...");
|
||||
|
||||
pid_t pid;
|
||||
|
||||
|
||||
|
|
@ -306,19 +317,15 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
|
||||
// Close all the child process facing pipe ends
|
||||
// since this is the parent process, we have to do all the stuff we did before
|
||||
if (close(stdin_pipe[READ_END]) < 0) {
|
||||
return pipe_error(finished);
|
||||
if (close(stdin_pipe[READ_END]) < 0 ||
|
||||
close(stdout_pipe[WRITE_END]) < 0 ||
|
||||
close(stderr_pipe[WRITE_END]) < 0
|
||||
) {
|
||||
std::cerr << "Error: Could not close parent facing pipe ends. " << strerror(errno) << std::endl;
|
||||
finished = false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (close(stdout_pipe[WRITE_END]) < 0) {
|
||||
return pipe_error(finished);
|
||||
}
|
||||
|
||||
if (close(stderr_pipe[WRITE_END]) < 0) {
|
||||
return pipe_error(finished);
|
||||
}
|
||||
|
||||
|
||||
// create the output artifact
|
||||
result = std::make_unique<OutputType>(
|
||||
task->id,
|
||||
|
|
@ -362,15 +369,15 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
if (command_n < commands.size()) {
|
||||
std::string& cur_command = commands[command_n];
|
||||
|
||||
const char* command_buffer = (cur_command.substr(last_pos, cur_command.length()) + "\n").c_str();
|
||||
size_t command_length = commands[command_n].length() + 1;
|
||||
auto remaining_command = cur_command.substr(last_pos, cur_command.length()) + "\n";
|
||||
size_t command_length = remaining_command.length();
|
||||
|
||||
// make sure we don't send more than the pipe can actually hold
|
||||
if (rem_pipe_capacity < command_length) {
|
||||
last_pos = last_pos + rem_pipe_capacity;
|
||||
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, rem_pipe_capacity);
|
||||
rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), rem_pipe_capacity);
|
||||
} else {
|
||||
rem_pipe_capacity = write(stdin_pipe[WRITE_END], command_buffer, command_length);
|
||||
rem_pipe_capacity = write(stdin_pipe[WRITE_END], remaining_command.c_str(), command_length);
|
||||
last_pos = 0;
|
||||
++command_n;
|
||||
}
|
||||
|
|
@ -490,8 +497,25 @@ std::unique_ptr<OutputType> Worker::perform_task(std::unique_ptr<InputType>& tas
|
|||
finished = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// check if we need to abort due to a busy deadlock
|
||||
if (parser->check_busy_deadlock()) {
|
||||
finished = true;
|
||||
kill(pid, SIGKILL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
parser->finalize();
|
||||
|
||||
}
|
||||
|
||||
// Close all the remaining pipes
|
||||
if (close(stdin_pipe[WRITE_END]) < 0 ||
|
||||
close(stdout_pipe[READ_END]) < 0 ||
|
||||
close(stderr_pipe[READ_END]) < 0
|
||||
) {
|
||||
std::cerr << "Error: Could not close child facing pipe ends. " << strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
delete[] design_char;
|
||||
|
|
|
|||
Loading…
Reference in a new issue