Tokio bug that is not a bug, or: why I think AsyncRead is not well-designed
This post is something that I had drafted up as a GitHub issue. During the process of investigating, I discovered that this is in fact intended behaviour, so I never posted it as an issue. But I am posting it here because I think this behaviour is somewhat surprising.
I was writing some unit tests late at night where I was writing something to a file, and then immediately read the file to make sure it was there. Doesn’t sound too wild. What is quite wild is that the tests were failing some 10% of the time! I was able to reduce this to a small test case that looks like this.
use tokio::fs::{metadata, File};
use tokio::io::AsyncWriteExt;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let dir = tempfile::tempdir().unwrap();
    let data = vec![24; 1024];
    let path = dir.path().join("file");
    let mut file = File::create(&path).await.unwrap();
    file.write(&data).await.unwrap();
    let metadata = metadata(&path).await.unwrap();
    assert_eq!(metadata.len() as usize, data.len());
}
In this test case, I'm creating a file and writing to it. Immediately after writing to it, I use metadata() to get some information on the file. Surprisingly, sometimes the data that is supposed to be written into the file is not actually there, and metadata() returns a file size of zero instead of the expected 1024 bytes.
thread 'main' panicked at 'assertion failed: `(left == right)`
left: `0`,
right: `1024`', src/main.rs:14:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
This is very surprising. Generally, if I have a future that represents some action, I expect that when it resolves without error, the action has been performed. But here, this is simply not the case.
I saw there is a merged PR related to this, https://github.com/tokio-rs/tokio/pull/4316. But it seems that there is still some race condition in the code. Since my application's contract requires that file writes have actually completed by the time the write functions return, this is a very serious bug for me!
Futures recap #
Before we continue with this wild debugging journey, let's take a brief step back and recap what a future is and how futures work in Rust. Don't be scared if you are not familiar with them; this is going to be a very gentle introduction.
Sync code #
With regular (non-async) code, computation is said to be blocking. This means that if you call a function, you are handing over control to that function to do its thing until it is done, at which point it hands control (as well as a return value) back to you.
fn request(url: &str) -> Result<String>;
Here is an example. Imagine you are calling a request() function to make an HTTP request to some server. You call it ❶, it makes the request ❷, waits for the response ❸, and then returns back to you with the result ❹.
During the time that the request() function is running, it has control. You cannot do anything else at the same time, unless you spawn background threads to do it in. The function is said to block the thread while it is waiting for a response from the server.
Futures basics #
Futures allow you to do other work while you are waiting for a result. In order to make this happen, they are used differently. Instead of being called, and thus handing control of the thread over, they are polled, which is a fancy way of saying that you ask them if they are done yet, like your kid on a long car ride.
fn request(url: &str) -> impl Future<Output = Result<String>>;
This is what an interaction might look like:
Code → Future: Are you done yet? Here is a waker for you.
Future → Code: Result is Pending.
Future → Server: GET /hello
Server → Future: 200 OK
Future → Code: Poll me again please.
Code → Future: Are you done yet? Here is a waker for you.
Future → Code: Result is Ready: Here is the response!
Initially, the future is polled. This starts it in the background. Instead of taking control of the thread, it immediately returns Poll::Pending. When the future is polled, it also gets a handle that it can use to signal that it would like to be polled again; this is called a Waker.
In Rust, the API that is used to poll futures looks like this:
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
As you can tell, this returns a Poll type. This type is defined to be an enum that can either be pending or ready.
pub enum Poll<T> {
    Ready(T),
    Pending,
}
To summarize this excursion into the world of futures, here is what is important: a future is polled at least once during its execution. It might need to be polled more than once, depending on how it works. But once it returns Poll::Ready(..), it is considered resolved, and the action that it represents is expected to have either been completed or encountered an error.
Strace #
Back to the bug (that is not a bug) that I ran into. To figure out what is going on with this strange behaviour, I like to use strace, because it lets me see exactly what is going on.

