Reopen pipe path on EOF

This commit is contained in:
patrick96 2021-09-13 19:56:24 +02:00 committed by Patrick Ziegler
parent 77b9cffaf8
commit 386eb57ba7
5 changed files with 53 additions and 38 deletions

View File

@ -40,6 +40,10 @@ struct UVHandleGeneric {
close(); close();
} }
uv_loop_t* loop() const {
return handle->loop;
}
void close() { void close() {
if (handle && !uv_is_closing((uv_handle_t*)handle)) { if (handle && !uv_is_closing((uv_handle_t*)handle)) {
uv_close((uv_handle_t*)handle, close_callback); uv_close((uv_handle_t*)handle, close_callback);
@ -91,15 +95,16 @@ struct FSEventHandle : public UVHandle<uv_fs_event_t, const char*, int, int> {
}; };
struct PipeHandle : public UVHandleGeneric<uv_pipe_t, uv_stream_t, ssize_t, const uv_buf_t*> { struct PipeHandle : public UVHandleGeneric<uv_pipe_t, uv_stream_t, ssize_t, const uv_buf_t*> {
PipeHandle( PipeHandle(uv_loop_t* loop, const string& path, function<void(const string)> fun, function<void(void)> eof_cb,
uv_loop_t* loop, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb); function<void(int)> err_cb);
void start(int fd); void start();
void read_cb(ssize_t nread, const uv_buf_t* buf); void read_cb(ssize_t nread, const uv_buf_t* buf);
function<void(const string)> func; function<void(const string)> func;
function<void(void)> eof_cb; function<void(void)> eof_cb;
function<void(int)> err_cb; function<void(int)> err_cb;
int fd; int fd;
string path;
}; };
struct TimerHandle : public UVHandle<uv_timer_t> { struct TimerHandle : public UVHandle<uv_timer_t> {
@ -129,7 +134,8 @@ class eventloop {
void signal_handle(int signum, function<void(int)> fun); void signal_handle(int signum, function<void(int)> fun);
void poll_handle(int events, int fd, function<void(uv_poll_event)> fun, function<void(int)> err_cb); void poll_handle(int events, int fd, function<void(uv_poll_event)> fun, function<void(int)> err_cb);
void fs_event_handle(const string& path, function<void(const char*, uv_fs_event)> fun, function<void(int)> err_cb); void fs_event_handle(const string& path, function<void(const char*, uv_fs_event)> fun, function<void(int)> err_cb);
void pipe_handle(int fd, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb); void pipe_handle(
const string& path, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb);
void timer_handle(uint64_t timeout, uint64_t repeat, function<void(void)> fun); void timer_handle(uint64_t timeout, uint64_t repeat, function<void(void)> fun);
AsyncHandle_t async_handle(function<void(void)> fun); AsyncHandle_t async_handle(function<void(void)> fun);

View File

@ -26,16 +26,16 @@ class ipc {
explicit ipc(signal_emitter& emitter, const logger& logger); explicit ipc(signal_emitter& emitter, const logger& logger);
~ipc(); ~ipc();
string get_path() const;
void receive_data(string buf); void receive_data(string buf);
void receive_eof(); void receive_eof();
int get_file_descriptor() const;
private: private:
signal_emitter& m_sig; signal_emitter& m_sig;
const logger& m_log; const logger& m_log;
string m_path{}; string m_path{};
int m_fd;
/** /**
* Buffer for the currently received IPC message. * Buffer for the currently received IPC message.

View File

@ -260,7 +260,7 @@ void controller::read_events(bool confwatch) {
if (m_ipc) { if (m_ipc) {
eloop->pipe_handle( eloop->pipe_handle(
m_ipc->get_file_descriptor(), [this](const string payload) { m_ipc->receive_data(payload); }, m_ipc->get_path(), [this](const string payload) { m_ipc->receive_data(payload); },
[this]() { m_ipc->receive_eof(); }, [this]() { m_ipc->receive_eof(); },
[this](int err) { m_log.err("libuv error while listening to IPC channel: %s", uv_strerror(err)); }); [this](int err) { m_log.err("libuv error while listening to IPC channel: %s", uv_strerror(err)); });
} }

View File

@ -2,6 +2,8 @@
#include <cassert> #include <cassert>
#include "errors.hpp"
POLYBAR_NS POLYBAR_NS
/** /**
@ -95,37 +97,54 @@ void FSEventHandle::fs_event_cb(const char* path, int events, int status) {
// }}} // }}}
// PipeHandle {{{ // PipeHandle {{{
PipeHandle::PipeHandle( PipeHandle::PipeHandle(uv_loop_t* loop, const string& path, function<void(const string)> fun,
uv_loop_t* loop, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb) function<void(void)> eof_cb, function<void(int)> err_cb)
: UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); }) : UVHandleGeneric([&](ssize_t nread, const uv_buf_t* buf) { read_cb(nread, buf); })
, func(fun) , func(fun)
, eof_cb(eof_cb) , eof_cb(eof_cb)
, err_cb(err_cb) { , err_cb(err_cb)
, path(path) {
UV(uv_pipe_init, loop, handle, false); UV(uv_pipe_init, loop, handle, false);
} }
void PipeHandle::start(int fd) { void PipeHandle::start() {
this->fd = fd; if ((fd = open(path.c_str(), O_RDONLY | O_NONBLOCK)) == -1) {
throw system_error("Failed to open pipe '" + path + "'");
}
UV(uv_pipe_open, handle, fd); UV(uv_pipe_open, handle, fd);
UV(uv_read_start, (uv_stream_t*)handle, alloc_cb, callback); UV(uv_read_start, (uv_stream_t*)handle, alloc_cb, callback);
} }
void PipeHandle::read_cb(ssize_t nread, const uv_buf_t* buf) { void PipeHandle::read_cb(ssize_t nread, const uv_buf_t* buf) {
/*
* Wrap pointer so that it gets automatically freed once the function returns (even with exceptions)
*/
auto buf_ptr = unique_ptr<char>(buf->base);
if (nread > 0) { if (nread > 0) {
func(string(buf->base, nread)); func(string(buf_ptr.get(), nread));
} else if (nread < 0) { } else if (nread < 0) {
if (nread != UV_EOF) { if (nread != UV_EOF) {
close(); close();
err_cb(nread); err_cb(nread);
} else { } else {
eof_cb(); eof_cb();
// TODO this causes constant EOFs
start(this->fd);
}
}
if (buf->base) { /*
delete[] buf->base; * This is a special case.
*
* Once we read EOF, we no longer receive events for the fd, so we close the entire handle and restart it with a
* new fd.
*
* We reuse the memory for the underlying uv handle
*/
if (!uv_is_closing((uv_handle_t*)handle)) {
uv_close((uv_handle_t*)handle, [](uv_handle_t* handle) {
PipeHandle* This = static_cast<PipeHandle*>(handle->data);
UV(uv_pipe_init, This->loop(), This->handle, false);
This->start();
});
}
}
} }
} }
// }}} // }}}
@ -215,9 +234,9 @@ void eventloop::fs_event_handle(
} }
void eventloop::pipe_handle( void eventloop::pipe_handle(
int fd, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb) { const string& path, function<void(const string)> fun, function<void(void)> eof_cb, function<void(int)> err_cb) {
m_pipe_handles.emplace_back(std::make_unique<PipeHandle>(get(), fun, eof_cb, err_cb)); m_pipe_handles.emplace_back(std::make_unique<PipeHandle>(get(), path, fun, eof_cb, err_cb));
m_pipe_handles.back()->start(fd); m_pipe_handles.back()->start();
} }
void eventloop::timer_handle(uint64_t timeout, uint64_t repeat, function<void(void)> fun) { void eventloop::timer_handle(uint64_t timeout, uint64_t repeat, function<void(void)> fun) {

View File

@ -39,11 +39,6 @@ ipc::ipc(signal_emitter& emitter, const logger& logger) : m_sig(emitter), m_log(
if (mkfifo(m_path.c_str(), 0666) == -1) { if (mkfifo(m_path.c_str(), 0666) == -1) {
throw system_error("Failed to create ipc channel"); throw system_error("Failed to create ipc channel");
} }
if ((m_fd = open(m_path.c_str(), O_RDONLY | O_NONBLOCK)) == -1) {
throw system_error("Failed to open pipe '" + m_path + "'");
}
m_log.info("Created ipc channel at: %s", m_path); m_log.info("Created ipc channel at: %s", m_path);
} }
@ -51,10 +46,12 @@ ipc::ipc(signal_emitter& emitter, const logger& logger) : m_sig(emitter), m_log(
* Deconstruct ipc handler * Deconstruct ipc handler
*/ */
ipc::~ipc() { ipc::~ipc() {
if (!m_path.empty()) { m_log.trace("ipc: Removing file handle at: %s", m_path);
m_log.trace("ipc: Removing file handle"); unlink(m_path.c_str());
unlink(m_path.c_str()); }
}
string ipc::get_path() const {
return m_path;
} }
/** /**
@ -87,11 +84,4 @@ void ipc::receive_eof() {
} }
} }
/**
* Get the file descriptor to the ipc channel
*/
int ipc::get_file_descriptor() const {
return m_fd;
}
POLYBAR_NS_END POLYBAR_NS_END