I've been using the proposed await! and Future features in nightly Rust, and overall, I really like the design. But I did run into one surprise: await! may never return, and this has consequences I didn't fully understand. Let's take a look.

We're going to use Rust nightly-2019-02-08, and tokio-async-await. This is highly experimental code, and it will require us to convert back and forth between tokio::Future and the proposed std::future::Future.

You can find the full code on GitHub. We'll start by enabling the experimental features we'll need:

#![feature(await_macro, async_await, futures_api)]

#[macro_use]
extern crate tokio_async_await;

Then we'll import some libraries, and declare two helper functions tokio_fut and boxed_fut, that make it easy to convert from std::future::Future into tokio::Future and into Box<tokio::Future<..>>, respectively. You can look that code up on GitHub.

Next, we define a function delay, which returns a Future that waits for the specified number of milliseconds:

fn delay(millis: u64) -> Delay {
    Delay::new(
        Instant::now() + Duration::from_millis(millis),
    )
}

Canceling a Future

Now, we can define two tasks:

/// An asynchronous function that completes quickly.
async fn quick_task() -> Result<&'static str> {
    println!("START quick_task");
    await!(delay(10)).context("delay failed")?;
    println!("END quick_task");
    Ok("quick_task result")
}

/// An asynchronous function that completes very slowly.
async fn slow_task() -> Result<&'static str> {
    println!("START slow_task");
    await!(delay(10_000)).context("delay failed")?;
    println!("END slow_task");
    Ok("slow_task result")
}

Here, quick_task waits for 10 milliseconds, and slow_task waits for 10,000 milliseconds. We can combine them using select_all:

/// Run our tasks in parallel, waiting for the first to
/// complete.
async fn wait_for_first_task() -> Result<()> {
    let all_futures = vec![
        boxed_fut(quick_task()),
        boxed_fut(slow_task()),
    ];
    let (result, _idx, others) = await!(
        future::select_all(all_futures)
    )
    .map_err(|(err, idx, others)| {
        println!("A future failed, so throw the rest away");
        drop(others);
        format_err!("task {} failed: {}", idx, err)
    })?;
    println!("Result from the first task: {}", result);
    println!("Dropping unneeded computations");
    drop(others);
    Ok(())
}

The drop calls above are unnecessary—the compiler would be happy to insert them—but I included them for clarity.

When we run this, we will see:

START quick_task
START slow_task
END quick_task
Result from the first task: quick_task result
Dropping unneeded computations

Here, slow_task called await!, but that await! never returned! It feels almost like an exception, but what really happened is that quick_task and slow_task are coroutines, and they only make progress when somebody polls them, which is what await! does under the hood. (Rust Futures have "pull" semantics, not "push" semantics.) This means you can cancel a future simply by dropping it.

So when we called select_all and drop, we took the value of the first future to complete, and dropped the rest.

On the one hand, this is great, because it means that Rust Futures are always cancelable. On the other hand, it has surprising consequences: We never print END slow_task.

Can we clean up resources when a Future is canceled?

Yes, but we need to use RAII:

/// A structure which prints `msg` on `drop`.
struct Cleanup {
    msg: &'static str,
}

impl Drop for Cleanup {
    fn drop(&mut self) {
        println!("{}", self.msg)
    }
}

/// An asynchronous function that cleans up after itself.
async fn protected_slow_task() -> Result<&'static str> {
    println!("START protected_slow_task");
    let _cleanup = Cleanup {
        msg: "CLEANUP protected_slow_task",
    };
    await!(delay(10_000)).context("delay failed")?;
    println!("END protected_slow_task");
    Ok("protected_slow_task result")
}

...and add it to all_futures:

let all_futures = vec![
    boxed_fut(quick_task()),
    boxed_fut(slow_task()),
    boxed_fut(protected_slow_task()),
];

When run, we'll see:

START quick_task
START slow_task
START protected_slow_task
END quick_task
Result from the first task: quick_task result
Dropping unneeded computations
CLEANUP protected_slow_task

