From 95d6c712bc8ab474fad4438b0ec0cbb05b4f6785 Mon Sep 17 00:00:00 2001 From: Rod Morehead Date: Tue, 22 Jan 2019 10:35:52 -0600 Subject: [PATCH] Make Pipe() functionality more reliable 1. Enhance eventloop-integration to have BusDispatcher monitor incoming Pipe() FDs so incoming Pipe() data can be quickly handled. 2. Fix reading and writing to pipe to handle partial reads, partial writes, and EINTR. 3. Fix Pipe::signal() to follow protocol of read()/write() by writing data starting with size (number of bytes). Implemented by using Pipe::write() rather than a raw ::write(). --- include/dbus-c++/eventloop.h | 3 +- src/eventloop-integration.cpp | 12 +++- src/eventloop.cpp | 19 ++++-- src/pipe.cpp | 112 ++++++++++++++++++++++++++++++---- 4 files changed, 128 insertions(+), 18 deletions(-) diff --git a/include/dbus-c++/eventloop.h b/include/dbus-c++/eventloop.h index 53f90a7..9a242e7 100644 --- a/include/dbus-c++/eventloop.h +++ b/include/dbus-c++/eventloop.h @@ -27,6 +27,7 @@ #include #include +#include #include "api.h" #include "util.h" @@ -205,7 +206,7 @@ class DXXAPI DefaultMainLoop virtual ~DefaultMainLoop(); - virtual void dispatch(); + virtual void dispatch(std::vector& pipe_fds); int _fdunlock[2]; private: diff --git a/src/eventloop-integration.cpp b/src/eventloop-integration.cpp index 5776971..8fa5346 100644 --- a/src/eventloop-integration.cpp +++ b/src/eventloop-integration.cpp @@ -145,8 +145,18 @@ void BusDispatcher::del_pipe(Pipe *pipe) void BusDispatcher::do_iteration() { + std::vector pipe_fds; + for (std::list ::iterator p_it = pipe_list.begin(); + p_it != pipe_list.end(); + ++p_it) + { + Pipe *read_pipe = *p_it; + pipe_fds.push_back(read_pipe->_fd_read); + } + + dispatch_pending(); - dispatch(); + dispatch(pipe_fds); } Timeout *BusDispatcher::add_timeout(Timeout::Internal *ti) diff --git a/src/eventloop.cpp b/src/eventloop.cpp index f622812..c630306 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -148,7 +148,7 @@ DefaultMainLoop::~DefaultMainLoop() _mutex_t.unlock(); } -void DefaultMainLoop::dispatch() +void DefaultMainLoop::dispatch(std::vector& pipe_fds) { _mutex_w.lock(); @@ -159,6 +159,8 @@ void DefaultMainLoop::dispatch() nfd = nfd + 2; } + nfd += pipe_fds.size(); + pollfd fds[nfd]; DefaultWatches::iterator wi = _watches.begin(); @@ -177,14 +179,21 @@ void DefaultMainLoop::dispatch() if (_fdunlock) { + // only monitor "incoming" FD fds[nfd].fd = _fdunlock[0]; fds[nfd].events = POLLIN | POLLOUT | POLLPRI ; fds[nfd].revents = 0; + ++nfd; + } - nfd++; - fds[nfd].fd = _fdunlock[1]; - fds[nfd].events = POLLIN | POLLOUT | POLLPRI ; - fds[nfd].revents = 0; + + + for (std::vector::iterator i = pipe_fds.begin(); i != pipe_fds.end(); i++) + { + fds[nfd].fd = *i; + fds[nfd].events = POLLIN | POLLPRI | POLLRDHUP; + fds[nfd].revents = 0; + ++nfd; } _mutex_w.unlock(); diff --git a/src/pipe.cpp b/src/pipe.cpp index 01211b3..66bbe7a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -60,28 +60,118 @@ Pipe::Pipe(void(*handler)(const void *data, void *buffer, unsigned int nbyte), c } } + +// this will block if the pipe is full. void Pipe::write(const void *buffer, unsigned int nbytes) { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - // first write the size into the pipe... - ::write(_fd_write, static_cast (&nbytes), sizeof(nbytes)); + ssize_t rc; + ssize_t bytes_written = 0; + + // Write size handling EINTR and partial writes + while (bytes_written < sizeof(nbytes)) { + rc = ::write(_fd_write, + ((char*)static_cast (&nbytes)) + bytes_written, + sizeof(nbytes)-bytes_written); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + return; + } + continue; + } + bytes_written += rc; + } + + //Write payload handling EINTR and partial writes + bytes_written = 0; + while (bytes_written < sizeof(nbytes)) { + rc = ::write(_fd_write, + static_cast (buffer) + bytes_written, + nbytes-bytes_written); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + return; + } + continue; + } + bytes_written += rc; + } // ...then write the real data - ::write(_fd_write, buffer, nbytes); + } ssize_t Pipe::read(void *buffer, unsigned int &nbytes) { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - // first read the size from the pipe... - ::read(_fd_read, &nbytes, sizeof(nbytes)); - //ssize_t size = 0; - return ::read(_fd_read, buffer, nbytes); + // Read size handling EINTR and partial reads. Note: Since the + // size is written in a single write() and this needs to return + // without blocking if no data is waiting, we can leave the "size" + // read as non-blocking. + nbytes = 0; + ssize_t rc; + ssize_t size = 0; + while (size < sizeof(unsigned int)) { + rc = ::read(_fd_read, ((char*)&nbytes) + size, sizeof(unsigned int) - size); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + // wait for eagain since the entire size should be showing up ASAP since + // it is written in a single call. + if (errno != EAGAIN) { + debug_log("Unexpected errno of %i when reading fd %i\n", errno, _fd_read); + return -1; + } + return 0; + } + continue; + } + size += rc; + } + + + // Change payload read to BLOCKING read so we don't fastloop and + // burn CPU waiting for the complete payload of the data to show up. + int old_flags = fcntl(_fd_read, F_GETFL); + if (old_flags == -1) { + // trouble + debug_log("%s fcntl() failed with errno %i for FD %i\n", + __FUNCTION__, errno, _fd_read); + } else { + // make blocking if was non-blocking + if (old_flags & O_NONBLOCK) { + fcntl(_fd_read, F_SETFL, old_flags & ~O_NONBLOCK); + } + } + + // Read payload handling EINTR and partial reads + size = 0; + while (size < nbytes ) { + rc = ::read(_fd_read, ((char*)buffer) + size, nbytes - size); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + size = 0; + debug_log("%s read() of FD %i failed unexpected with errno %i", __FUNCTION__, _fd_read, errno); + goto cleanup; + } + continue; + } + size += rc; + } + +cleanup: + if ((old_flags != -1) && (old_flags & O_NONBLOCK)) { + //restore flags if needed + fcntl(_fd_read, F_SETFL, old_flags); + } + + return size; } void Pipe::signal() { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - ::write(_fd_write, '\0', 1); + // Write a "message" w/size prefix of size 0 + write("", 0); }