Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

#[async_stream] implementation #32

Merged
merged 21 commits into from
Oct 17, 2017
Merged

Conversation

Nemo157
Copy link
Contributor

@Nemo157 Nemo157 commented Oct 4, 2017

Current usage is similar to

fn check_load() -> Result<(), &'static str> { ... }

fn prep_launchpad() -> impl Future<Item = (), Error = &'static str> { ... }

#[async_stream(item = u32)]
fn blast_off(abort: bool) -> Result<(), &'static str> {
    stream_yield!(3);
    check_load()?;
    stream_yield!(2);
    if abort {
        return Err("blast off aborted");
    }
    await!(prep_launchpad())?;
    stream_yield!(1);
    Ok(())
}

where the signature

#[async_stream(item = T)]
fn foo() -> Result<(), U>

is transformed into

fn foo() -> impl Stream<Item=T, Error=U>

#[async_stream(boxed, item = T)] is also supported

Major stuff that needs to happen before merging:

  • Decide if never_type feature is ok, or come up with an alternative method to make await! compatible with both #[async] and #[async_stream] (EDIT: replaced with a local uninhabited type)
  • Rebase/merge latest changes (EDIT: rebased)
  • Add tests (EDIT: smoke tests added)
  • Pull out some helper methods to reduce duplication between #[async] and #[async_stream] (EDIT: Replaced with a single helper implementation)

Fixes #7

@Arnavion
Copy link

Arnavion commented Oct 4, 2017

@Thomasdezeeuw
Copy link

Why is the () in case of Ok, in the following code?

#[async_stream(item = "T")]
fn foo() -> Result<(), U>

Wouldn't returning Result<T, U> make more sense? Then item wouldn't be needed in the attribute either.

#[async_stream]
fn foo() -> Result<T, U>
// Becomes:
fn foo() -> impl Stream<Item=T, Error=U>

To me that makes a lot more sense.

@Nemo157
Copy link
Contributor Author

Nemo157 commented Oct 5, 2017

Shouldn't stream_yield! take Result<T, E> instead of T ?

That would be possible, and would support streams that can continue returning values even after they return an error (still not something I have ever seen used though). It doesn't match nicely with ? support though; either there are two ways of yielding an error value, one of which terminates the stream and one which doesn't, or we drop the Result return type and pass all items through stream_yield! and just use returning as the indicator for Ok(None).

First thought on how the second option would look:

#[async_stream(item = "u32", error = "&'static str")]
fn blast_off(abort: bool) {
    stream_yield!(Ok(2));
    if abort {
        stream_yield!(Err("blast off aborted"));
        return;
    }
    stream_yield!(Ok(1));
}

If the stream completes with an Err, GenFuture needs to return Ok(Async::Ready(None)) from atleast one more call to Stream::poll().

Oh, yes, that's gonna need some extra state to keep track of whether it's done that I was really hoping to avoid. The second option above would avoid this, but I don't think we want to drop ? support.

@Nemo157
Copy link
Contributor Author

Nemo157 commented Oct 5, 2017

Wouldn't returning Result<T, U> make more sense? Then item wouldn't be needed in the attribute either.

That was my original implementation, I changed it based on #7 (comment), so that the declared return type matches the values actually passed to return in the body. See #15 for my preferred solution to this, although that requires a breaking change for returning a Future

@Thomasdezeeuw
Copy link

@Nemo157 Looking through the comments quickly I haven't found a good reason why returning Result<T, U> wouldn't be possible, but I might have a too simplistic view of the issue though. It seems it's only because of an implementation detail? If that is the case I don't think it's a good reason to accept it. As for breaking changes isn't that why the code is still pre 1.0?

I would push for returning an Result<T, E> even if it takes some more time and breaking changes.

@Arnavion
Copy link

Arnavion commented Oct 5, 2017

@Nemo157

It doesn't match nicely with ? support though; either there are two ways of yielding an error value, one of which terminates the stream and one which doesn't, or we drop the Result return type and pass all items through stream_yield! and just use returning as the indicator for Ok(None).

Yes, two ways of yielding an error value is also what I chose for tiny-async-await. In my opinion it isn't a problem.

@Nemo157
Copy link
Contributor Author

Nemo157 commented Oct 5, 2017

@Thomasdezeeuw

Looking through the comments quickly I haven't found a good reason why returning Result<T, U> wouldn't be possible

It's not that it's not possible, I even still have a branch implementing it, it's that you then hit one of two issues

  • you don't allow the user of the macro to return Ok(T) (this is what my old implementation does). You then have a return type on the function that doesn't match the allowed type of returned values.
  • you allow the user of the macro to return Ok(T). This then provides them multiple ways to yield a value from the stream. I much prefer keeping an API constrained to one obvious way to do things to minimize the amount of thinking required when using it.

@Arnavion

Yes, two ways of yielding an error value is also what I chose for tiny-async-await. In my opinion it isn't a problem.