We still don't see END protected_slow_task, but we do see our CLEANUP. So that's nice. Or at least I could probably learn to live with it.

What does this mean for sending to channels?

In the recent discussion of Rust channel semantics (here, here, here and here), I've argued that we need to look at how the proposed await! and std::future::Future interact with channels.

Basically, imagine we have an asynchronous function that holds the receive end of a channel. For example, this function copies a Stream (which might be the receiving end of a channel) into an AsyncWrite:

/// Given a `Stream` of data chunks of type `BytesMut`, write
/// the entire stream to an `AsyncWrite` implementation.
pub(crate) async fn copy_stream_to_writer<S, W>(
    ctx: Context,
    mut stream: S,
    mut wtr: W,
) -> Result<()>
where
    S: Stream<Item = BytesMut, Error = Error> + 'static,
    W: AsyncWrite + 'static,
{
    loop {
        match await!(stream.into_future()) {
            Err((err, _rest_of_stream)) => {
                error!(ctx.log(), "error reading stream: {}", err);
                return Err(err);
            }
            Ok((None, _rest_of_stream)) => {
                trace!(ctx.log(), "end of stream");
                return Ok(());
            }
            Ok((Some(bytes), rest_of_stream)) => {
                stream = rest_of_stream;
                trace!(ctx.log(), "writing {} bytes", bytes.len());
                await!(io::write_all(&mut wtr, bytes)).map_err(|e| {
                    error!(ctx.log(), "write error: {}", e);
                    format_err!("error writing data: {}", e)
                })?;
            }
        }
    }
}

But what happens if io::write_all returns an error? Well, we exit, which will drop stream. If stream is actually a channel, this will close the receiving end of the channel. In this case, we could add code to notify the sending end of the channel (if stream is actually a channel), and clean everything up. This seems like a lot of extra work for no real gain, but Go programmers apparently live with it every day, and it could be done.

But now let's consider a trickier case. What if copy_stream_to_writer were wrapped inside a select_all, and it lost out to a faster future? Or what if somebody wrapped the future in a timeout?

let fut = copy_stream_to_writer(...)
    .timeout(Duration::from_secs(5));
await!(fut)?;

In this case, the await!(stream.into_future()) expression in copy_stream_to_writer will never return. And we'll drop stream without any easy way to notify anybody about that.

So far, I'm cool with all this. Sometimes, the receiving end of a channel just silently disappears, because a timeout or a select_all or a drop caused an await! to never return. It's not too hard to program in this world—all I need to do is accept that channel.send(...) can fail without explanation, and then everything works nicely.

But while I was thinking about this, a comment by ben0x539 made me realize there's an uglier side to this.

What does this mean for receiving from channels?

With typical Rust mpsc channels, if all the senders are dropped, then the receiver will see the channel return an Ok(None). If all the senders were dropped normally, this is exactly what we want. If the senders were closed because of a normal error, then we could use something like SyncStreamWriter's send_error to add that error at the end of our stream:

let result: Result<()> = try {
    let conn = connect(&url)?;
    let stmt = conn.prepare(&sql)?;
    stmt.copy_out(&[], &mut wtr)?;
};

// Report any errors to our stream.
if let Err(err) = result {
    error!(ctx.log(), "error reading from PostgreSQL: {}", err);
    if wtr.send_error(err).is_err() {
        error!(ctx.log(), "cannot report error to foreground thread");
    }
}

But what if our sender were inside an async function that got canceled, and that never returned from an await!? Well, in this case, the channel will close normally, and the receiver will never realize that the sender failed early.

This bothers me a lot more than send sometimes returning an error. I'll probably need to go audit a couple of pieces of code very carefully.

When I started thinking about all of this, I was absolutely convinced that the interactions between await!, mpsc channels and Futures were surprisingly good. Sure, I had to accept that any send might fail, but that was easy enough to deal with.

But now I want to think some more about await!, and the fact that it might never return. I think this is a natural consequence of many other good things, and that it makes sense in terms of co-routines. But how does it interact with resource cleanup and channel shutdown?

Discuss on Reddit.