Skip to content

Make Pipe() functionality more reliable #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/dbus-c++/eventloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <pthread.h>
#include <list>
#include <vector>

#include "api.h"
#include "util.h"
Expand Down Expand Up @@ -205,7 +206,7 @@ class DXXAPI DefaultMainLoop

virtual ~DefaultMainLoop();

virtual void dispatch();
virtual void dispatch(std::vector<int>& pipe_fds);

int _fdunlock[2];
private:
Expand Down
12 changes: 11 additions & 1 deletion src/eventloop-integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,18 @@ void BusDispatcher::del_pipe(Pipe *pipe)

void BusDispatcher::do_iteration()
{
std::vector<int> pipe_fds;
for (std::list <Pipe *>::iterator p_it = pipe_list.begin();
p_it != pipe_list.end();
++p_it)
{
Pipe *read_pipe = *p_it;
pipe_fds.push_back(read_pipe->_fd_read);
}


dispatch_pending();
dispatch();
dispatch(pipe_fds);
}

Timeout *BusDispatcher::add_timeout(Timeout::Internal *ti)
Expand Down
19 changes: 14 additions & 5 deletions src/eventloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ DefaultMainLoop::~DefaultMainLoop()
_mutex_t.unlock();
}

void DefaultMainLoop::dispatch()
void DefaultMainLoop::dispatch(std::vector<int>& pipe_fds)
{
_mutex_w.lock();

Expand All @@ -159,6 +159,8 @@ void DefaultMainLoop::dispatch()
nfd = nfd + 2;
}

nfd += pipe_fds.size();

pollfd fds[nfd];

DefaultWatches::iterator wi = _watches.begin();
Expand All @@ -177,14 +179,21 @@ void DefaultMainLoop::dispatch()

if (_fdunlock)
{
// only monitor "incoming" FD
fds[nfd].fd = _fdunlock[0];
fds[nfd].events = POLLIN | POLLOUT | POLLPRI ;
fds[nfd].revents = 0;
++nfd;
}

nfd++;
fds[nfd].fd = _fdunlock[1];
fds[nfd].events = POLLIN | POLLOUT | POLLPRI ;
fds[nfd].revents = 0;


for (std::vector<int>::iterator i = pipe_fds.begin(); i != pipe_fds.end(); i++)
{
fds[nfd].fd = *i;
fds[nfd].events = POLLIN | POLLPRI | POLLRDHUP;
fds[nfd].revents = 0;
++nfd;
}

_mutex_w.unlock();
Expand Down
112 changes: 101 additions & 11 deletions src/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,118 @@ Pipe::Pipe(void(*handler)(const void *data, void *buffer, unsigned int nbyte), c
}
}


// this will block if the pipe is full.
void Pipe::write(const void *buffer, unsigned int nbytes)
{
// TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work...
// first write the size into the pipe...
::write(_fd_write, static_cast <const void *>(&nbytes), sizeof(nbytes));
ssize_t rc;
ssize_t bytes_written = 0;

// Write size handling EINTR and partial writes
while (bytes_written < sizeof(nbytes)) {
rc = ::write(_fd_write,
((char*)static_cast <const unsigned int *>(&nbytes)) + bytes_written,
sizeof(nbytes)-bytes_written);
if (-1 == rc) {
if (errno != EINTR) {
nbytes = 0;
return;
}
continue;
}
bytes_written += rc;
}

//Write payload handling EINTR and partial writes
bytes_written = 0;
while (bytes_written < sizeof(nbytes)) {
rc = ::write(_fd_write,
static_cast <const char *>(buffer) + bytes_written,
nbytes-bytes_written);
if (-1 == rc) {
if (errno != EINTR) {
nbytes = 0;
return;
}
continue;
}
bytes_written += rc;
}

// ...then write the real data
::write(_fd_write, buffer, nbytes);

}

ssize_t Pipe::read(void *buffer, unsigned int &nbytes)
{
// TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work...
// first read the size from the pipe...
::read(_fd_read, &nbytes, sizeof(nbytes));

//ssize_t size = 0;
return ::read(_fd_read, buffer, nbytes);
// Read size handling EINTR and partial reads. Note: Since the
// size is written in a single write() and this needs to return
// without blocking if no data is waiting, we can leave the "size"
// read as non-blocking.
nbytes = 0;
ssize_t rc;
ssize_t size = 0;
while (size < sizeof(unsigned int)) {
rc = ::read(_fd_read, ((char*)&nbytes) + size, sizeof(unsigned int) - size);
if (-1 == rc) {
if (errno != EINTR) {
nbytes = 0;
// wait for eagain since the entire size should be showing up ASAP since
// it is written in a single call.
if (errno != EAGAIN) {
debug_log("Unexpected errno of %i when reading fd %i\n", errno, _fd_read);
return -1;
}
return 0;
}
continue;
}
size += rc;
}


// Change payload read to BLOCKING read so we don't fastloop and
// burn CPU waiting for the complete payload of the data to show up.
int old_flags = fcntl(_fd_read, F_GETFL);
if (old_flags == -1) {
// trouble
debug_log("%s fcntl() failed with errno %i for FD %i\n",
__FUNCTION__, errno, _fd_read);
} else {
// make blocking if was non-blocking
if (old_flags & O_NONBLOCK) {
fcntl(_fd_read, F_SETFL, old_flags & ~O_NONBLOCK);
}
}

// Read payload handling EINTR and partial reads
size = 0;
while (size < nbytes ) {
rc = ::read(_fd_read, ((char*)buffer) + size, nbytes - size);
if (-1 == rc) {
if (errno != EINTR) {
nbytes = 0;
size = 0;
debug_log("%s read() of FD %i failed unexpected with errno %i", __FUNCTION__, _fd_read, errno);
goto cleanup;
}
continue;
}
size += rc;
}

cleanup:
if ((old_flags != -1) && (old_flags & O_NONBLOCK)) {
//restore flags if needed
fcntl(_fd_read, F_SETFL, old_flags);
}

return size;
}

void Pipe::signal()
{
// TODO: ignoring return of read/write generates warning; maybe relevant for eventloop work...
::write(_fd_write, '\0', 1);
// Write a "message" w/size prefix of size 0
write("", 0);
}