Skip to content

Commit

Permalink
Add wait(deadline future) implementation. (#535)
Browse files Browse the repository at this point in the history
* Add waitUntil(deadline) implementation.

* Add one more test.

* Fix rare race condition and tests for it.

* Rename waitUntil() to wait().
  • Loading branch information
cheatfate committed Apr 20, 2024
1 parent d184a92 commit 0f0ed1d
Show file tree
Hide file tree
Showing 2 changed files with 481 additions and 10 deletions.
94 changes: 92 additions & 2 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,60 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] {.
inline, deprecated: "Use withTimeout(Future[T], Duration)".} =
withTimeout(fut, timeout.milliseconds())

proc waitUntilImpl[F: SomeFuture](fut: F, retFuture: auto,
deadline: auto): auto =
var timeouted = false

template completeFuture(fut: untyped, timeout: bool): untyped =
if fut.failed():
retFuture.fail(fut.error(), warn = false)
elif fut.cancelled():
if timeout:
# Its possible that `future` could be cancelled in some other place. In
# such case we can't detect if it was our cancellation due to timeout,
# or some other cancellation.
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelAndSchedule()
else:
when type(fut).T is void:
retFuture.complete()
else:
retFuture.complete(fut.value)

proc continuation(udata: pointer) {.raises: [].} =
if not(retFuture.finished()):
if timeouted:
# When timeout is exceeded and we cancelled future via cancelSoon(),
# its possible that future at this moment already has value
# and/or error.
fut.completeFuture(timeouted)
return
if not(fut.finished()):
timeouted = true
fut.cancelSoon()
else:
fut.completeFuture(false)

var cancellation: proc(udata: pointer) {.gcsafe, raises: [].}
cancellation = proc(udata: pointer) {.gcsafe, raises: [].} =
deadline.removeCallback(continuation)
if not(fut.finished()):
fut.cancelSoon()
else:
fut.completeFuture(false)

if fut.finished():
fut.completeFuture(false)
else:
if deadline.finished():
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
retFuture.cancelCallback = cancellation
fut.addCallback(continuation)
deadline.addCallback(continuation)
retFuture

proc waitImpl[F: SomeFuture](fut: F, retFuture: auto, timeout: Duration): auto =
var
moment: Moment
Expand Down Expand Up @@ -1606,7 +1660,8 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
## TODO: In case when ``fut`` got cancelled, what result Future[T]
## should return, because it can't be cancelled too.
var
retFuture = newFuture[T]("chronos.wait()", {FutureFlag.OwnCancelSchedule})
retFuture = newFuture[T]("chronos.wait(duration)",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.

Expand All @@ -1621,6 +1676,28 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
else:
wait(fut, timeout.milliseconds())

proc wait*[T](fut: Future[T], deadline: SomeFuture): Future[T] =
## Returns a future which will complete once future ``fut`` completes
## or if ``deadline`` future completes.
##
## If `deadline` future completes before future `fut` -
## `AsyncTimeoutError` exception will be raised.
##
## Note: `deadline` future will not be cancelled and/or failed.
##
## Note: While `waitUntil(future)` operation is pending, please avoid any
## attempts to cancel future `fut`. If it happens `waitUntil()` could
## introduce undefined behavior - it could raise`CancelledError` or
## `AsyncTimeoutError`.
##
## If you need to cancel `future` - cancel `waitUntil(future)` instead.
var
retFuture = newFuture[T]("chronos.wait(future)",
{FutureFlag.OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.
waitUntilImpl(fut, retFuture, deadline)

proc join*(future: FutureBase): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Returns a future which will complete once future ``future`` completes.
Expand Down Expand Up @@ -1783,8 +1860,21 @@ proc wait*(fut: InternalRaisesFuture, timeout = InfiniteDuration): auto =
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)

let
retFuture = newFuture[T]("chronos.wait()", {OwnCancelSchedule})
retFuture = newFuture[T]("chronos.wait(duration)", {OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.

waitImpl(fut, retFuture, timeout)

proc wait*(fut: InternalRaisesFuture, deadline: InternalRaisesFuture): auto =
type
T = type(fut).T
E = type(fut).E
InternalRaisesFutureRaises = E.prepend(CancelledError, AsyncTimeoutError)

let
retFuture = newFuture[T]("chronos.wait(future)", {OwnCancelSchedule})
# We set `OwnCancelSchedule` flag, because we going to cancel `retFuture`
# manually at proper time.

waitUntilImpl(fut, retFuture, deadline)
Loading

0 comments on commit 0f0ed1d

Please sign in to comment.