Skip to content

Commit

Permalink
fix for rare case with association to closed flow
Browse files Browse the repository at this point in the history
  • Loading branch information
zenomt committed Jan 23, 2022
1 parent b021d3e commit 0610d3a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ protocol implementation is intended to be adaptable to any host program
environment.

The library is intended for clients, servers, and P2P applications. It includes
the necessary helpers and callbacks to support P2P introduction and load
the necessary helpers and callback hooks to support P2P introduction and load
balancing.

The [`test`](test/) directory includes unit tests and examples. Of special
Expand All @@ -33,12 +33,12 @@ The application can open sending flows to new or current endpoints with
`RTMFP::openFlow()` and `Flow::openFlow`, and can open associated return flows
with `RecvFlow::openReturnFlow()`.

The application can accept new flows by implementing the `onRecvFlow` callbacks
The application can accept new flows by setting the `onRecvFlow` callbacks
on the `RTMFP` (for bare incoming flows) or on `SendFlow`s (for associated
return flows).

The application can send messages to far peers with `SendFlow::write()`, and
receive messages from far peers by implementing the `onMessage` callback on
receive messages from far peers by setting the `onMessage` callback on
`RecvFlow`s. Messages can expire and be abandoned if not started or delivered
by per-message deadlines, or by arbitrary application logic using the
[`WriteReceipt`](include/rtmfp/WriteReceipt.hpp)s returned by `SendFlow::write()`.
Expand Down
5 changes: 3 additions & 2 deletions include/rtmfp/rtmfp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class Flow : public Object {
Flow(RTMFP *rtmfp);
Flow() = delete;

std::shared_ptr<SendFlow> basicOpenFlow(const Bytes &metadata, const RecvFlow *assoc, Priority pri);
std::shared_ptr<SendFlow> basicOpenFlow(const Bytes &metadata, RecvFlow *assoc, Priority pri);

RTMFP *m_rtmfp; // weak ref
std::shared_ptr<Session> m_session;
Expand Down Expand Up @@ -265,7 +265,7 @@ class SendFlow : public Flow {
friend class Flow;
struct SendFrag;

SendFlow(RTMFP *rtmfp, const Bytes &epd, const Bytes &metadata, const RecvFlow *assoc, Priority pri);
SendFlow(RTMFP *rtmfp, const Bytes &epd, const Bytes &metadata, RecvFlow *assoc, Priority pri);
~SendFlow();

std::shared_ptr<WriteReceipt> basicWrite(const void *message, size_t len, Time startWithin, Time finishWithin);
Expand Down Expand Up @@ -306,6 +306,7 @@ class SendFlow : public Flow {
long m_last_send_queue_name;
State m_state;
std::shared_ptr<Session> m_openingSession;
std::shared_ptr<RecvFlow> m_tmp_association; // only needed until flow is established
SumList<std::shared_ptr<SendFrag> > m_send_queue;
std::shared_ptr<Timer> m_persistTimer;
};
Expand Down
2 changes: 1 addition & 1 deletion src/Flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Time Flow::getSessionCongestionDelay() const

// ---

std::shared_ptr<SendFlow> Flow::basicOpenFlow(const Bytes &metadata, const RecvFlow *assoc, Priority pri)
std::shared_ptr<SendFlow> Flow::basicOpenFlow(const Bytes &metadata, RecvFlow *assoc, Priority pri)
{
std::shared_ptr<SendFlow> rv;

Expand Down
23 changes: 22 additions & 1 deletion src/SendFlow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ size_t SendFlow::SendFrag::size_outstanding(const std::shared_ptr<SendFrag>& val

// ---

SendFlow::SendFlow(RTMFP *rtmfp, const Bytes &epd, const Bytes &metadata, const RecvFlow *assoc, Priority pri) :
SendFlow::SendFlow(RTMFP *rtmfp, const Bytes &epd, const Bytes &metadata, RecvFlow *assoc, Priority pri) :
Flow(rtmfp),
m_flow_id(-1),
m_epd(epd),
Expand All @@ -58,7 +58,10 @@ SendFlow::SendFlow(RTMFP *rtmfp, const Bytes &epd, const Bytes &metadata, const
{
Option::append(USERDATA_OPTION_METADATA, metadata.data(), metadata.size(), m_startup_options);
if(assoc)
{
Option::append(USERDATA_OPTION_RETURN_ASSOCIATION, assoc->m_flow_id, m_startup_options);
m_tmp_association = share_ref(assoc);
}
Option::append(m_startup_options);
}

Expand Down Expand Up @@ -376,6 +379,20 @@ bool SendFlow::assembleData(PacketAssembler *packet, int pri)
if(not (m_send_queue.has(startName) and m_send_queue.at(startName)->m_in_flight))
startName = m_send_queue.first();

// handle a (hopefully rare) case where we are associated in return to a flow that
// has completely closed (so the other end has forgotten the flow ID), but we're still
// starting up.
if(m_startup_options.size() and m_tmp_association and (m_tmp_association->m_state >= RecvFlow::RF_COMPLETE_LINGER))
{
// clear the startup options. it's possible that the other end has already
// seen our startup options, so things are fine. if not and this ends up being the
// first one, the other end will reject the flow because now there's no metadata either.
// this protects against accidentally associating to a new different flow with
// the old flow's ID.
m_startup_options.clear();
m_tmp_association.reset();
}

// fill up a packet
for(long name = startName; name > 0; name = m_send_queue.next(name))
{
Expand Down Expand Up @@ -504,7 +521,10 @@ void SendFlow::onAck(uint8_t chunkType, size_t bufferBytesAvailable, uintmax_t c
return;

if(not m_startup_options.empty())
{
m_startup_options.clear();
m_tmp_association.reset();
}

m_rx_buffer_size = bufferBytesAvailable;

Expand Down Expand Up @@ -644,6 +664,7 @@ void SendFlow::gotoStateClosed()
m_persistTimer->cancel();
m_persistTimer.reset();
}
m_tmp_association.reset();
}
}

Expand Down

0 comments on commit 0610d3a

Please sign in to comment.