diff --git a/include/components/controller.hpp b/include/components/controller.hpp index 30958d9b..94c67888 100644 --- a/include/components/controller.hpp +++ b/include/components/controller.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include "common.hpp" #include "config.hpp" @@ -32,6 +33,7 @@ using modulemap_t = std::map>; // }}} +using std::thread; namespace chrono = std::chrono; using namespace std::chrono_literals; @@ -83,6 +85,8 @@ class controller : public signal_receiver m_fdevent_rd; unique_ptr m_fdevent_wr; + thread m_event_thread; + /** * @brief Controls weather the output gets printed to stdout */ diff --git a/src/components/controller.cpp b/src/components/controller.cpp index 54c2b0c2..6eb744d8 100644 --- a/src/components/controller.cpp +++ b/src/components/controller.cpp @@ -183,8 +183,15 @@ bool controller::run(bool writeback) { m_connection.flush(); + m_event_thread = thread(&controller::process_eventqueue, this); + read_events(); + if (m_event_thread.joinable()) { + m_queue.enqueue(make_quit_evt(static_cast(g_reload))); + m_event_thread.join(); + } + m_log.warn("Termination signal received, shutting down..."); return !g_reload; @@ -198,9 +205,9 @@ bool controller::enqueue(event&& evt) { m_log.warn("Failed to enqueue event"); return false; } - if (write(g_eventpipe[PIPE_WRITE], " ", 1) == -1) { - m_log.err("Failed to write to eventpipe (reason: %s)", strerror(errno)); - } + // if (write(g_eventpipe[PIPE_WRITE], " ", 1) == -1) { + // m_log.err("Failed to write to eventpipe (reason: %s)", strerror(errno)); + // } return true; } @@ -223,6 +230,8 @@ bool controller::enqueue(string&& input_data) { * Read events from configured file descriptors */ void controller::read_events() { + m_log.info("Entering event loop (thread-id=%lu)", this_thread::get_id()); + int fd_connection{m_connection.get_file_descriptor()}; int fd_confwatch{0}; int fd_ipc{0}; @@ -241,8 +250,6 @@ void controller::read_events() { fds.emplace_back((fd_ipc = m_ipc->get_file_descriptor())); } - m_sig.emit(sig_ev::process_update{make_update_evt(true)}); - while (!g_terminate) { fd_set readfds{}; FD_ZERO(&readfds); @@ -263,7 +270,6 @@ void controller::read_events() { // Process event on the internal fd if (m_fdevent_rd && FD_ISSET(*m_fdevent_rd, &readfds)) { - process_eventqueue(); char buffer[BUFSIZ]{'\0'}; if (read(*m_fdevent_rd, &buffer, BUFSIZ) == -1) { m_log.err("Failed to read from eventpipe (err: %s)", strerror(errno)); @@ -301,46 +307,53 @@ void controller::read_events() { } /** - * Dequeue items from the eventqueue + * Eventqueue worker loop */ void controller::process_eventqueue() { - event evt{}; + m_log.info("Eventqueue worker (thread-id=%lu)", this_thread::get_id()); - if (!m_queue.try_dequeue(evt)) { - return m_log.err("Failed to dequeue event"); - } + m_queue.enqueue(make_update_evt(true)); - if (evt.type == static_cast(event_type::INPUT)) { - process_inputdata(); - } else { - event next{}; - size_t swallowed{0}; - while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_update)) { - if (next.type == static_cast(event_type::QUIT)) { - evt = next; - break; - } else if (next.type == static_cast(event_type::INPUT)) { - evt = next; - break; - } else if (evt.type != next.type) { - m_queue.try_enqueue(move(next)); - break; - } else { - m_log.trace_x("controller: Swallowing event within timeframe"); - evt = next; - } + while (!g_terminate) { + event evt{}; + m_queue.wait_dequeue(evt); + + if (g_terminate || evt.type == static_cast(event_type::QUIT)) { + break; } - if (evt.type == static_cast(event_type::UPDATE)) { - m_sig.emit(sig_ev::process_update{make_update_evt(evt.flag)}); - } else if (evt.type == static_cast(event_type::INPUT)) { + if (evt.type == static_cast(event_type::INPUT)) { process_inputdata(); - } else if (evt.type == static_cast(event_type::QUIT)) { - m_sig.emit(sig_ev::process_quit{make_quit_evt(evt.flag)}); - } else if (evt.type == static_cast(event_type::CHECK)) { - m_sig.emit(sig_ev::process_check{}); } else { - m_log.warn("Unknown event type for enqueued event (%d)", evt.type); + event next{}; + size_t swallowed{0}; + while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_update)) { + if (next.type == static_cast(event_type::QUIT)) { + evt = next; + break; + } else if (next.type == static_cast(event_type::INPUT)) { + evt = next; + break; + } else if (evt.type != next.type) { + m_queue.try_enqueue(move(next)); + break; + } else { + m_log.trace_x("controller: Swallowing event within timeframe"); + evt = next; + } + } + + if (evt.type == static_cast(event_type::UPDATE)) { + m_sig.emit(sig_ev::process_update{make_update_evt(evt.flag)}); + } else if (evt.type == static_cast(event_type::INPUT)) { + process_inputdata(); + } else if (evt.type == static_cast(event_type::QUIT)) { + m_sig.emit(sig_ev::process_quit{make_quit_evt(evt.flag)}); + } else if (evt.type == static_cast(event_type::CHECK)) { + m_sig.emit(sig_ev::process_check{}); + } else { + m_log.warn("Unknown event type for enqueued event (%d)", evt.type); + } } } } @@ -514,7 +527,7 @@ bool controller::on(const sig_ev::process_check&) { * Process ui button press event */ bool controller::on(const sig_ui::button_press& evt) { - string input{*evt.data()}; + string input{*evt()}; if (input.empty()) { m_log.err("Cannot enqueue empty input"); @@ -529,8 +542,7 @@ bool controller::on(const sig_ui::button_press& evt) { * Process ipc action messages */ bool controller::on(const sig_ipc::process_action& evt) { - ipc_action a{*evt.data()}; - string action{a.payload}; + string action{(*evt()).payload}; action.erase(0, strlen(ipc_action::prefix)); if (action.empty()) { @@ -547,8 +559,7 @@ bool controller::on(const sig_ipc::process_action& evt) { * Process ipc command messages */ bool controller::on(const sig_ipc::process_command& evt) { - ipc_command c{*evt.data()}; - string command{c.payload}; + string command{(*evt()).payload}; command.erase(0, strlen(ipc_command::prefix)); if (command.empty()) { @@ -570,7 +581,7 @@ bool controller::on(const sig_ipc::process_command& evt) { * Process ipc hook messages */ bool controller::on(const sig_ipc::process_hook& evt) { - const ipc_hook hook{*evt.data()}; + const ipc_hook hook{*evt()}; for (const auto& block : m_modules) { for (const auto& module : block.second) {