From c2acdff7d4f9675844ecc1e8743212020a7af019 Mon Sep 17 00:00:00 2001 From: Michael Carlberg <c@rlberg.se> Date: Fri, 25 Nov 2016 21:20:50 +0100 Subject: [PATCH] feat(eventloop): Delayed enqueue A new worker that will block the queue channel until the delayed event has been processed. This is used to limit the amount of X button events within defined time frame and we can't block the main X thread. --- include/components/bar.hpp | 4 +- include/components/eventloop.hpp | 99 +++++++++++-- src/components/controller.cpp | 56 ++++---- src/components/eventloop.cpp | 240 ++++++++++++++++++++++--------- 4 files changed, 289 insertions(+), 110 deletions(-) diff --git a/include/components/bar.hpp b/include/components/bar.hpp index 1de9156c..0bd49cc2 100644 --- a/include/components/bar.hpp +++ b/include/components/bar.hpp @@ -56,9 +56,9 @@ class bar : public xpp::event::sink<evt::button_press, evt::expose, evt::propert alignment m_trayalign{alignment::NONE}; uint8_t m_trayclients{0}; - std::mutex m_mutex; - string m_lastinput; + + std::mutex m_mutex; }; di::injector<unique_ptr<bar>> configure_bar(); diff --git a/include/components/eventloop.hpp b/include/components/eventloop.hpp index c86dce19..9d7599d3 100644 --- a/include/components/eventloop.hpp +++ b/include/components/eventloop.hpp @@ -12,9 +12,16 @@ POLYBAR_NS using module_t = unique_ptr<modules::module_interface>; using modulemap_t = map<alignment, vector<module_t>>; -enum class event_type { NONE = 0, UPDATE, CHECK, INPUT, QUIT }; +enum class event_type : uint8_t { + NONE = 0, + UPDATE, + CHECK, + INPUT, + QUIT, +}; + struct event { - int type; + uint8_t type{0}; char data[256]{'\0'}; }; @@ -25,27 +32,33 @@ class eventloop { */ using entry_t = event; using queue_t = moodycamel::BlockingConcurrentQueue<entry_t>; + using duration_t = chrono::duration<double, std::milli>; - explicit eventloop(const logger& logger) : m_log(logger) {} + explicit eventloop(const logger& logger, const config& config); ~eventloop() noexcept; - bool enqueue(const entry_t& i); - void run(std::chrono::duration<double, std::milli> timeframe, int limit); + void start(); + void wait(); void stop(); + bool enqueue(const entry_t& entry); + bool enqueue_delayed(const entry_t& entry); + void set_update_cb(callback<>&& cb); void set_input_db(callback<string>&& cb); void add_module(const alignment pos, module_t&& module); - - modulemap_t& modules(); + const modulemap_t& modules() const; + size_t module_count() const; protected: - void start_modules(); + void dispatch_modules(); + void dispatch_queue_worker(); + void dispatch_delayed_worker(); - bool match_event(entry_t evt, event_type type); - bool compare_events(entry_t evt, entry_t evt2); + inline bool match_event(entry_t evt, event_type type); + inline bool compare_events(entry_t evt, entry_t evt2); void forward_event(entry_t evt); void on_update(); @@ -55,13 +68,77 @@ class eventloop { private: const logger& m_log; + const config& m_conf; + /** + * @brief Event queue + */ queue_t m_queue; + + /** + * @brief Loaded modules + */ modulemap_t m_modules; + + /** + * @brief Lock used when accessing the module map + */ + std::mutex m_modulelock; + + /** + * @brief Flag to indicate current run state + */ stateflag m_running; + /** + * @brief Callback fired when receiving UPDATE events + */ callback<> m_update_cb; + + /** + * @brief Callback fired for unprocessed INPUT events + */ callback<string> m_unrecognized_input_cb; + + /** + * @brief Time to wait for subsequent events + */ + duration_t m_swallow_time{0ms}; + + /** + * @brief Maximum amount of subsequent events to swallow within timeframe + */ + size_t m_swallow_limit{0}; + + /** + * @brief Time until releasing the lock on the delayed enqueue channel + */ + duration_t m_delayed_time; + + /** + * @brief Lock used to control the condition variable + */ + std::mutex m_delayed_lock; + + /** + * @brief Condition variable used to manage notifications for delayed enqueues + */ + std::condition_variable m_delayed_cond; + + /** + * @brief Pending event on the delayed channel + */ + entry_t m_delayed_entry{0}; + + /** + * @brief Queue worker thread + */ + thread m_queue_thread; + + /** + * @brief Delayed notifier thread + */ + thread m_delayed_thread; }; namespace { @@ -70,7 +147,7 @@ namespace { */ template <typename T = unique_ptr<eventloop>> di::injector<T> configure_eventloop() { - return di::make_injector(configure_logger()); + return di::make_injector(configure_logger(), configure_config()); } } diff --git a/src/components/controller.cpp b/src/components/controller.cpp index ba19a38d..815f3820 100644 --- a/src/components/controller.cpp +++ b/src/components/controller.cpp @@ -70,17 +70,13 @@ di::injector<unique_ptr<controller>> configure_controller(watch_t& confwatch) { * threads and spawned processes */ controller::~controller() { - g_signals::bar::action_click = nullptr; - if (m_command) { m_log.info("Terminating running shell command"); - m_command->terminate(); + m_command.reset(); } if (m_eventloop) { m_log.info("Deconstructing eventloop"); - m_eventloop->set_update_cb(nullptr); - m_eventloop->set_input_db(nullptr); m_eventloop.reset(); } @@ -189,9 +185,8 @@ void controller::run() { // Start event loop if (m_eventloop) { - auto throttle_ms = m_conf.get<double>("settings", "throttle-ms", 10); - auto throttle_limit = m_conf.get<int>("settings", "throttle-limit", 5); - m_eventloop->run(chrono::duration<double, std::milli>(throttle_ms), throttle_limit); + m_eventloop->start(); + m_eventloop->wait(); } // Wake up signal thread @@ -325,6 +320,7 @@ void controller::wait_for_xevent() { m_connection.flush(); + shared_ptr<xcb_generic_event_t> evt; while (m_running) { try { int error = 0; @@ -332,12 +328,7 @@ void controller::wait_for_xevent() { if ((error = m_connection.connection_has_error()) != 0) { m_log.err("Error in X event loop, terminating... (%s)", m_connection.error_str(error)); kill(getpid(), SIGTERM); - break; - } - - auto evt = m_connection.wait_for_event(); - - if (evt != nullptr) { + } else if ((evt = m_connection.wait_for_event()) != nullptr) { m_connection.dispatch_event(evt); } } catch (const exception& err) { @@ -352,7 +343,6 @@ void controller::wait_for_xevent() { void controller::bootstrap_modules() { const bar_settings bar{m_bar->settings()}; string bs{m_conf.bar_section()}; - size_t module_count = 0; for (int i = 0; i < 3; i++) { alignment align = static_cast<alignment>(i + 1); @@ -429,22 +419,19 @@ void controller::bootstrap_modules() { } module->set_update_cb( - bind(&eventloop::enqueue, m_eventloop.get(), eventloop::entry_t{static_cast<int>(event_type::UPDATE)})); + bind(&eventloop::enqueue, m_eventloop.get(), eventloop::entry_t{static_cast<uint8_t>(event_type::UPDATE)})); module->set_stop_cb( - bind(&eventloop::enqueue, m_eventloop.get(), eventloop::entry_t{static_cast<int>(event_type::CHECK)})); - + bind(&eventloop::enqueue, m_eventloop.get(), eventloop::entry_t{static_cast<uint8_t>(event_type::CHECK)})); module->setup(); m_eventloop->add_module(align, move(module)); - - module_count++; } catch (const std::runtime_error& err) { m_log.err("Disabling module \"%s\" (reason: %s)", module_name, err.what()); } } } - if (module_count == 0) { + if (!m_eventloop->module_count()) { throw application_error("No modules created"); } } @@ -453,6 +440,10 @@ void controller::bootstrap_modules() { * Callback for received ipc actions */ void controller::on_ipc_action(const ipc_action& message) { + if (!m_eventloop) { + return; + } + string action = message.payload.substr(strlen(ipc_action::prefix)); if (action.empty()) { @@ -460,7 +451,7 @@ void controller::on_ipc_action(const ipc_action& message) { return; } - eventloop::entry_t evt{static_cast<int>(event_type::INPUT)}; + eventloop::entry_t evt{static_cast<uint8_t>(event_type::INPUT)}; snprintf(evt.data, sizeof(evt.data), "%s", action.c_str()); m_log.info("Enqueuing IPC action: %s", action); @@ -471,13 +462,20 @@ void controller::on_ipc_action(const ipc_action& message) { * Callback for clicked bar actions */ void controller::on_mouse_event(const string& input) { - eventloop::entry_t evt{static_cast<int>(event_type::INPUT)}; + if (!m_eventloop) { + return; + } + + eventloop::entry_t evt{static_cast<uint8_t>(event_type::INPUT)}; if (input.length() > sizeof(evt.data)) { - m_log.warn("Ignoring input event (size)"); - } else { - snprintf(evt.data, sizeof(evt.data), "%s", input.c_str()); - m_eventloop->enqueue(evt); + return m_log.warn("Ignoring input event (size)"); + } + + snprintf(evt.data, sizeof(evt.data), "%s", input.c_str()); + + if (!m_eventloop->enqueue_delayed(evt)) { + m_log.trace_x("controller: Dispatcher busy"); } } @@ -505,6 +503,10 @@ void controller::on_unrecognized_action(string input) { * Callback for module content update */ void controller::on_update() { + if (!m_bar) { + return; + } + const bar_settings& bar{m_bar->settings()}; string contents; diff --git a/src/components/eventloop.cpp b/src/components/eventloop.cpp index 0d296602..008ea750 100644 --- a/src/components/eventloop.cpp +++ b/src/components/eventloop.cpp @@ -6,10 +6,28 @@ POLYBAR_NS +/** + * Construct eventloop instance + */ +eventloop::eventloop(const logger& logger, const config& config) : m_log(logger), m_conf(config) { + m_delayed_time = duration_t{m_conf.get<double>("settings", "eventloop-delayed-time", 25)}; + m_swallow_time = duration_t{m_conf.get<double>("settings", "eventloop-swallow-time", 10)}; + m_swallow_limit = m_conf.get<size_t>("settings", "eventloop-swallow", 5U); +} + /** * Deconstruct eventloop */ eventloop::~eventloop() noexcept { + m_update_cb = nullptr; + m_unrecognized_input_cb = nullptr; + + if (m_delayed_thread.joinable()) { + m_delayed_thread.join(); + } + + std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock); + for (auto&& block : m_modules) { for (auto&& module : block.second) { auto module_name = module->name(); @@ -23,68 +41,25 @@ eventloop::~eventloop() noexcept { } /** - * Enqueue event + * Start module and worker threads */ -bool eventloop::enqueue(const entry_t& i) { - bool enqueued; +void eventloop::start() { + m_log.info("Starting event loop"); + m_running = true; - if (!(enqueued = m_queue.enqueue(i))) { - m_log.warn("Failed to queue event (%d)", i.type); - } + dispatch_modules(); - return enqueued; + m_queue_thread = thread(&eventloop::dispatch_queue_worker, this); + m_delayed_thread = thread(&eventloop::dispatch_delayed_worker, this); } /** - * Start module threads and wait for events on the queue - * - * @param timeframe Time to wait for subsequent events - * @param limit Maximum amount of subsequent events to swallow within timeframe + * Wait for worker threads to end */ -void eventloop::run(std::chrono::duration<double, std::milli> timeframe, int limit) { - m_log.info("Starting event loop", timeframe.count(), limit); - m_running = true; - - m_log.trace("eventloop: timeframe: %d, limit: %d", timeframe.count(), limit); - - start_modules(); - - while (m_running) { - entry_t evt, next{static_cast<int>(event_type::NONE)}; - m_queue.wait_dequeue(evt); - - if (!m_running) { - break; - } - - if (match_event(evt, event_type::UPDATE)) { - int swallowed = 0; - while (swallowed++ < limit && m_queue.wait_dequeue_timed(next, timeframe)) { - if (match_event(next, event_type::QUIT)) { - evt = next; - break; - } else if (compare_events(evt, next)) { - m_log.trace_x("eventloop: Swallowing event within timeframe"); - evt = next; - } else { - break; - } - } - } - - forward_event(evt); - - if (match_event(next, event_type::NONE)) { - continue; - } - if (compare_events(evt, next)) { - continue; - } - - forward_event(next); +void eventloop::wait() { + if (m_queue_thread.joinable()) { + m_queue_thread.join(); } - - m_log.trace("eventloop: Loop ended"); } /** @@ -93,7 +68,47 @@ void eventloop::run(std::chrono::duration<double, std::milli> timeframe, int lim void eventloop::stop() { m_log.info("Stopping event loop"); m_running = false; - enqueue({static_cast<int>(event_type::QUIT)}); + + if (m_delayed_thread.joinable()) { + m_delayed_cond.notify_one(); + } + + enqueue({static_cast<uint8_t>(event_type::QUIT)}); +} + +/** + * Enqueue event + */ +bool eventloop::enqueue(const entry_t& entry) { + if (m_queue.enqueue(entry)) { + return true; + } + m_log.warn("Failed to enqueue event (%d)", entry.type); + return false; +} + +/** + * Delay enqueue by given time + */ +bool eventloop::enqueue_delayed(const entry_t& entry) { + if (!m_delayed_lock.try_lock()) { + return false; + } + + std::unique_lock<std::mutex> guard(m_delayed_lock, std::adopt_lock); + + if (m_delayed_entry.type != 0) { + return false; + } + + m_delayed_entry = entry; + + if (m_queue.enqueue(entry)) { + return true; + } + + m_delayed_entry.type = 0; + return false; } /** @@ -114,6 +129,8 @@ void eventloop::set_input_db(callback<string>&& cb) { * Add module to alignment block */ void eventloop::add_module(const alignment pos, module_t&& module) { + std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock); + auto it = m_modules.lower_bound(pos); if (it != m_modules.end() && !(m_modules.key_comp()(pos, it->first))) { @@ -128,16 +145,29 @@ void eventloop::add_module(const alignment pos, module_t&& module) { /** * Get reference to module map */ -modulemap_t& eventloop::modules() { +const modulemap_t& eventloop::modules() const { return m_modules; } +/** + * Get loaded module count + */ +size_t eventloop::module_count() const { + size_t count{0}; + for (auto&& block : m_modules) { + count += block.second.size(); + } + return count; +} + /** * Start module threads */ -void eventloop::start_modules() { - for (auto&& block : m_modules) { - for (auto&& module : block.second) { +void eventloop::dispatch_modules() { + std::lock_guard<std::mutex> guard(m_modulelock); + + for (const auto& block : m_modules) { + for (const auto& module : block.second) { try { m_log.info("Starting %s", module->name()); module->start(); @@ -148,31 +178,95 @@ void eventloop::start_modules() { } } +/** + * Dispatch queue worker thread + */ +void eventloop::dispatch_queue_worker() { + while (m_running) { + entry_t evt, next{static_cast<uint8_t>(event_type::NONE)}; + m_queue.wait_dequeue(evt); + + if (!m_running) { + break; + } + + if (m_delayed_entry.type != 0 && compare_events(evt, m_delayed_entry)) { + m_delayed_cond.notify_one(); + } + + size_t swallowed{0}; + while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_time)) { + if (match_event(next, event_type::QUIT)) { + evt = next; + break; + } else if (!compare_events(evt, next)) { + enqueue(move(next)); + break; + } + + m_log.trace_x("eventloop: Swallowing event within timeframe"); + evt = next; + } + + forward_event(evt); + } + + m_log.trace("eventloop: Reached end of queue worker thread"); +} + +/** + * Dispatch delayed worker thread + */ +void eventloop::dispatch_delayed_worker() { + while (true) { + // wait for notification + while (m_running && m_delayed_entry.type != 0) { + std::unique_lock<std::mutex> guard(m_delayed_lock); + m_delayed_cond.wait(guard, [&] { return m_delayed_entry.type != 0 || !m_running; }); + } + + if (!m_running) { + break; + } + + this_thread::sleep_for(m_delayed_time); + m_delayed_entry.type = 0; + } + + m_log.trace("eventloop: Reached end of delayed worker thread"); +} + /** * Test if event matches given type */ -bool eventloop::match_event(entry_t evt, event_type type) { - return static_cast<int>(type) == evt.type; +inline bool eventloop::match_event(entry_t evt, event_type type) { + return static_cast<uint8_t>(type) == evt.type; } /** * Compare given events */ -bool eventloop::compare_events(entry_t evt, entry_t evt2) { - return evt.type == evt2.type; +inline bool eventloop::compare_events(entry_t evt, entry_t evt2) { + if (evt.type != evt2.type) { + return false; + } else if (match_event(evt, event_type::INPUT)) { + return evt.data[0] == evt2.data[0] && strncmp(evt.data, evt2.data, strlen(evt.data)) == 0; + } + + return true; } /** * Forward event to handler based on type */ void eventloop::forward_event(entry_t evt) { - if (evt.type == static_cast<int>(event_type::UPDATE)) { + if (evt.type == static_cast<uint8_t>(event_type::UPDATE)) { on_update(); - } else if (evt.type == static_cast<int>(event_type::INPUT)) { - on_input(string{evt.data}); - } else if (evt.type == static_cast<int>(event_type::CHECK)) { + } else if (evt.type == static_cast<uint8_t>(event_type::INPUT)) { + on_input(evt.data); + } else if (evt.type == static_cast<uint8_t>(event_type::CHECK)) { on_check(); - } else if (evt.type == static_cast<int>(event_type::QUIT)) { + } else if (evt.type == static_cast<uint8_t>(event_type::QUIT)) { on_quit(); } else { m_log.warn("Unknown event type for enqueued event (%d)", evt.type); @@ -230,8 +324,14 @@ void eventloop::on_check() { return; } - for (auto&& block : m_modules) { - for (auto&& module : block.second) { + if (!m_modulelock.try_lock()) { + return; + } + + std::lock_guard<std::mutex> guard(m_modulelock, std::adopt_lock); + + for (const auto& block : m_modules) { + for (const auto& module : block.second) { if (module->running()) { return; }