After reading through that linked thread I think the start of rust-lang/futures-rs#206 (comment) really makes being able to both yield and return an error make sense to me, they are two distinct error cases that just happen to be constrained to the same type by the trait design. I'll try and push an update tonight changing this and fixing the end state to allow poll after returning an error.

Copy link
Owner

@alexcrichton alexcrichton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks great, thanks!

Could you also update the README as well?

examples/echo.rs Outdated
@@ -43,19 +42,17 @@ fn main() {
core.run(server).unwrap();
}

#[async]
fn handle_client(socket: TcpStream) -> io::Result<u64> {
#[async_stream(item = "u64")]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to get better error messages we may want to avoid the usage of a string here and instead use raw tokens, perhaps something like:

#[async_stream(item = u64)]

perhaps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting that Rust allows that now, I thought I had tried something similar when custom derives first came out and had it denied by rustc before calling into the proc macro.

syn fails to parse it since it's not really a valid attribute, I guess I can just switch from using syn to parse to working on the token tree directly.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syn fails to parse it since it's not really a valid attribute, I guess I can just switch from using syn to parse to working on the token tree directly.

It is a valid attribute when #[feature(proc_macro)] is enabled in the source. Also, the git version of syn that this crate uses will parse it since it has support for arbitrary tts in attributes. It's not a valid MetaItem so Attribute::meta_item() will return None, and you have to operate on Attribute::tts directly.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes as @Arnavion said you can have arbitrary streams of tokens in attributes with #![feature(proc_macro)] which would be exploited here. The parsing would look a little odd on the syn side of things, but you can look here for an example

}
};

// We've got to get a bit creative with our handling of arguments. For a
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this avoid duplication with #[async] and be refactored to a common implementation?

