Lately, I've been working on several real-world systems using Rust's async and tokio. As you can see on the areweasyncyet.rs site, this requires using nightly Rust and the experimental tokio-async-await library. I hope to talk more about these experiences soon!

But today, I want to talk about channel APIs in Rust. A question was raised by @matklad on GitHub:

I've migrated rust-analyzer to crossbeam-channel 0.3, and the thing I've noticed is that every .send is followed by .unwrap. Perhaps we should make this unwrapping behavior the default, and introduce a separate checked_send which returns a Result?

BurntSushi followed up on Reddit:

Because the vast majority of uses of send are like this: ch.send(foo).unwrap(). That is, you panic because you generally regard it as a bug if you're still sending values when all receivers have been dropped. Why? Because this is generally a property of the program's organization.

I hesitate to disagree with two such excellent developers, but my experiences with this issue are almost the exact opposite of matklad's and BurntSushi's.

How I've been using channels in Rust

My employer has generously agreed to open source two pieces of production Rust code using tokio and channels, which I'll use as examples. Please keep in mind that these channels are all using tokio::sync::mpsc channels, and so my experiences don't necessarily directly to std::sync::mpsc or crossbeam::channel.

  1. A fork of rust-amqp using tokio. AMQP is an excellent fit for tokio::codec, because it treats the sending and receiving half of the socket as streams, and neither half should block the other. We've been running this code in production for almost a year now, and it has worked absolutely flawlessly.
  2. A pre-release tool that uses tokio streams to move data between PostgreSQL and BigQuery (both directions). This makes heavy use of async and await!.

In this environment, I've learned a hard and bitter lesson: If I write ch.send(...).unwrap() in async code, it's almost certainly a bug. If it hasn't crashed yet, I've just gotten lucky.

This doesn't match the experiences of very smart people like matklad and BurntSushi, and I want to get to the bottom of this inconsistency. But first, I want to show that I'm not the only one.

The experience of the Servo team

There's a good discussion of the Servo experience in which asajeffrey writes:

My feeling is that a lot of code is written using unwrap() on the basis of maintaining some invariant of the program, and normally we got that right during steady state, but not always during startup, and quite often not during shutdown. Servo still panics a lot during shutdown, due to code that assumes the other end of a channel is still alive, but the shutdown order ended up being an unanticipated one. Sigh.

I feel like if the Servo developers can't reliably get ch.send(...).unwrap() to work without panicking, then we should think carefully before making panicking the default. But let's dig deeper, and try to figure out why.

Reason #1: Channels, pipes, sockets and files

In Rust, we can create a channel as follows:

let (sender, receiver): (Sender<BytesMut>, Receiver<BytesMut>) =
    mpsc::channel(1);

Here, we're using a bounded queue of length 1 (because queues don't fix overload), and each message is a BytesMut. This is basically the same as an array of bytes (a Vec<u8> in Rust), except it's easy to subdivide it.

We could then write:

sender.send(bytes)

...after which we could receive these bytes using receiver. If the receiver is busy, then sender will block as soon as the 1-item queue is full. This is basically equivalent to a Unix shell pipeline:

gunzip book.gz | grep Hamlet | less

Here, gunzip book.gz decompresses book.gz and outputs it as a stream. grep Hamlet find lines containing "Hamlet", and less displays the lines using a pager.

What happens if we look at only the first page of output, and then we quit less? Well, that shuts down the pipe that grep is trying to write to, and so grep will receive a SIGPIPE. Depending how grep is written, it might instead receive an EPIPE when it calls write. This will cause grep to fail, which in turn shuts down the output pipeline of gunzip using another SIGPIPE or EPIPE.

When the user quits less, the SIGPIPE or EPIPE propagates backwards through the entire pipeline, shutting down each program.

In tokio, we might see something similar. In this example adapted from the tokio fork of rust-amqp we mentioned earlier, we use forward to pipe stream into sender:

let (read_sender, read_receiver) = mpsc::channel(0);

// Copy inbound frames from `stream` to `read_sender`.
let reader = stream
    .forward(read_sender)
    .map_err(|e| { error!("reader failed: {}", e); e })
    .map(|(_stream, _sink)| { trace!("reader done") });

This is roughly equivalent to:

stream | read_sender | read_receiver

If read_receiver fails, the error will propagate all the way back to stream.

For a more exotic example, this tool builds tokio streams analogous to:

read_from_bigquery |
    convert_csv_to_pg_binary |
    write_to_postgres

If we're working with sockets, and the remote server drops the connection, then we might receive ECONNRESET. If we're working with a file descriptor, and something goes wrong, then we might receive EIO.

