Support infinite streams #1857

Open
udoprog opened this issue Sep 3, 2019 · 17 comments
Labels
A-stream Area: futures::stream C-feature-request

Comments

@udoprog
Contributor

udoprog commented Sep 3, 2019

This is an issue opened to consider adding support for infinite streams as a first-class abstraction.

Infinite streams are streams which do not terminate, so they yield values of T directly instead of Option<T>. This is useful for things that conceptually never end, like intervals, channels, or lazy settings updates. The fact that we currently have a panicking select_next_some suggests to me that there might be space for this.

Infinite streams would look something like this:

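use std::pin::Pin;
use std::task::{Context, Poll};

trait InfiniteStream {
    type Item;

    // Resolves with `Self::Item` directly; there is no `None` to signal termination.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Item>;
}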
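
As a minimal sketch of how the trait above might be implemented and driven, assuming only the standard library: `Counter`, `NoopWaker`, and `take_n` are hypothetical names introduced for illustration and are not part of futures-rs.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The trait as proposed in this issue.
trait InfiniteStream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

// Hypothetical infinite stream: counts upwards and is always ready.
struct Counter {
    next: u64,
}

impl InfiniteStream for Counter {
    type Item = u64;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
        let value = self.next;
        self.next += 1;
        Poll::Ready(value)
    }
}

// Waker that does nothing; enough for streams that are always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls until `n` items have been produced.
fn take_n<S: InfiniteStream + Unpin>(stream: &mut S, n: usize) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::with_capacity(n);
    while out.len() < n {
        // A real executor would park on `Pending`; this sketch simply polls again.
        if let Poll::Ready(item) = Pin::new(&mut *stream).poll_next(&mut cx) {
            out.push(item);
        }
    }
    out
}

fn main() {
    let mut counter = Counter { next: 0 };
    println!("{:?}", take_n(&mut counter, 3));
}
```

Note that the caller has to decide how many items to take, because the stream itself has no way to say it is done.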

Or, in a perfect world, consider making the existing Stream not resolve with an Option<T>, and make terminating streams a specialization of this, similar to how Result<T, E> was removed from Future. In my mind this would eventually suit streams as a language item better, like:

async fn produces_stream() -> u32 {
    yield 1u32;
    return 42;
}

This could produce an implementation of Stream<Item = YieldOrReturn<u32, u32>>. A Stream<Item = YieldOrReturn<u32, ()>> would then have the same semantics as Stream<Item = Option<u32>>.

An infinite stream could look something like:

async fn produces_stream() {
    loop {
        yield 1u32;
    }
}

And would produce an implementation of Stream<Item = u32>.

One sticking point I'm not sure about is if combinators could be implemented once instead of once per stream variant (terminating vs infinite).

CC: #1765

@canndrew
Contributor

canndrew commented Sep 3, 2019

Stream should have two associated items: Yield and Return. That way you can have:

  • The current stream behaviour

      Stream<Yield = T, Return = ()>
    
  • Infinite streams

      Stream<Yield = T, Return = !>
    
  • infinite streams that have fatal errors

      Stream<Yield = T, Return = E>
    
  • finite streams that have both fatal and non-fatal errors and allow you to distinguish them

      Stream<Yield = Result<T, NonFatalE>, Return = Result<(), FatalE>>
    
  • etc.
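
The variants listed above can be sketched roughly as follows. This is only an illustration under stated assumptions: `YieldStream`, `StreamState`, `Countdown`, `Ticks`, and `poll_once` are hypothetical names, and `std::convert::Infallible` stands in for `!` because the never type is not stable.

```rust
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical state type, shaped like a yield-or-return enum.
#[derive(Debug, PartialEq)]
enum StreamState<Y, R> {
    Yield(Y),
    Return(R),
}

// Hypothetical stream trait with two associated types.
trait YieldStream {
    type Yield;
    type Return;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<StreamState<Self::Yield, Self::Return>>;
}

// Finite stream: `Return = ()` mirrors today's `Option<T>` behaviour.
struct Countdown(u32);

impl YieldStream for Countdown {
    type Yield = u32;
    type Return = ();

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<StreamState<u32, ()>> {
        if self.0 == 0 {
            return Poll::Ready(StreamState::Return(()));
        }
        self.0 -= 1;
        Poll::Ready(StreamState::Yield(self.0))
    }
}

// Infinite stream: `Infallible` has no values, so `Return` can never be built.
struct Ticks(u64);

impl YieldStream for Ticks {
    type Yield = u64;
    type Return = Infallible;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<StreamState<u64, Infallible>> {
        self.0 += 1;
        Poll::Ready(StreamState::Yield(self.0))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls once with a no-op waker; returns `None` if the stream was pending.
fn poll_once<S: YieldStream + Unpin>(stream: &mut S) -> Option<StreamState<S::Yield, S::Return>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(stream).poll_next(&mut cx) {
        Poll::Ready(state) => Some(state),
        Poll::Pending => None,
    }
}

fn main() {
    let mut finite = Countdown(1);
    println!("{:?}", poll_once(&mut finite));
    println!("{:?}", poll_once(&mut finite));
    let mut infinite = Ticks(0);
    println!("{:?}", poll_once(&mut infinite));
}
```

A consumer of `Ticks` can match on `StreamState::Return(never)` with an empty `match never {}`, which is how the type system would record that the stream does not end.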

@udoprog
Contributor Author

udoprog commented Sep 3, 2019

Stream should have two associated items: Yield and Return. That way you can have:

I can't help but feel this reasoning is similar to what led to Future polling to Poll<Self::Item, Self::Error>, which is an alias for Result<Async<Self::Item>, Self::Error>. I fear that kind of specialization mainly hurts us in the long term since it tends to color the entire ecosystem. It's also worth considering that never_type is not yet stable.

Is there a concrete benefit to this over specializing over an enum like YieldOrReturn<Y, R> (which also happens to be what GeneratorState is), similar to what TryFuture and TryStream do now?

@Ralith
Contributor

Ralith commented Sep 5, 2019

I don't think this is quite the same situation as Future's historical Error type, because whether a Future returned Ok(Ready(_)) or Err(_) did not confer any information about the internal state of the future--it's completed regardless--whereas returning Yield or Return distinguishes whether the Stream may still be polled. That said, I think the differences between these two approaches are mostly cosmetic, so maybe it would be instructive to look at a sampling of combinators defined using each, and some uses of them, and see how they compare?

@nbdd0121

nbdd0121 commented Sep 5, 2019

I am curious about the impact of the Stream design on potential future for await syntax, since Stream is supposed to be an async version of Iterator, isn't it?

@seanmonstar
Contributor

Relevant previous discussion: #1059

@cramertj
Member

I'm happy to leave this open for discussion, but my personal temperature on this is that we don't need native support for streams with a separate final output type from their normal yielded type. The benefits gained from modeling these kinds of streams differently aren't worth the costs, IMO. @sfackler previously made an effort to develop these stream types in a separate crate, and I believe came to the same conclusion.

@udoprog
Contributor Author

udoprog commented Sep 12, 2019

@cramertj It isn't clear to me what your opinion is on dropping the mandatory Option<Self::Item> resolve in favor of just Self::Item, and modeling it similarly to fallible futures (extension and blanket traits).

Reviewing the code I've written so far which uses futures, I currently don't have a single use case for finite streams. They are always either errors or panics (through e.g. select_next_some). Like: https://github.com/udoprog/OxidizeBot/blob/f3356a6f1c9ee49b4e30c57a773d8dc00477675c/bot/src/player.rs#L1436

@cramertj
Member

cramertj commented Sep 12, 2019

I have many usecases for finite streams. Consider the for_each combinator-- how would it know when to terminate? Presumably it would only be available for Stream<Item = Option<T>>?

@cramertj
Member

cramertj commented Sep 12, 2019

Similarly, .map etc. taking an Option would result in loads of stream.map(|x| x.map(|x| ... )), which is a pretty severe ergonomic regression.

@udoprog
Contributor Author

udoprog commented Sep 13, 2019

Presumably it would only be available for Stream<Item = Option<T>>?

Yeah. I'd hope to be able to mimic the current API using extension traits, similarly to how we have TryFuture.

Similarly, .map etc. taking an Option would result in loads of stream.map(|x| x.map(|x| ... )), which is a pretty severe ergonomic regression.

That is one of the sticking points mentioned above. There might be a way to avoid having multiple combinator implementations, but I'm not sure since I haven't put in the work to try it. Otherwise different functions with distinct names would be an option, one conditional on Stream<Item = Option<T>>.
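
To make the extension-trait idea concrete, here is a rough sketch modeled on how `TryFuture` is blanket-implemented for futures whose output is a `Result`. Everything in it is an assumption for illustration: `FiniteStream`, `poll_next_finite`, `Items`, and `collect_all` are hypothetical names, not an existing futures-rs API.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The trait proposed at the top of this issue.
trait InfiniteStream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

// Hypothetical view of a terminating stream; only exists when `Item = Option<T>`.
trait FiniteStream: InfiniteStream {
    type Value;

    fn poll_next_finite(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Value>>;
}

// Blanket impl: every stream of `Option<T>` is a finite stream of `T`.
impl<S, T> FiniteStream for S
where
    S: InfiniteStream<Item = Option<T>>,
{
    type Value = T;

    fn poll_next_finite(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.poll_next(cx)
    }
}

// Hypothetical terminating stream backed by a vector.
struct Items(std::vec::IntoIter<u32>);

impl InfiniteStream for Items {
    type Item = Option<u32>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(self.0.next())
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// A `for_each`-style consumer: it can only be called on `FiniteStream`,
// because that is the only case where it knows when to stop.
fn collect_all<S: FiniteStream + Unpin>(mut stream: S) -> Vec<S::Value> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next_finite(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => return out,
            // A real executor would wait to be woken instead of polling again.
            Poll::Pending => continue,
        }
    }
}

fn main() {
    let items = Items(vec![1, 2, 3].into_iter());
    println!("{:?}", collect_all(items));
}
```

Combinators that need termination would hang off `FiniteStream`, while ones like `map` would still have to choose between operating on `Option<T>` or on `T`, which is the ergonomic question raised above.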

It is worth noting that with the advent of async/await I'd expect most combinators to see much less use in favor of writing them out like imperative code. At least that's consistent with my experience with async/await so far. I don't use for_each; instead I write:

async {
    while let Some(result) = stream.next().await {
        /* ... */
    }

    panic!("stream ended");
}

My more general point was that, apart from FuturesUnordered (whose contract is what caused me to write up this issue in #1765), I can't actually recall from the top of my head a stream implementation which is finite that I've used during the last couple of months. Note that there's plenty that wraps a Result for fallible streams though which terminate for other reasons.

@Ralith
Contributor

Ralith commented Sep 13, 2019

I find the way channels terminate when the senders have all been dropped to be useful for gracefully shutting down message-driven spawned tasks, at least.

@udoprog
Contributor Author

udoprog commented Sep 13, 2019

I find the way channels terminate when the senders have all been dropped to be useful for gracefully shutting down message-driven spawned tasks, at least.

It's kind of funny that I didn't even know that the contract for channels worked like this because it was put behind a Stream 🙂. That's fair. But mimicking what std::sync::channel does with RecvError might be clearer from a documentation perspective and having a stronger correlation between the APIs. async-std might even be facing some headaches if they decide to mimic the std api because of this: async-rs/async-std#72

@Ralith
Contributor

Ralith commented Sep 13, 2019

Yeah, that use-case would still work just fine if it was an infinite stream that had a sentinel value like Err(Terminated). In Quinn I currently have some streams that yield Some(Err(_)) forever rather than returning None because I assume readers will bail out on encountering an error and going to extra effort to do something else after returning a terminal value feels strange, but this has already bitten me in test code that relied abstractly on a None eventually arriving. Having any sort of more flexible pattern for a termination value would have avoided this confusion.

In other words, a problem with the current API is that the "separate final output type" is required to be (), but often you really want it to be ! or some richer error type. Having to return your error and then the actual recognized termination value requires additional state and boilerplate.
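
The extra state and boilerplate can be illustrated with a small sketch. It uses `Iterator` as a synchronous stand-in for `Stream` (an assumption made to keep the example standard-library only), and `EndAfterError` and `demo` are hypothetical names.

```rust
// Wrapper that yields the first error and then the recognized termination
// value (`None`), which requires remembering that the error was already seen.
struct EndAfterError<I> {
    inner: I,
    done: bool,
}

impl<I, T, E> Iterator for EndAfterError<I>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let item = self.inner.next();
        // Anything other than a successful item ends the sequence.
        if !matches!(item, Some(Ok(_))) {
            self.done = true;
        }
        item
    }
}

fn demo() -> Vec<Result<i32, &'static str>> {
    // A source that keeps yielding the same error forever once it has failed.
    let source = vec![Ok(1), Ok(2)]
        .into_iter()
        .chain(std::iter::repeat(Err("closed")));

    // Without the wrapper, collecting `source` would never finish.
    EndAfterError { inner: source, done: false }.collect()
}

fn main() {
    println!("{:?}", demo());
}
```

With a richer termination type, the error itself could be the final value and the `done` flag would not be needed.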

The dwindling importance of combinators is a valid point in some respects, but if you specifically need to transform a stream to pass it on to another API, combinators remain the only option at least until we get something like async generators. There's also no easy way to supplant e.g. for_each_concurrent.

@Nemo157
Member

Nemo157 commented Sep 14, 2019

But mimicking what std::sync::channel does with RecvError might be clearer from a documentation perspective and having a stronger correlation between the APIs.

std::sync::mpsc::IntoIter has the same behaviour of turning a disconnection into end-of-iterator. The main difference here seems to just be that futures::channel::mpsc::Receiver doesn't have a standalone async fn recv(&self) -> ? and requires you to go via the Stream impl.
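
A short standard-library sketch of the two behaviours being compared; the function names are made up for this example.

```rust
use std::sync::mpsc::{channel, RecvError};

// Iterating a receiver turns disconnection into end-of-iterator.
fn iterate_until_disconnect() -> Vec<i32> {
    let (tx, rx) = channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // no senders remain
    rx.into_iter().collect()
}

// Calling `recv` directly reports the same condition as an explicit error.
fn recv_after_disconnect() -> Result<i32, RecvError> {
    let (tx, rx) = channel::<i32>();
    drop(tx);
    rx.recv()
}

fn main() {
    println!("{:?}", iterate_until_disconnect());
    println!("{:?}", recv_after_disconnect());
}
```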

@Nemo157
Member

Nemo157 commented Sep 14, 2019

I can't actually recall from the top of my head a stream implementation which is finite that I've used during the last couple of months. Note that there's plenty that wraps a Result for fallible streams though which terminate for other reasons.

I've had the opposite experience, the majority of my Stream usage has been wrappers around multi-message IO channels, which all gracefully terminate when disconnected. The next major part has been mpsc channels which are in some way connected to an IO channel, so will terminate when the IO terminates. The only infinite streams I can think of using are simple timer ticks, and the Incoming stream from a TcpListener.

@udoprog
Contributor Author

udoprog commented Sep 15, 2019

I've had the opposite experience, the majority of my Stream usage has been wrappers around multi-message IO channels, which all gracefully terminate when disconnected.

Hm. Do these tend to resolve to Option<Result<T, E>>, or just Option<T>? I mostly spend my time writing high-level async code. Things that should run until the application stops (or errors) and the futures are dropped.

@Nemo157
Member

Nemo157 commented Sep 16, 2019

They're mostly per-client Stream<Item = Result<T, E>>. They could have an IO error (e.g. something in the network connection fails) or a parse error (e.g. the other side sends an invalid message), which would normally result in me just abandoning that task and dropping all other connections it's dealing with (all associated to that one client). If the other side closes the connection, the stream will cleanly finish and I instead close all other connections and wait for them to complete before the task finishes.


8 participants