Rust: future cannot be sent between threads safely
Hello, dear readers,
Couple of days ago I stumbled upon compilation error that took me a while to understand and fix. The error was "future cannot be sent between threads safely", however I didn't want to send any future anywhere. Let's start with a small example that has this compilation error and how we can get it fixed. I'm going to use tokio runtime for asynchronous programming.
At first I want to provide a very simplified version of a method that should run as a background task:
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
pub type SharedState = Arc<Mutex<Vec<usize>>>;
async fn background_task(
state: SharedState,
token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting background_task");
const SLEEP_TIME_MS: u64 = 100;
let mut loops: usize = 0;
loop {
// Check whether we need to stop this infinite loop
if token.is_cancelled() {
break;
}
// Get lock to write to vector thread-safely
let mut old_locked_state = state.lock().unwrap();
old_locked_state.push(loops);
sleep(Duration::from_millis(SLEEP_TIME_MS)).await;
loops += 1;
}
println!("Stopped background_task, number of loops {loops}");
Ok(())
}
Method background_task
keeps looping and modifying Vec<usize>
until it receives cancellation signal that exits the loop. The data passed as state
argument is read from another thread, this forces us to wrap it with:
std::sync::Mutex - a mutual exclusion primitive useful for protecting shared data;
std::sync::Arc - a thread-safe reference-counting pointer. We wrap the Mutex into Arc because Mutex must have multiple owners, check Shared-State Concurrency from the book to understand it better.
We introduce type alias SharedState
to shorten type definition and improve clarity.
Let's define main method that spawns background_task
as async task
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
const WAIT_TIME_MS: u64 = 100;
// Cancellation token to signal background_task to stop
let token = CancellationToken::new();
// Create thread-safe state
let state: SharedState = SharedState::new(Mutex::new(Vec::new()));
// We need to clone to pass it later in spawn
let s0 = state.clone();
let t0 = token.clone();
let handle = tokio::spawn(async move { background_task(s0, t0).await.unwrap() });
println!("Created background_task handle is {:?}", handle);
// Print 20 times the state of shared state
for i in 1..=20 {
println!("Shared state at {i} is {:?}", state.lock().unwrap());
sleep(Duration::from_millis(WAIT_TIME_MS)).await;
}
// Cancel background_task
println!("Cancelling background_task...");
token.cancel();
// Sleep a bit
sleep(Duration::from_millis(1000)).await;
// Did it finish successfully?
handle.await.unwrap();
println!("Done");
Ok(())
}
When we run it, we get compilation error main.rs
Compiling playground v0.0.1 (/playground)
error: future cannot be sent between threads safely
--> src/main.rs:43:18
|
43 | let handle = tokio::spawn(async move { background_task(s0, t0).await.unwrap() });
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:43:31: 43:84}`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<usize>>`, which is required by `{async block@src/main.rs:43:31: 43:84}: Send`
note: future is not `Send` as this value is used across an await
--> src/main.rs:25:53
|
22 | let mut old_locked_state = state.lock().unwrap();
| -------------------- has type `std::sync::MutexGuard<'_, Vec<usize>>` which is not `Send`
...
25 | sleep(Duration::from_millis(SLEEP_TIME_MS)).await;
| ^^^^^ await occurs here, with `mut old_locked_state` maybe used later
note: required by a bound in `tokio::spawn`
--> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/spawn.rs:166:21
|
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
165 | where
166 | F: Future + Send + 'static,
| ^^^^ required by this bound in `spawn`
error: could not compile `playground` (bin "playground") due to 1 previous error
Let's parse the error, specifically this part
the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<usize>>`, which is required by `{async block@src\main.rs:43:31: 43:84}: Send`
note: future is not `Send` as this value is used across an await
As it states, Mutex.lock returns LockResult<MutexGuard<'_, T>>
and MutexGuard explicitly does not implement Send trait (!Send
syntax). But why is this an issue? The reason is how Rust compiler derives whether a future is Send
This is because the compiler currently calculates whether a future is
Send
based on scope information only. The compiler will hopefully be updated to support explicitly dropping it in the future, but for now, you must explicitly use a scope.
With this information we can easily fix our example, here is fixed version main.rs,
async fn background_task(
state: SharedState,
token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting background_task");
const SLEEP_TIME_MS: u64 = 100;
let mut loops: usize = 0;
loop {
// Check whether we need to stop this infinite loop
if token.is_cancelled() {
break;
}
// <--------- Added scope here
{
// Get lock to write to vector thread-safely
let mut old_locked_state = state.lock().unwrap();
old_locked_state.push(loops);
}
sleep(Duration::from_millis(SLEEP_TIME_MS)).await;
loops += 1;
}
println!("Stopped background_task, number of loops {loops}");
Ok(())
}
My run produced the following output
Created background_task handle is JoinHandle { id: Id(3) }
Shared state at 1 is []
Starting background_task
Shared state at 2 is [0]
Shared state at 3 is [0, 1]
Shared state at 4 is [0, 1, 2]
Shared state at 5 is [0, 1, 2, 3, 4]
Shared state at 6 is [0, 1, 2, 3, 4, 5]
Shared state at 7 is [0, 1, 2, 3, 4, 5, 6]
Shared state at 8 is [0, 1, 2, 3, 4, 5, 6, 7]
Shared state at 9 is [0, 1, 2, 3, 4, 5, 6, 7, 8]
Shared state at 10 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Shared state at 11 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Shared state at 12 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Shared state at 13 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Shared state at 14 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
Shared state at 15 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Shared state at 16 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
Shared state at 17 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Shared state at 18 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
Shared state at 19 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
Shared state at 20 is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
Cancelling background_task...
Stopped background_task, number of loops 21
Done
The issue is well-explained in Tokio tutorial.
Comments and suggestions are welcome! Thank you for your time.