Every place we can write to a pipe, socket or file descriptor in Unix, we have to assume that this write operation could fail. So if we want to use a Sender<BytesMut> to represent a stream of data, then we need to figure out what we'll do if the underlying OS returns EPIPE, ECONNRESET or EIO.

Unix pipelines are essentially async co-routines running over data streams, with full backpressure. And they've probably the most popular and successful streaming data architecture in the world.

Reason #2: In the presence of cancelable futures, any receiver can disappear

The following asynchronous Rust function takes a synchronous function f and runs it on a background thread. The send function below can panic. Can you see what I did wrong?

/// Run a synchronous function `f` in a background worker thread and return its
/// value.
pub(crate) async fn run_sync_fn_in_background<F, T>(f: F) -> Result<T>
where
    F: (FnOnce() -> Result<T>) + Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = mpsc::channel(1);
    let handle = thread::spawn(move || {
        sender
            .send(f())
            .wait()
            .expect("should always be able to send results from background thread");
    });;

    // Wait for our worker to report its results.
    let background_result = await!(receiver.into_future());
    let result = match background_result {
        // The background thread sent an `Ok`.
        Ok((Some(Ok(value)), _receiver)) => Ok(value),
        // The background thread sent an `Err`.
        Ok((Some(Err(err)), _receiver)) => Err(err),
        Ok((None, _receiver)) => {
            unreachable!("background thread did not send any results");
        }
        Err(_) => Err(format_err!("background thread panicked")),
    };
    handle.join().expect("background worker thread panicked");
    result
}

My code above is an async function. This means that it's run as a std::future::Future. And Rust futures are cancelable, simply by writing drop(future). Or we could use a tokio timeout:

// Wait 5 seconds for future.
await!(future.timeout(Duration::from_secs(5)))?;

If this timeout occurs, it may kill an entire branching tree of futures. Somewhere in this tree of futures, there may be a call to run_sync_fn_in_background. And then the entire program will panic.

Counter-argument: What about control channels?

On Reddit, wmanley suggested an extremely clever idiom:

Instead of writing stdout.send() for writing to your stdout you replace it with:

select!{
    send(stdout, msg) { Ok() }
    recv(ctrl_in) -> msg {
       // No messages arrive here, only EOF
        assert!(msg == TryRecvErr);
        msg
    }
}?

You signal that the threads should stop by closing the ctrl_in channel.

Unfortunately, this suffers from a couple of limitations, at least in the world of tokio:

  1. We can no longer use future.timeout(...) on any future containing the receiving end of this channel, because timeout won't know that it needs to drop the control channel.
  2. If we have an elaborate, Unix-style pipeline, then every data sender anywhere in the pipeline needs to honor ctrl_in. And the pipeline management code needs to shut down multiple control channels in a very specific order to cancel a pipeline without panicking.

I'm pretty sure that I'm not smart enough to manage ctrl_in channels well enough to always clean up every sender before every receiver.

What to do with awkward errors?

Sometimes, when a send(...) fails, it's not clear what to do with it. One solution that often works well is to log it using error!. But we might also try passing around a Context struct like:

/// Context shared by our various asynchronous operations.
#[derive(Debug, Clone)]
pub struct Context {
    /// The logger to use for code in this context.
    log: Logger,
    /// To report asynchronous errors anywhere in the
    /// application, send them to this channel.
    error_sender: mpsc::Sender<Error>,
}

If we encounter an error that we're tempted to unwrap() (or to silently discard), we could always just send it to error_sender, and have a top-level thread report it to the user.

This is somewhat easier than the ctrl_in example above, because most code has the option of ignoring it completely. Typical send errors will propagate back out through their callers like other Rust errors, and we'll only need error_sender at the very top of our call chain.

Summary

In a nutshell, here's my argument:

  1. In a pipeline architecture, EPIPE and ECONNRESET both naturally map to a send(...) error. And pipeline architectures are popular, long-established, and easy to get right.
  2. When the receiving end of channel is held by a future, it can be unceremoniously blown away by any drop or timeout. Avoiding this requires a profound level of vigilance, similar to flawless use of malloc and free. In a world with async receivers, send can inevitably fail.
  3. Not even the Servo developers can tear down channels without hitting lots of panics.

This is why I would be concerned about any ch.send(...).unwrap() that I saw in a code review. It might be correct in synchronous Rust, but once futures are involved, it's just asking for trouble.

But maybe I've been looking at this all wrong. Some very smart people seem to write ch.send(...).unwrap() without any problems. What do they know that I don't?