macro_rules! stream_yield {
($e:expr) => ({
let e = $e;
yield ::futures::Async::Ready(e)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this perhaps just be yield e? I think it'd be fine to use two different conversions internally for the two attributes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean by two different conversions here, could you explain a bit more?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sure yeah, basically I just mean that yielding in #[async] should be just a literal yield, and yielding in #[async_stream] would be yield e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead of having the stream_yield! macro, just do a rewrite from yield e; to yield ::futures::Async::Ready(e); as part of the #[async_stream] macro? That does have the downside of meaning you can't write macros that yield values, but I assume those would be rare enough to not matter. I'll try and get this and the attribute parsing done tomorrow.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no I was thinking we'd keep the stream_yield! macro, but it would be implemented with yield e

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, how would that fit with await! and #[async] for yielding Async::NotReady then, just have the user do something like stream_yield!(Async::Ready(e))? I thought part of the purpose of async/await syntax was to allow composing futures/streams together without having to work with some of the more verbose underlying types like Async.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, that makes sense! In that case let's leave as-is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on how much magic (tm) is acceptable, #[async] and #[async_stream] could themselves expand await! in different ways that make sense for each of them?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah I'm fine using the strategy here, seems fine to me!

src/lib.rs Outdated
@@ -15,6 +15,7 @@
#![feature(conservative_impl_trait)]
#![feature(generator_trait)]
#![feature(use_extern_macros)]
#![feature(never_type)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd personally prefer to not block on this stabilizing, could we avoid this feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to a custom uninhabited type.

@Nemo157
Copy link
Contributor Author

Nemo157 commented Oct 11, 2017

I've changed the argument to be a path instead of a string and added some basic details to the readme. Probably should have a full technical details section in the readme as well, I'll try and get to that by the end of the week.

@@ -311,7 +406,7 @@ impl Folder for ExpandAsyncFor {
}
}
futures_await::Async::NotReady => {
yield;
yield futures_await::Async::NotReady;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be best using ::futures::....

Could a test be added for this as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extern crate futures_await; line just above. This whole #[async] for expander uses that instead of via ::futures.

quote! {
Box<::futures::Stream<
Item = !,
Error = <! as ::futures::__rt::IsResult>::Err,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it may not work? Maybe a test could be added for #[async_stream(boxed, item = ...)]?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha nevermind, missed the new replace_bangs function

src/lib.rs Outdated
GeneratorState::Yielded(Async::NotReady)
=> Ok(Async::NotReady),
GeneratorState::Yielded(Async::Ready(_))
=> unreachable!("Mu doesn't exist"),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I believe you can do match mu {} instead of an unreachable!

src/lib.rs Outdated
type Error = <<T::Yield as IsAsync>::Item as IsResult>::Err;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.done { return Ok(Async::Ready(None)) }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to elide the boolean here, right?

Copy link
Contributor Author

@Nemo157 Nemo157 Oct 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's related to @Arnavion's comment: #32 (comment)

I tried the test in e9d53a5 on the changes before 44fd004 and had it fail. Unless your conclusion from rust-lang/futures-rs#206 (comment) has changed then there needs to be the ability to track one more state change after the generator completes if it returns an error (and to support ? it needs to retain the ability to return an error).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right yeah, makes sense

src/lib.rs Outdated

impl<T> Stream for GenStream<T>
where T: Generator,
T::Yield: IsAsync,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the IsAsync trait needed here? Or can this just use T: Generator<Yield = Async<U>>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's required to allow yielding results and have the equality constraint <<T::Yield as IsAsync>::Item as IsResult>::Err = <T::Return as IsResult>::Err. If there's some way to encode that equality without it then it can be dropped, I'll take a look at all these bounds and see if there's some sort of simplification that can happen.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified it to

where
    T: Generator,
    T::Yield: IsAsync<Item = Result<U, <T::Return as IsResult>::Err>>,
    T::Return: IsResult<Ok = ()>,

I attempted

where
    T: Generator<Yield = Async<Result<U, <T::Return as IsResult>::Err>>>,
    T::Return: IsResult<Ok = ()>,

but that fails because of error[E0391]: unsupported cyclic reference between types/traits detected

@alexcrichton
Copy link
Owner

Looking great! I'm pretty ok merging this as-is basically.

One last thing we may want to test is a few error messages? Just basic scenarios where #[async_stream] has mistakes, and then a few tests can be added to ensure the error messages look reasonable.

README.md Outdated
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
for url in urls {
let s = await!(fetch(client, url))?;
stream_yield!(Ok(s));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intended to be stream_yield!(s)?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so it looks like this was changed during discussion?

I'd personally prefer to only yield the item type from stream_yield!. Our eventual desired semantics for Stream is that an Err fuses (ends) the stream as well as returning None, so I'm hoping we can get a jump start on that here by only allowing yielding items and using return Err(...) for errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, that's a change I'm very much in favour of. For some reason I got the impression from the linked issue that that wouldn't be changing any time soon, if it is a concrete plan then I can definitely switch this back.

examples/echo.rs Outdated
@@ -43,19 +42,17 @@ fn main() {
core.run(server).unwrap();
}

#[async]
fn handle_client(socket: TcpStream) -> io::Result<u64> {
#[async_stream(item = u64)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this stay as it was before? This was mostly intended to showcase the similarity to an otherwise synchronous echo server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
AsyncStreamArg(term, Some(ty)) => {
if term == "item" {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this also panic if item_ty is Some already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for arg in args.0 {
match arg {
AsyncStreamArg(term, None) => {
if term == "boxed" {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to below, could this panic if boxed is already true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

--> $DIR/bad-return-type.rs:17:36
|
17 | #[async_stream(item = Option<i32>)]
| ____________________________________^
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a particularly confusing error message, especially with respect to span information. Is there anything we can do to improve this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The horribleness of this error message comes from stream_yield! expanding to

let e = $e;
yield ::futures::Async::Ready(e)

the span which is in error is the e in the second line of the expansion (although, why it then highlights what it does I don't know). Changing it to simply

yield ::futures::Async::Ready($e)

changes the error message to what it should be:

error[E0308]: mismatched types
 --> $DIR/bad-return-type.rs:25:19
   |
25 |     stream_yield!(val);
   |                   ^^^
   |                   |
   |                   expected enum `std::option::Option`, found integral variable
   |                   help: try using a variant of the expected type: `Some(val)`
   |
   = note: expected type `std::option::Option<_>`
              found type `{integer}`

but causes test failures

error[E0597]: `t` does not live long enough
   --> tests/smoke.rs:98:19
    |
98  |     stream_yield!(t.clone());
    |                   ^ does not live long enough
...
101 | }
    | - borrowed value only lives until here
    |
    = note: borrowed value must be valid for the static lifetime...

I believe those are related to rust-lang/rust#44197, once that is resolved then the stream_yield! expansion can be updated and this should be fixed.

I'd prefer to merge with the current error message and I'll open an issue to fix it once the upstream issue is fixed.

Copy link
Contributor Author

@Nemo157 Nemo157 Oct 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative is to change the expansion now and change the test to

let c = t.clone();
stream_yield!(c);

Doing this instead would increase the churn once rust-lang/rust#44197 is fixed, every user of this library that had to make an extra temporary would have to fix their code at some point. As long as that issue doesn't take too long to fix I think we can live with the worse error message for a short while.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok yeah this sounds like a fine tradeoff to me, mind leaving a comment in the definition of the stream_yield! macro pointing to the upstream issue?

@alexcrichton
Copy link
Owner

Thanks so much again! I left some comments, but it looks like the IsAsync bits may have disappered from the comments. I wonder, if we only allow stream_yield! with item types, does that obviate the need for IsAsync?

@Nemo157
Copy link
Contributor Author

Nemo157 commented Oct 16, 2017

Yep, with not having to keep the error equality IsAsync is no longer needed 🎉

@alexcrichton
Copy link
Owner

Thanks so much @Nemo157! With that last comment I'm good to merge

@alexcrichton alexcrichton merged commit c35e035 into alexcrichton:master Oct 17, 2017
@alexcrichton
Copy link
Owner

👍

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants