diff --git a/include/dbus-c++/eventloop.h b/include/dbus-c++/eventloop.h index 53f90a7..9a242e7 100644 --- a/include/dbus-c++/eventloop.h +++ b/include/dbus-c++/eventloop.h @@ -27,6 +27,7 @@ #include #include +#include #include "api.h" #include "util.h" @@ -205,7 +206,7 @@ class DXXAPI DefaultMainLoop virtual ~DefaultMainLoop(); - virtual void dispatch(); + virtual void dispatch(std::vector& pipe_fds); int _fdunlock[2]; private: diff --git a/src/eventloop-integration.cpp b/src/eventloop-integration.cpp index 5776971..8fa5346 100644 --- a/src/eventloop-integration.cpp +++ b/src/eventloop-integration.cpp @@ -145,8 +145,18 @@ void BusDispatcher::del_pipe(Pipe *pipe) void BusDispatcher::do_iteration() { + std::vector pipe_fds; + for (std::list ::iterator p_it = pipe_list.begin(); + p_it != pipe_list.end(); + ++p_it) + { + Pipe *read_pipe = *p_it; + pipe_fds.push_back(read_pipe->_fd_read); + } + + dispatch_pending(); - dispatch(); + dispatch(pipe_fds); } Timeout *BusDispatcher::add_timeout(Timeout::Internal *ti) diff --git a/src/eventloop.cpp b/src/eventloop.cpp index f622812..c630306 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -148,7 +148,7 @@ DefaultMainLoop::~DefaultMainLoop() _mutex_t.unlock(); } -void DefaultMainLoop::dispatch() +void DefaultMainLoop::dispatch(std::vector& pipe_fds) { _mutex_w.lock(); @@ -159,6 +159,8 @@ void DefaultMainLoop::dispatch() nfd = nfd + 2; } + nfd += pipe_fds.size(); + pollfd fds[nfd]; DefaultWatches::iterator wi = _watches.begin(); @@ -177,14 +179,21 @@ void DefaultMainLoop::dispatch() if (_fdunlock) { + // only monitor "incoming" FD fds[nfd].fd = _fdunlock[0]; fds[nfd].events = POLLIN | POLLOUT | POLLPRI ; fds[nfd].revents = 0; + ++nfd; + } - nfd++; - fds[nfd].fd = _fdunlock[1]; - fds[nfd].events = POLLIN | POLLOUT | POLLPRI ; - fds[nfd].revents = 0; + + + for (std::vector::iterator i = pipe_fds.begin(); i != pipe_fds.end(); i++) + { + fds[nfd].fd = *i; + fds[nfd].events = POLLIN | POLLPRI | POLLRDHUP; + fds[nfd].revents = 0; + ++nfd; } _mutex_w.unlock(); diff --git a/src/pipe.cpp b/src/pipe.cpp index 01211b3..66bbe7a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -60,28 +60,118 @@ Pipe::Pipe(void(*handler)(const void *data, void *buffer, unsigned int nbyte), c } } + +// this will block if the pipe is full. void Pipe::write(const void *buffer, unsigned int nbytes) { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - // first write the size into the pipe... - ::write(_fd_write, static_cast (&nbytes), sizeof(nbytes)); + ssize_t rc; + ssize_t bytes_written = 0; + + // Write size handling EINTR and partial writes + while (bytes_written < sizeof(nbytes)) { + rc = ::write(_fd_write, + ((char*)static_cast (&nbytes)) + bytes_written, + sizeof(nbytes)-bytes_written); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + return; + } + continue; + } + bytes_written += rc; + } + + //Write payload handling EINTR and partial writes + bytes_written = 0; + while (bytes_written < sizeof(nbytes)) { + rc = ::write(_fd_write, + static_cast (buffer) + bytes_written, + nbytes-bytes_written); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + return; + } + continue; + } + bytes_written += rc; + } // ...then write the real data - ::write(_fd_write, buffer, nbytes); + } ssize_t Pipe::read(void *buffer, unsigned int &nbytes) { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - // first read the size from the pipe... - ::read(_fd_read, &nbytes, sizeof(nbytes)); - //ssize_t size = 0; - return ::read(_fd_read, buffer, nbytes); + // Read size handling EINTR and partial reads. Note: Since the + // size is written in a single write() and this needs to return + // without blocking if no data is waiting, we can leave the "size" + // read as non-blocking. + nbytes = 0; + ssize_t rc; + ssize_t size = 0; + while (size < sizeof(unsigned int)) { + rc = ::read(_fd_read, ((char*)&nbytes) + size, sizeof(unsigned int) - size); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + // wait for eagain since the entire size should be showing up ASAP since + // it is written in a single call. + if (errno != EAGAIN) { + debug_log("Unexpected errno of %i when reading fd %i\n", errno, _fd_read); + return -1; + } + return 0; + } + continue; + } + size += rc; + } + + + // Change payload read to BLOCKING read so we don't fastloop and + // burn CPU waiting for the complete payload of the data to show up. + int old_flags = fcntl(_fd_read, F_GETFL); + if (old_flags == -1) { + // trouble + debug_log("%s fcntl() failed with errno %i for FD %i\n", + __FUNCTION__, errno, _fd_read); + } else { + // make blocking if was non-blocking + if (old_flags & O_NONBLOCK) { + fcntl(_fd_read, F_SETFL, old_flags & ~O_NONBLOCK); + } + } + + // Read payload handling EINTR and partial reads + size = 0; + while (size < nbytes ) { + rc = ::read(_fd_read, ((char*)buffer) + size, nbytes - size); + if (-1 == rc) { + if (errno != EINTR) { + nbytes = 0; + size = 0; + debug_log("%s read() of FD %i failed unexpected with errno %i", __FUNCTION__, _fd_read, errno); + goto cleanup; + } + continue; + } + size += rc; + } + +cleanup: + if ((old_flags != -1) && (old_flags & O_NONBLOCK)) { + //restore flags if needed + fcntl(_fd_read, F_SETFL, old_flags); + } + + return size; } void Pipe::signal() { - // TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work... - ::write(_fd_write, '\0', 1); + // Write a "message" w/size prefix of size 0 + write("", 0); }