diff --git a/include/components/controller.hpp b/include/components/controller.hpp index 58212c73..81623866 100644 --- a/include/components/controller.hpp +++ b/include/components/controller.hpp @@ -57,7 +57,6 @@ class controller : public signal_receiver { }; struct PipeHandle : public UVHandleGeneric { - PipeHandle(uv_loop_t* loop, std::function fun); + PipeHandle(uv_loop_t* loop, std::function fun, std::function eof_cb); void start(int fd); void read_cb(ssize_t nread, const uv_buf_t* buf); std::function func; + std::function eof_cb; int fd; }; @@ -114,7 +115,7 @@ class eventloop { void signal_handler(int signum, std::function fun); void poll_handler(int events, int fd, std::function fun); void fs_event_handler(const string& path, std::function fun); - void pipe_handle(int fd, std::function fun); + void pipe_handle(int fd, std::function fun, std::function eof_cb); void timer_handle(uint64_t timeout, uint64_t repeat, std::function fun); AsyncHandle_t async_handle(std::function fun); diff --git a/include/components/ipc.hpp b/include/components/ipc.hpp index e551d789..27bcdff9 100644 --- a/include/components/ipc.hpp +++ b/include/components/ipc.hpp @@ -34,7 +34,8 @@ class ipc { explicit ipc(signal_emitter& emitter, const logger& logger); ~ipc(); - void receive_message(string buf); + void receive_data(string buf); + void receive_eof(); int get_file_descriptor() const; private: @@ -42,7 +43,12 @@ class ipc { const logger& m_log; string m_path{}; - unique_ptr m_fd; + int m_fd; + + /** + * Buffer for the currently received IPC message. + */ + string m_buffer{}; }; POLYBAR_NS_END diff --git a/src/components/controller.cpp b/src/components/controller.cpp index 29fd232c..99211b13 100644 --- a/src/components/controller.cpp +++ b/src/components/controller.cpp @@ -195,11 +195,6 @@ void controller::conn_cb(int status, int) { } } -void controller::ipc_cb(string buf) { - // TODO handle messages sent in multiple parts. - m_ipc->receive_message(buf); -} - void controller::signal_handler(int signum) { m_log.notice("Received signal(%d): %s", signum, strsignal(signum)); stop(signum == SIGUSR1); @@ -264,7 +259,9 @@ void controller::read_events(bool confwatch) { } if (m_ipc) { - eloop->pipe_handle(m_ipc->get_file_descriptor(), [this](const string payload) { ipc_cb(payload); }); + eloop->pipe_handle( + m_ipc->get_file_descriptor(), [this](const string payload) { m_ipc->receive_data(payload); }, + [this]() { m_ipc->receive_eof(); }); } if (!m_snapshot_dst.empty()) { diff --git a/src/components/eventloop.cpp b/src/components/eventloop.cpp index 6003aa76..57292ee0 100644 --- a/src/components/eventloop.cpp +++ b/src/components/eventloop.cpp @@ -73,8 +73,8 @@ void FSEventHandle::start(const string& path) { // }}} // PipeHandle {{{ -PipeHandle::PipeHandle(uv_loop_t* loop, std::function fun) - : UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); }), func(fun) { +PipeHandle::PipeHandle(uv_loop_t* loop, std::function fun, std::function eof_cb) + : UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); }), func(fun), eof_cb(eof_cb) { UV(uv_pipe_init, loop, handle, false); } @@ -97,6 +97,7 @@ void PipeHandle::read_cb(ssize_t nread, const uv_buf_t* buf) { log.err("Read error: %s", uv_err_name(nread)); uv_close((uv_handle_t*)handle, nullptr); } else { + eof_cb(); // TODO this causes constant EOFs start(this->fd); } @@ -191,8 +192,8 @@ void eventloop::fs_event_handler(const string& path, std::functionstart(path); } -void eventloop::pipe_handle(int fd, std::function fun) { - m_pipe_handles.emplace_back(std::make_unique(get(), fun)); +void eventloop::pipe_handle(int fd, std::function fun, std::function eof_cb) { + m_pipe_handles.emplace_back(std::make_unique(get(), fun, eof_cb)); m_pipe_handles.back()->start(fd); } diff --git a/src/components/ipc.cpp b/src/components/ipc.cpp index 7487a910..d89f305a 100644 --- a/src/components/ipc.cpp +++ b/src/components/ipc.cpp @@ -33,8 +33,11 @@ ipc::ipc(signal_emitter& emitter, const logger& logger) : m_sig(emitter), m_log( throw system_error("Failed to create ipc channel"); } + if ((m_fd = open(m_path.c_str(), O_RDONLY | O_NONBLOCK)) == -1) { + throw system_error("Failed to open pipe '" + m_path + "'"); + } + m_log.info("Created ipc channel at: %s", m_path); - m_fd = file_util::make_file_descriptor(m_path, O_RDONLY | O_NONBLOCK, false); } /** @@ -48,12 +51,23 @@ ipc::~ipc() { } /** - * Receive available ipc messages and delegate valid events + * Receive parts of an IPC message */ -void ipc::receive_message(string buf) { - m_log.info("Receiving ipc message"); +void ipc::receive_data(string buf) { + m_buffer += buf; +} - string payload{string_util::trim(std::move(buf), '\n')}; +/** + * Called once the end of the message arrives. + */ +void ipc::receive_eof() { + if (m_buffer.empty()) { + return; + } + + string payload{string_util::trim(std::move(m_buffer), '\n')}; + + m_buffer = std::string(); if (payload.find(ipc_command_prefix) == 0) { m_sig.emit(signals::ipc::command{payload.substr(strlen(ipc_command_prefix))}); @@ -61,7 +75,7 @@ void ipc::receive_message(string buf) { m_sig.emit(signals::ipc::hook{payload.substr(strlen(ipc_hook_prefix))}); } else if (payload.find(ipc_action_prefix) == 0) { m_sig.emit(signals::ipc::action{payload.substr(strlen(ipc_action_prefix))}); - } else if (!payload.empty()) { + } else { m_log.warn("Received unknown ipc message: (payload=%s)", payload); } } @@ -70,7 +84,7 @@ void ipc::receive_message(string buf) { * Get the file descriptor to the ipc channel */ int ipc::get_file_descriptor() const { - return *m_fd; + return m_fd; } POLYBAR_NS_END