refactor(controller): Process eventqueue concurrently

This commit is contained in:
Michael Carlberg 2016-12-23 05:10:05 +01:00
parent 8cff01e3d8
commit 8cc275ccd1
2 changed files with 59 additions and 44 deletions

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <moodycamel/blockingconcurrentqueue.h> #include <moodycamel/blockingconcurrentqueue.h>
#include <thread>
#include "common.hpp" #include "common.hpp"
#include "config.hpp" #include "config.hpp"
@ -32,6 +33,7 @@ using modulemap_t = std::map<alignment, vector<module_t>>;
// }}} // }}}
using std::thread;
namespace chrono = std::chrono; namespace chrono = std::chrono;
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -83,6 +85,8 @@ class controller : public signal_receiver<SIGN_PRIORITY_CONTROLLER, sig_ev::proc
unique_ptr<file_descriptor> m_fdevent_rd; unique_ptr<file_descriptor> m_fdevent_rd;
unique_ptr<file_descriptor> m_fdevent_wr; unique_ptr<file_descriptor> m_fdevent_wr;
thread m_event_thread;
/** /**
* @brief Controls weather the output gets printed to stdout * @brief Controls weather the output gets printed to stdout
*/ */

View File

@ -183,8 +183,15 @@ bool controller::run(bool writeback) {
m_connection.flush(); m_connection.flush();
m_event_thread = thread(&controller::process_eventqueue, this);
read_events(); read_events();
if (m_event_thread.joinable()) {
m_queue.enqueue(make_quit_evt(static_cast<bool>(g_reload)));
m_event_thread.join();
}
m_log.warn("Termination signal received, shutting down..."); m_log.warn("Termination signal received, shutting down...");
return !g_reload; return !g_reload;
@ -198,9 +205,9 @@ bool controller::enqueue(event&& evt) {
m_log.warn("Failed to enqueue event"); m_log.warn("Failed to enqueue event");
return false; return false;
} }
if (write(g_eventpipe[PIPE_WRITE], " ", 1) == -1) { // if (write(g_eventpipe[PIPE_WRITE], " ", 1) == -1) {
m_log.err("Failed to write to eventpipe (reason: %s)", strerror(errno)); // m_log.err("Failed to write to eventpipe (reason: %s)", strerror(errno));
} // }
return true; return true;
} }
@ -223,6 +230,8 @@ bool controller::enqueue(string&& input_data) {
* Read events from configured file descriptors * Read events from configured file descriptors
*/ */
void controller::read_events() { void controller::read_events() {
m_log.info("Entering event loop (thread-id=%lu)", this_thread::get_id());
int fd_connection{m_connection.get_file_descriptor()}; int fd_connection{m_connection.get_file_descriptor()};
int fd_confwatch{0}; int fd_confwatch{0};
int fd_ipc{0}; int fd_ipc{0};
@ -241,8 +250,6 @@ void controller::read_events() {
fds.emplace_back((fd_ipc = m_ipc->get_file_descriptor())); fds.emplace_back((fd_ipc = m_ipc->get_file_descriptor()));
} }
m_sig.emit(sig_ev::process_update{make_update_evt(true)});
while (!g_terminate) { while (!g_terminate) {
fd_set readfds{}; fd_set readfds{};
FD_ZERO(&readfds); FD_ZERO(&readfds);
@ -263,7 +270,6 @@ void controller::read_events() {
// Process event on the internal fd // Process event on the internal fd
if (m_fdevent_rd && FD_ISSET(*m_fdevent_rd, &readfds)) { if (m_fdevent_rd && FD_ISSET(*m_fdevent_rd, &readfds)) {
process_eventqueue();
char buffer[BUFSIZ]{'\0'}; char buffer[BUFSIZ]{'\0'};
if (read(*m_fdevent_rd, &buffer, BUFSIZ) == -1) { if (read(*m_fdevent_rd, &buffer, BUFSIZ) == -1) {
m_log.err("Failed to read from eventpipe (err: %s)", strerror(errno)); m_log.err("Failed to read from eventpipe (err: %s)", strerror(errno));
@ -301,46 +307,53 @@ void controller::read_events() {
} }
/** /**
* Dequeue items from the eventqueue * Eventqueue worker loop
*/ */
void controller::process_eventqueue() { void controller::process_eventqueue() {
event evt{}; m_log.info("Eventqueue worker (thread-id=%lu)", this_thread::get_id());
if (!m_queue.try_dequeue(evt)) { m_queue.enqueue(make_update_evt(true));
return m_log.err("Failed to dequeue event");
}
if (evt.type == static_cast<uint8_t>(event_type::INPUT)) { while (!g_terminate) {
process_inputdata(); event evt{};
} else { m_queue.wait_dequeue(evt);
event next{};
size_t swallowed{0}; if (g_terminate || evt.type == static_cast<uint8_t>(event_type::QUIT)) {
while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_update)) { break;
if (next.type == static_cast<uint8_t>(event_type::QUIT)) {
evt = next;
break;
} else if (next.type == static_cast<uint8_t>(event_type::INPUT)) {
evt = next;
break;
} else if (evt.type != next.type) {
m_queue.try_enqueue(move(next));
break;
} else {
m_log.trace_x("controller: Swallowing event within timeframe");
evt = next;
}
} }
if (evt.type == static_cast<uint8_t>(event_type::UPDATE)) { if (evt.type == static_cast<uint8_t>(event_type::INPUT)) {
m_sig.emit(sig_ev::process_update{make_update_evt(evt.flag)});
} else if (evt.type == static_cast<uint8_t>(event_type::INPUT)) {
process_inputdata(); process_inputdata();
} else if (evt.type == static_cast<uint8_t>(event_type::QUIT)) {
m_sig.emit(sig_ev::process_quit{make_quit_evt(evt.flag)});
} else if (evt.type == static_cast<uint8_t>(event_type::CHECK)) {
m_sig.emit(sig_ev::process_check{});
} else { } else {
m_log.warn("Unknown event type for enqueued event (%d)", evt.type); event next{};
size_t swallowed{0};
while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_update)) {
if (next.type == static_cast<uint8_t>(event_type::QUIT)) {
evt = next;
break;
} else if (next.type == static_cast<uint8_t>(event_type::INPUT)) {
evt = next;
break;
} else if (evt.type != next.type) {
m_queue.try_enqueue(move(next));
break;
} else {
m_log.trace_x("controller: Swallowing event within timeframe");
evt = next;
}
}
if (evt.type == static_cast<uint8_t>(event_type::UPDATE)) {
m_sig.emit(sig_ev::process_update{make_update_evt(evt.flag)});
} else if (evt.type == static_cast<uint8_t>(event_type::INPUT)) {
process_inputdata();
} else if (evt.type == static_cast<uint8_t>(event_type::QUIT)) {
m_sig.emit(sig_ev::process_quit{make_quit_evt(evt.flag)});
} else if (evt.type == static_cast<uint8_t>(event_type::CHECK)) {
m_sig.emit(sig_ev::process_check{});
} else {
m_log.warn("Unknown event type for enqueued event (%d)", evt.type);
}
} }
} }
} }
@ -514,7 +527,7 @@ bool controller::on(const sig_ev::process_check&) {
* Process ui button press event * Process ui button press event
*/ */
bool controller::on(const sig_ui::button_press& evt) { bool controller::on(const sig_ui::button_press& evt) {
string input{*evt.data()}; string input{*evt()};
if (input.empty()) { if (input.empty()) {
m_log.err("Cannot enqueue empty input"); m_log.err("Cannot enqueue empty input");
@ -529,8 +542,7 @@ bool controller::on(const sig_ui::button_press& evt) {
* Process ipc action messages * Process ipc action messages
*/ */
bool controller::on(const sig_ipc::process_action& evt) { bool controller::on(const sig_ipc::process_action& evt) {
ipc_action a{*evt.data()}; string action{(*evt()).payload};
string action{a.payload};
action.erase(0, strlen(ipc_action::prefix)); action.erase(0, strlen(ipc_action::prefix));
if (action.empty()) { if (action.empty()) {
@ -547,8 +559,7 @@ bool controller::on(const sig_ipc::process_action& evt) {
* Process ipc command messages * Process ipc command messages
*/ */
bool controller::on(const sig_ipc::process_command& evt) { bool controller::on(const sig_ipc::process_command& evt) {
ipc_command c{*evt.data()}; string command{(*evt()).payload};
string command{c.payload};
command.erase(0, strlen(ipc_command::prefix)); command.erase(0, strlen(ipc_command::prefix));
if (command.empty()) { if (command.empty()) {
@ -570,7 +581,7 @@ bool controller::on(const sig_ipc::process_command& evt) {
* Process ipc hook messages * Process ipc hook messages
*/ */
bool controller::on(const sig_ipc::process_hook& evt) { bool controller::on(const sig_ipc::process_hook& evt) {
const ipc_hook hook{*evt.data()}; const ipc_hook hook{*evt()};
for (const auto& block : m_modules) { for (const auto& block : m_modules) {
for (const auto& module : block.second) { for (const auto& module : block.second) {