Should Rust channels panic on send if nobody's listening?
Lately, I’ve been working on several real-world systems using Rust’s `async` and `tokio`. As you can see on the areweasyncyet.rs site, this requires using nightly Rust and the experimental `tokio-async-await` library. I hope to talk more about these experiences soon!
But today, I want to talk about channel APIs in Rust. A question was raised by @matklad on GitHub:
> I’ve migrated rust-analyzer to crossbeam-channel 0.3, and the thing I’ve noticed is that every `.send` is followed by `.unwrap`. Perhaps we should make this unwrapping behavior the default, and introduce a separate `checked_send` which returns a `Result`?
BurntSushi followed up on Reddit:
> Because the vast majority of uses of `send` are like this: `ch.send(foo).unwrap()`. That is, you panic because you generally regard it as a bug if you’re still sending values when all receivers have been dropped. Why? Because this is generally a property of the program’s organization.
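For readers unfamiliar with the idiom, here’s a minimal sketch of when that `.unwrap()` would fire. (This uses the standard library’s `std::sync::mpsc` channels for illustration, not the `crossbeam-channel` API under discussion.) `send` succeeds while a receiver exists, and returns `Err` once every receiver has been dropped — which is exactly the moment `.unwrap()` would panic:

```rust
use std::sync::mpsc;

/// Returns true if `send` fails once the receiver is gone.
fn send_fails_after_receiver_drops() -> bool {
    let (tx, rx) = mpsc::channel();

    // While the receiver is alive, `send` succeeds.
    assert!(tx.send(1).is_ok());
    assert_eq!(rx.recv(), Ok(1));

    // Once every receiver is dropped, `send` returns `Err`,
    // so `tx.send(2).unwrap()` would panic here.
    drop(rx);
    tx.send(2).is_err()
}

fn main() {
    assert!(send_fails_after_receiver_drops());
    println!("ok");
}
```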
I hesitate to disagree with two such excellent developers, but my experiences with this issue are almost the exact opposite of matklad’s and BurntSushi’s.
How I’ve been using channels in Rust
My employer has generously agreed to open source two pieces of production Rust code using `tokio` and channels, which I’ll use as examples. Please keep in mind that these channels are all `tokio::sync::mpsc` channels, and so my experiences don’t necessarily apply directly to `std::sync::mpsc` or `crossbeam::channel`.
- A fork of `rust-amqp` using `tokio`. AMQP is an excellent fit for `tokio::codec`, because it treats the sending and receiving halves of the socket as streams, and neither half should block the other. We’ve been running this code in production for almost a year now, and it has worked absolutely flawlessly.
- A pre-release tool that uses `tokio` streams to move data between PostgreSQL and BigQuery (both directions). This makes heavy use of `async` and `await!`.
In this environment, I’ve learned a hard and bitter lesson: if I write `ch.send(...).unwrap()` in async code, it’s almost certainly a bug. If it hasn’t crashed yet, I’ve just gotten lucky.
This doesn’t match the experiences of very smart people like matklad and BurntSushi, and I want to get to the bottom of this inconsistency. But first, I want to show that I’m not the only one.
The experience of the Servo team
There’s a good discussion of the Servo experience in which asajeffrey writes:
> My feeling is that a lot of code is written using `unwrap()` on the basis of maintaining some invariant of the program, and normally we got that right during steady state, but not always during startup, and quite often not during shutdown. Servo still panics a lot during shutdown, due to code that assumes the other end of a channel is still alive, but the shutdown order ended up being an unanticipated one. Sigh.
I feel like if the Servo developers can’t reliably get `ch.send(...).unwrap()` to work without panicking, then we should think carefully before making panicking the default. But let’s dig deeper, and try to figure out why.
Reason #1: Channels, pipes, sockets and files
In Rust, we can create a channel as follows:
```rust
let (sender, receiver): (Sender<BytesMut>, Receiver<BytesMut>) =
    mpsc::channel(1);
```
Here, we’re using a bounded queue of length 1 (because queues don’t fix overload), and each message is a `BytesMut`. This is basically the same as an array of bytes (a `Vec<u8>` in Rust), except that it’s easy to subdivide.
We could then write:
```rust
sender.send(bytes)
```
…after which we could receive these bytes using `receiver`. If the receiver is busy, then `sender` will block as soon as the 1-item queue is full. This is basically equivalent to a Unix shell pipeline:
```sh
gunzip -c book.gz | grep Hamlet | less
```

Here, `gunzip -c book.gz` decompresses `book.gz` and writes the result to standard output as a stream. `grep Hamlet` finds lines containing “Hamlet”, and `less` displays those lines using a pager.
What happens if we look at only the first page of output, and then quit `less`? Well, that shuts down the pipe that `grep` is trying to write to, and so `grep` will receive a `SIGPIPE` signal. Depending on how `grep` is written, it might instead receive an `EPIPE` error when it calls `write`. This will cause `grep` to fail, which in turn shuts down the output pipe of `gunzip` using another `SIGPIPE` or `EPIPE`.
When the user quits `less`, the `SIGPIPE` or `EPIPE` propagates backwards through the entire pipeline, shutting down each program.
In `tokio`, we might see something similar. In this example, adapted from the `tokio` fork of `rust-amqp` mentioned earlier, we use `forward` to pipe `stream` into `read_sender`:
```rust
let (read_sender, read_receiver) = mpsc::channel(0);

// Copy inbound frames from `stream` to `read_sender`.
let reader = stream
    .forward(read_sender)
    .map_err(|e| { error!("reader failed: {}", e); e })
    .map(|(_stream, _sink)| { trace!("reader done") });
```
This is roughly equivalent to:
```
stream | read_sender | read_receiver
```
If `read_receiver` fails, the error will propagate all the way back to `stream`.
For a more exotic example, this tool builds `tokio` streams analogous to:

```
read_from_bigquery |
    convert_csv_to_pg_binary |
    write_to_postgres
```
If we’re working with sockets, and the remote server drops the connection, then we might receive `ECONNRESET`. If we’re working with a file descriptor, and something goes wrong, then we might receive `EIO`.

Every place we can write to a pipe, socket or file descriptor in Unix, we have to assume that the write operation could fail. So if we want to use a `Sender<BytesMut>` to represent a stream of data, then we need to figure out what we’ll do if the underlying OS returns `EPIPE`, `ECONNRESET` or `EIO`.
Unix pipelines are essentially async co-routines running over data streams, with full backpressure. And they’re probably the most popular and successful streaming data architecture in the world.
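The backpressure half of that claim is easy to demonstrate with a bounded channel. Here’s a sketch using the standard library’s `sync_channel` with capacity 1 (playing the role of the `mpsc::channel(1)` we created earlier): once the queue is full, a further `try_send` is refused, which is how a slow consumer throttles a fast producer.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Demonstrate backpressure on a bounded queue of length 1.
fn backpressure_demo() -> bool {
    let (tx, rx) = sync_channel::<u32>(1);

    // The first message fits in the queue.
    assert!(tx.try_send(1).is_ok());

    // The queue is now full: a blocking `send` would wait here, and
    // `try_send` reports `Full`. That refusal is backpressure.
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // Once the consumer catches up, there is room again.
    assert_eq!(rx.recv(), Ok(1));
    tx.try_send(2).is_ok()
}

fn main() {
    assert!(backpressure_demo());
    println!("ok");
}
```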
Reason #2: In the presence of cancelable futures, any receiver can disappear
The following asynchronous Rust function takes a synchronous function `f` and runs it on a background thread. The `send` call below can panic. Can you see what I did wrong?
```rust
/// Run a synchronous function `f` in a background worker thread and return its
/// value.
pub(crate) async fn run_sync_fn_in_background<F, T>(f: F) -> Result<T>
where
    F: (FnOnce() -> Result<T>) + Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = mpsc::channel(1);
    let handle = thread::spawn(move || {
        sender
            .send(f())
            .wait()
            .expect("should always be able to send results from background thread");
    });

    // Wait for our worker to report its results.
    let background_result = await!(receiver.into_future());
    let result = match background_result {
        // The background thread sent an `Ok`.
        Ok((Some(Ok(value)), _receiver)) => Ok(value),
        // The background thread sent an `Err`.
        Ok((Some(Err(err)), _receiver)) => Err(err),
        Ok((None, _receiver)) => {
            unreachable!("background thread did not send any results");
        }
        Err(_) => Err(format_err!("background thread panicked")),
    };
    handle.join().expect("background worker thread panicked");
    result
}
```
My code above is an `async` function. This means that it’s run as a `std::future::Future`. And Rust futures are cancelable, simply by writing `drop(future)`. Or we could use a `tokio` timeout:
```rust
// Wait 5 seconds for future.
await!(future.timeout(Duration::from_secs(5)))?;
```
If this timeout occurs, it may kill an entire branching tree of futures. Somewhere in this tree of futures, there may be a call to `run_sync_fn_in_background`. And then the entire program will panic.
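The failure mode can be reproduced without `tokio` at all. In this sketch with ordinary threads and `std::sync::mpsc`, dropping the receiver plays the role of the cancelled future: a worker that unwraps its `send` panics, while a worker that checks the result shuts down cleanly.

```rust
use std::sync::mpsc;
use std::thread;

/// Returns (worker_1_panicked, worker_2_saw_send_error).
fn cancelled_receiver_demo() -> (bool, bool) {
    // Worker 1 unwraps its `send`, like `run_sync_fn_in_background`.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx); // The future holding the receiver was cancelled.
    let panicky = thread::spawn(move || {
        tx.send(42).expect("should always be able to send");
    });
    // The worker panicked, so `join` reports an error.
    let worker_1_panicked = panicky.join().is_err();

    // Worker 2 checks the result instead and exits cleanly.
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    let careful = thread::spawn(move || tx.send(42).is_err());
    let worker_2_saw_error = careful.join().expect("worker 2 panicked");

    (worker_1_panicked, worker_2_saw_error)
}

fn main() {
    assert_eq!(cancelled_receiver_demo(), (true, true));
    println!("ok");
}
```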
Counter-argument: What about control channels?
On Reddit, wmanley suggested an extremely clever idiom:
> Instead of writing `stdout.send()` for writing to your stdout you replace it with:
>
> ```rust
> select! {
>     send(stdout, msg) { Ok() }
>     recv(ctrl_in) -> msg {
>         // No messages arrive here, only EOF
>         assert!(msg == TryRecvErr);
>         msg
>     }
> }?
> ```
>
> You signal that the threads should stop by closing the `ctrl_in` channel.
Unfortunately, this suffers from a couple of limitations, at least in the world of `tokio`:
- We can no longer use `future.timeout(...)` on any future containing the receiving end of this channel, because `timeout` won’t know that it needs to drop the control channel.
- If we have an elaborate, Unix-style pipeline, then every data sender anywhere in the pipeline needs to honor `ctrl_in`. And the pipeline management code needs to shut down multiple control channels in a very specific order to cancel a pipeline without panicking.
I’m pretty sure that I’m not smart enough to manage `ctrl_in` channels well enough to always clean up every sender before every receiver.
What to do with awkward errors?
Sometimes, when a `send(...)` fails, it’s not clear what to do with the error. One solution that often works well is to log it using `error!`. But we might also try passing around a `Context` struct like:
```rust
/// Context shared by our various asynchronous operations.
#[derive(Debug, Clone)]
pub struct Context {
    /// The logger to use for code in this context.
    log: Logger,
    /// To report asynchronous errors anywhere in the
    /// application, send them to this channel.
    error_sender: mpsc::Sender<Error>,
}
```
If we encounter an error that we’re tempted to `unwrap()` (or to silently discard), we can always just send it to `error_sender`, and have a top-level thread report it to the user.
This is somewhat easier than the `ctrl_in` example above, because most code has the option of ignoring `error_sender` completely. Typical `send` errors will propagate back out through their callers like other Rust errors, and we’ll only need `error_sender` at the very top of our call chain.
Summary
In a nutshell, here’s my argument:
- In a pipeline architecture, `EPIPE` and `ECONNRESET` both naturally map to a `send(...)` error. And pipeline architectures are popular, long-established, and easy to get right.
- When the receiving end of a channel is held by a future, it can be unceremoniously blown away by any `drop` or `timeout`. Avoiding this requires a profound level of vigilance, similar to flawless use of `malloc` and `free`. In a world with async receivers, `send` can inevitably fail.
- Not even the Servo developers can tear down channels without hitting lots of panics.
This is why I would be concerned about any `ch.send(...).unwrap()` that I saw in a code review. It might be correct in synchronous Rust, but once futures are involved, it’s just asking for trouble.

But maybe I’ve been looking at this all wrong. Some very smart people seem to write `ch.send(...).unwrap()` without any problems. What do they know that I don’t?