I have used strace to capture a case where this fails, and I'm able to more or less reconstruct what is going on. Here's what seems to be happening (I can attach the entire trace later as well if you need it):
Thread 1: Main thread. The mkdir() comes from the tempfile crate's tempdir() call. The two clone3() calls spawn other threads to do work. The futex() calls wait for results from the other threads.
23:46:20.465462504 mkdir("/tmp/.tmpbIzp1b", 0777) = 0
23:46:20.465497229 rt_sigaction(SIGRT_1, {sa_handler=0x7f9a3c2d3540, sa_mask=[], sa_flags=SA_RESTORER|SA_ONSTACK|SA_RESTART|SA_SIGINFO, sa_restorer=0x7f9a3c288f90}, NULL, 8) = 0
23:46:20.465511275 rt_sigprocmask(SIG_UNBLOCK, [RTMIN RT_1], NULL, 8) = 0
23:46:20.465525071 mmap(NULL, 2101248, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c049000
23:46:20.465539949 mprotect(0x7f9a3c04a000, 2097152, PROT_READ|PROT_WRITE) = 0
23:46:20.465555739 rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
23:46:20.465569114 clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7f9a3c249990, parent_tid=0x7f9a3c249990, exit_signal=0, stack=0x7f9a3c049000, stack_size=0x1ffe80, tls=0x7f9a3c2496c0} => {parent_tid=[1360875]}, 88) = 1360875
23:46:20.465611694 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.465637282 futex(0x5624c9442b00, FUTEX_WAIT_BITSET_PRIVATE, 0, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.465905274 futex(0x5624c9442d38, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.465922296 mmap(NULL, 2101248, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a37dff000
23:46:20.465935781 mprotect(0x7f9a37e00000, 2097152, PROT_READ|PROT_WRITE) = 0
23:46:20.466069642 rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
23:46:20.466084160 clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7f9a37fff990, parent_tid=0x7f9a37fff990, exit_signal=0, stack=0x7f9a37dff000, stack_size=0x1ffe80, tls=0x7f9a37fff6c0} => {parent_tid=[1360876]}, 88) = 1360876
23:46:20.466137129 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.466153700 futex(0x5624c9442c90, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.466171944 futex(0x5624c9442b00, FUTEX_WAIT_BITSET_PRIVATE, 1, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466402036 write(1, "Metadata { file_type: FileType(F"..., 358) = 358
23:46:20.466419328 write(1, "len is: 0 but supposed to be 102"..., 34) = 34
23:46:20.466435408 write(2, "thread '", 8) = 8
23:46:20.466449264 write(2, "main", 4) = 4
23:46:20.466462649 write(2, "' panicked at '", 15) = 15
23:46:20.466475463 write(2, "assertion failed: `(left == righ"..., 63) = 63
Thread 2: Executes the write() syscall.
23:46:20.465617795 rseq(0x7f9a3c249fe0, 0x20, 0, 0x53053053) = 0
23:46:20.465641750 set_robust_list(0x7f9a3c2499a0, 24) = 0
23:46:20.465655245 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.465669732 sigaltstack(NULL, {ss_sp=NULL, ss_flags=SS_DISABLE, ss_size=0}) = 0
23:46:20.465683859 mmap(NULL, 12288, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c469000
23:46:20.465699588 mprotect(0x7f9a3c469000, 4096, PROT_NONE) = 0
23:46:20.465715518 sigaltstack({ss_sp=0x7f9a3c46a000, ss_flags=0, ss_size=8192}, NULL) = 0
23:46:20.465730336 prctl(PR_SET_NAME, "tokio-runtime-w"...) = 0
23:46:20.465745344 mmap(NULL, 134217728, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_NORESERVE, -1, 0) = 0x7f9a34049000
23:46:20.465760172 munmap(0x7f9a34049000, 66809856) = 0
23:46:20.465778967 munmap(0x7f9a3c000000, 299008) = 0
23:46:20.465794727 mprotect(0x7f9a38000000, 135168, PROT_READ|PROT_WRITE) = 0
23:46:20.465811819 sched_getaffinity(1360875, 32, [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]) = 8
23:46:20.465831285 getrandom("\x79\x15\x06\x09\x96\xca\x83\x44\x90\xd2\x09\xa6\x8f\x03\x20\xde", 16, GRND_INSECURE) = 16
23:46:20.465850892 openat(AT_FDCWD, "/tmp/.tmpbIzp1b/file", O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666) = 3
23:46:20.465886990 futex(0x5624c9442b00, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.465908881 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 0, {tv_sec=612048, tv_nsec=590130682}, FUTEX_BITSET_MATCH_ANY) = -1 EAGAIN (Resource temporarily unavailable)
23:46:20.465926524 futex(0x5624c9442c90, FUTEX_WAIT_BITSET_PRIVATE, 2, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466193254 futex(0x5624c9442c90, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.466209785 write(3, "\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30"..., 1024) = 1024
23:46:20.466397417 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590618627}, FUTEX_BITSET_MATCH_ANY) = 0
Thread 3: Spawned to call metadata() on the file (using the statx() syscall).
23:46:20.466127972 rseq(0x7f9a37ffffe0, 0x20, 0, 0x53053053) = 0
23:46:20.466149913 set_robust_list(0x7f9a37fff9a0, 24) = 0
23:46:20.466167426 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.466182213 sigaltstack(NULL, {ss_sp=NULL, ss_flags=SS_DISABLE, ss_size=0}) = 0
23:46:20.466197302 mmap(NULL, 12288, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c466000
23:46:20.466214765 mprotect(0x7f9a3c466000, 4096, PROT_NONE) = 0
23:46:20.466230564 sigaltstack({ss_sp=0x7f9a3c467000, ss_flags=0, ss_size=8192}, NULL) = 0
23:46:20.466244891 prctl(PR_SET_NAME, "tokio-runtime-w"...) = 0
23:46:20.466259118 mmap(NULL, 134217728, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_NORESERVE, -1, 0) = 0x7f9a2fdff000
23:46:20.466272653 munmap(0x7f9a2fdff000, 2101248) = 0
23:46:20.466288643 munmap(0x7f9a34000000, 65007616) = 0
23:46:20.466303551 mprotect(0x7f9a30000000, 135168, PROT_READ|PROT_WRITE) = 0
23:46:20.466320773 sched_getaffinity(1360876, 32, [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]) = 8
23:46:20.466337966 getrandom("\x12\x16\x37\x40\xbd\xd7\x9a\x33\xc1\xb8\x1d\xbb\x2d\xb3\xa2\xce", 16, GRND_INSECURE) = 16
23:46:20.466355218 statx(AT_FDCWD, "/tmp/.tmpbIzp1b/file", AT_STATX_SYNC_AS_STAT, STATX_ALL, {stx_mask=STATX_ALL|STATX_MNT_ID, stx_attributes=0, stx_mode=S_IFREG|0644, stx_size=0, ...}) = 0
23:46:20.466379063 futex(0x5624c9442b00, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.466392678 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590617816}, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466870595 futex(0x5624c9442d38, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.466885833 futex(0x5624c9443330, FUTEX_WAKE_PRIVATE, 1) = 1
What we can observe here is that the statx() syscall is called before the write completes. It is a bit tricky to make sense of this, but the write happens in thread 2:
23:46:20.466209785 write(3, "\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30"..., 1024) = 1024
23:46:20.466397417 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590618627}, FUTEX_BITSET_MATCH_ANY) = 0
The times indicated before the syscalls are always timestamps of when the syscall was issued. This means that we know that the write() syscall was issued at 23:46:20.466209785 and completed by 23:46:20.466397417, because that is when the next syscall was issued.
At the same time, we can see that the statx() syscall was issued while the write was still in progress:
23:46:20.466355218 statx(AT_FDCWD, "/tmp/.tmpbIzp1b/file", AT_STATX_SYNC_AS_STAT, STATX_ALL, {stx_mask=STATX_ALL|STATX_MNT_ID, stx_attributes=0, stx_mode=S_IFREG|0644, stx_size=0, ...}) = 0
This was issued at 23:46:20.466355218, while the write only finished at 23:46:20.466397417. Obviously, the statx() syscall cannot see the write that was still in progress, so it reports that the file is empty.
Investigation #
Okay, next question: why does it behave this way? I dug into the Tokio source code to find out how write works, and to see if I could spot where the issue might arise.
Looking through the implementation, it seems that I have found the source of the issue: when poll_write() is issued, it immediately returns Ready(Ok(n)), rather than Pending to indicate that it is not yet done. I believed this to be an error.
Let us digest this a bit.
Remember when I said earlier that when a future returns Poll::Ready(..), it is considered to be resolved?
What Tokio is doing here is that it always immediately returns Poll::Ready(..) and spawns the work off to be performed in the background. This means that the file.write().await call returns immediately, because from the point of view of Rust, it is done. But from the point of view of Tokio, it is only done spawning the work in the background, not completed. In order to wait for it to complete, I need to call:
file.flush().await?;
I find this behaviour to be somewhat unintuitive.