Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
Merge pull request #32 from Nemo157/stream
Browse files Browse the repository at this point in the history
#[async_stream] implementation
  • Loading branch information
alexcrichton committed Oct 17, 2017
2 parents f6a9c6a + 82fc83d commit c35e035
Show file tree
Hide file tree
Showing 20 changed files with 588 additions and 58 deletions.
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ fn foo() -> io::Result<i32> {
}
```

And finally you can also have "async `for` loops" which operate over the
[`Stream`] trait:
You can also have "async `for` loops" which operate over the [`Stream`] trait:

```rust
#[async]
Expand All @@ -78,6 +77,32 @@ An async `for` loop will propagate errors out of the function so `message` has
the `Item` type of the `stream` passed in. Note that async `for` loops can only
be used inside of an `#[async]` function.

And finally, you can create a `Stream` instead of a `Future` via
`#[async_stream(item = _)]`:

```rust
#[async]
fn fetch(client: hyper::Client, url: &'static str) -> io::Result<String> {
// ...
}

/// Fetch all provided urls one at a time
#[async_stream(item = String)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
for url in urls {
let s = await!(fetch(client, url))?;
stream_yield!(s);
}
Ok(())
}
```

`#[async_stream]` must have an item type specified via `item = some::Path` and
the values output from the stream must be wrapped into a `Result` and yielded
via the `stream_yield!` macro. This macro also supports the same features as
`#[async]`, an additional `boxed` argument to return a `Box<Stream>`, async
`for` loops, etc.

[`Future`]: https://docs.rs/futures/0.1.13/futures/future/trait.Future.html
[`Stream`]: https://docs.rs/futures/0.1.13/futures/stream/trait.Stream.html

Expand Down
4 changes: 4 additions & 0 deletions futures-async-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ proc-macro2 = { version = "0.1", features = ["unstable"] }
git = 'https://github.com/dtolnay/syn'
features = ["full", "fold", "parsing", "printing"]
default-features = false

[dependencies.synom]
git = 'https://github.com/dtolnay/syn'
default-features = false
218 changes: 178 additions & 40 deletions futures-async-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,23 @@ extern crate proc_macro;
#[macro_use]
extern crate quote;
extern crate syn;
#[macro_use]
extern crate synom;

use proc_macro2::Span;
use proc_macro::{TokenStream, TokenTree, Delimiter, TokenNode};
use quote::{Tokens, ToTokens};
use syn::*;
use syn::delimited::Delimited;
use syn::fold::Folder;

#[proc_macro_attribute]
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async] attribute, if any
let attribute = attribute.to_string();
let boxed = if attribute == "( boxed )" {
true
} else if attribute == "" {
false
} else {
panic!("the #[async] attribute currently only takes `boxed` as an arg");
};

fn async_inner<F>(
boxed: bool,
function: TokenStream,
gen_function: Tokens,
return_ty: F)
-> TokenStream
where F: FnOnce(&Ty) -> proc_macro2::TokenStream {
// Parse our item, expecting a function. This function may be an actual
// top-level function or it could be a method (typically dictated by the
// arguments). We then extract everything we'd like to use.
Expand Down Expand Up @@ -164,30 +162,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Basically just take all those expression and expand them.
let block = ExpandAsyncFor.fold_block(*block);

// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?

let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else {
// Dunno why this is buggy, hits weird typecheck errors in tests
//
// quote! {
// impl ::futures::Future<
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
// Error = <#output as ::futures::__rt::MyTry>::MyError,
// >
// }
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
let return_ty = replace_bang(return_ty, &output);
let return_ty = return_ty(&output);

let block_inner = quote! {
#( let #patterns = #temp_bindings; )*
Expand All @@ -207,7 +182,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
#[allow(unreachable_code)]
{
return __e;
loop { yield }
loop { yield ::futures::Async::NotReady }
}
};
let mut gen_body = Tokens::new();
Expand All @@ -218,7 +193,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Give the invocation of the `gen` function the same span as the output
// as currently errors related to it being a result are targeted here. Not
// sure if more errors will highlight this function call...
let gen_function = quote! { ::futures::__rt::gen };
let output_span = first_last(&output);
let gen_function = respan(gen_function.into(), &output_span);
let body_inner = quote! {
#gen_function (move || #gen_body)
Expand Down Expand Up @@ -247,6 +222,99 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
output.into()
}

#[proc_macro_attribute]
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async] attribute, if any
let attribute = attribute.to_string();
let boxed = if attribute == "( boxed )" {
true
} else if attribute == "" {
false
} else {
panic!("the #[async] attribute currently only takes `boxed` as an arg");
};

async_inner(boxed, function, quote! { ::futures::__rt::gen }, |output| {
// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?
let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else {
// Dunno why this is buggy, hits weird typecheck errors in tests
//
// quote! {
// impl ::futures::Future<
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
// Error = <#output as ::futures::__rt::MyTry>::MyError,
// >
// }
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bang(return_ty, &output)
})
}

#[proc_macro_attribute]
pub fn async_stream(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async_stream] attribute, if any
let args = syn::parse::<AsyncStreamArgs>(attribute)
.expect("failed to parse attribute arguments");

let mut boxed = false;
let mut item_ty = None;

for arg in args.0 {
match arg {
AsyncStreamArg(term, None) => {
if term == "boxed" {
if boxed {
panic!("duplicate 'boxed' argument to #[async_stream]");
}
boxed = true;
} else {
panic!("unexpected #[async_stream] argument '{}'", term);
}
}
AsyncStreamArg(term, Some(ty)) => {
if term == "item" {
if item_ty.is_some() {
panic!("duplicate 'item' argument to #[async_stream]");
}
item_ty = Some(ty);
} else {
panic!("unexpected #[async_stream] argument '{}'", quote!(#term = #ty));
}
}
}
}

let boxed = boxed;
let item_ty = item_ty.expect("#[async_stream] requires item type to be specified");

async_inner(boxed, function, quote! { ::futures::__rt::gen_stream }, |output| {
let output_span = first_last(&output);
let return_ty = if boxed {
quote! {
Box<::futures::Stream<
Item = !,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else {
quote! { impl ::futures::__rt::MyStream<!, !> + 'static }
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bangs(return_ty, &[&item_ty, &output])
})
}

#[proc_macro]
pub fn async_block(input: TokenStream) -> TokenStream {
let input = TokenStream::from(TokenTree {
Expand All @@ -268,7 +336,40 @@ pub fn async_block(input: TokenStream) -> TokenStream {
syn::tokens::Move(span).to_tokens(tokens);
syn::tokens::OrOr([span, span]).to_tokens(tokens);
syn::tokens::Brace(span).surround(tokens, |tokens| {
(quote! { if false { yield } }).to_tokens(tokens);
(quote! {
if false { yield ::futures::Async::NotReady }
}).to_tokens(tokens);
expr.to_tokens(tokens);
});
});

tokens.into()
}

#[proc_macro]
pub fn async_stream_block(input: TokenStream) -> TokenStream {
let input = TokenStream::from(TokenTree {
kind: TokenNode::Group(Delimiter::Brace, input),
span: Default::default(),
});
let expr = syn::parse(input)
.expect("failed to parse tokens as an expression");
let expr = ExpandAsyncFor.fold_expr(expr);

let mut tokens = quote! {
::futures::__rt::gen_stream
};

// Use some manual token construction here instead of `quote!` to ensure
// that we get the `call_site` span instead of the default span.
let span = syn::Span(Span::call_site());
syn::tokens::Paren(span).surround(&mut tokens, |tokens| {
syn::tokens::Move(span).to_tokens(tokens);
syn::tokens::OrOr([span, span]).to_tokens(tokens);
syn::tokens::Brace(span).surround(tokens, |tokens| {
(quote! {
if false { yield ::futures::Async::NotReady }
}).to_tokens(tokens);
expr.to_tokens(tokens);
});
});
Expand Down Expand Up @@ -311,7 +412,7 @@ impl Folder for ExpandAsyncFor {
}
}
futures_await::Async::NotReady => {
yield;
yield futures_await::Async::NotReady;
continue
}
}
Expand Down Expand Up @@ -362,3 +463,40 @@ fn replace_bang(input: proc_macro2::TokenStream, tokens: &ToTokens)
}
new_tokens.into()
}

fn replace_bangs(input: proc_macro2::TokenStream, replacements: &[&ToTokens])
-> proc_macro2::TokenStream
{
let mut replacements = replacements.iter().cycle();
let mut new_tokens = Tokens::new();
for token in input.into_iter() {
match token.kind {
proc_macro2::TokenNode::Op('!', _) => {
replacements.next().unwrap().to_tokens(&mut new_tokens);
}
_ => token.to_tokens(&mut new_tokens),
}
}
new_tokens.into()
}

struct AsyncStreamArg(syn::Ident, Option<syn::Ty>);

impl synom::Synom for AsyncStreamArg {
named!(parse -> Self, do_parse!(
i: syn!(syn::Ident) >>
p: option!(do_parse!(
syn!(syn::tokens::Eq) >>
p: syn!(syn::Ty) >>
(p))) >>
(AsyncStreamArg(i, p))));
}

struct AsyncStreamArgs(Vec<AsyncStreamArg>);

impl synom::Synom for AsyncStreamArgs {
named!(parse -> Self, map!(
option!(parens!(call!(Delimited::<AsyncStreamArg, syn::tokens::Comma>::parse_separated_nonempty))),
|p| AsyncStreamArgs(p.map(|d| d.0.into_vec()).unwrap_or_default())
));
}
13 changes: 12 additions & 1 deletion futures-await-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,18 @@ macro_rules! await {
break ::futures::__rt::Err(e)
}
}
yield
yield ::futures::Async::NotReady
}
})
}

// TODO: This macro needs to use an extra temporary variable because of
// rust-lang/rust#44197, once that's fixed this should just use $e directly
// inside the yield expression
#[macro_export]
macro_rules! stream_yield {
($e:expr) => ({
let e = $e;
yield ::futures::Async::Ready(e)
})
}
Loading

0 comments on commit c35e035

Please sign in to comment.