
Tokio bug that is not a bug, or: why I think AsyncRead is not well-designed


This post started out as a draft GitHub issue. While investigating, I discovered that this is in fact intended behaviour, so I never posted it as an issue. I am posting it here anyway because I think this behaviour is somewhat surprising.

Late one night I was writing some unit tests that wrote something to a file and then immediately read the file back to make sure the data was there. Doesn't sound too wild. What is quite wild is that the tests were failing some 10% of the time! I was able to reduce the problem to a small test case that looks like this:

use tokio::fs::{metadata, File};
use tokio::io::AsyncWriteExt;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let dir = tempfile::tempdir().unwrap();
    let data = vec![24; 1024];
    let path = dir.path().join("file");
    let mut file = File::create(&path).await.unwrap();
    file.write(&data).await.unwrap();
    let metadata = metadata(&path).await.unwrap();
    assert_eq!(metadata.len() as usize, data.len());
}

In this test case, I create a file and write to it. Immediately after writing, I use metadata() to get some information about the file. Surprisingly, sometimes the data that is supposed to have been written to the file is not actually there, and metadata() reports a file size of zero instead of the expected 1024 bytes.

thread 'main' panicked at 'assertion failed: `(left == right)`
  left: `0`,
 right: `1024`', src/main.rs:14:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

This is very surprising. Generally, if I have a future that represents some action, I expect that, once it resolves without error, the action has been performed. But here, that is simply not the case.

I saw that there is a merged PR for this, https://github.com/tokio-rs/tokio/pull/4316, but it seems that there is still a race condition in the code. Since my application relies on file writes having actually completed when the write functions return, this is a very serious bug for me!

Futures recap

Before we continue with this wild debugging journey, let's take a brief step back and recap what futures are and how they work in Rust. Don't be scared if you are not familiar with them; this is going to be a very gentle introduction.

Sync code

With regular (non-async) code, computation is said to be blocking. This means that when you call a function, you hand control over to that function so it can do its thing until it is done, at which point it hands control (along with a return value) back to you.

fn request(url: &str) -> Result<String>;

Here is an example. Imagine you are calling a request() function to make an HTTP request to some server. You call it ❶, it makes the request ❷, waits for the response ❸, and then returns to you with the result ❹.

Sequence diagram (blocking request):
❶ Code → Request: Can you make an HTTP request?
❷ Request → Server: GET /hello
❸ Server → Request: 200 OK
❹ Request → Code: Here is the response!

While the request() function is running, it has control. You cannot do anything else at the same time unless you spawn background threads to do it. The function is said to block the thread while it waits for a response from the server.

Futures basics

Futures allow you to do other work while you are waiting for a result. In order to make this happen, they are used differently. Instead of being called, and thus handing control of the thread over, they are polled, which is a fancy way of saying that you ask them if they are done yet, like your kid on a long car ride.

fn request(url: &str) -> impl Future<Output = Result<String>>;

This is what an interaction might look like:

Sequence diagram (polled future):
❶ Code → Future: Are you done yet? Here is a waker for you.
❷ Future → Code: Result is Pending.
❸ Future → Server: GET /hello
❹ Server → Future: 200 OK
❺ Future → Code (via the waker): Poll me again, please.
❻ Code → Future: Are you done yet? Here is a waker for you.
❼ Future → Code: Result is Ready: here is the response!

Initially, the future is polled. This starts it in the background. Instead of taking control of the thread, it immediately returns Poll::Pending. When the future is polled, it also receives a handle (through the Context argument) that it can use to signal that it would like to be polled again; this handle is called a Waker.

In Rust, the API that is used to poll futures looks like this:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

As you can tell, this returns a Poll type. This type is defined to be an enum that can either be pending or ready.

pub enum Poll<T> {
    Ready(T),
    Pending,
}
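To see these pieces working together, here is a small sketch that uses only the standard library. It is not how Tokio or any real runtime is implemented: `block_on`, `ThreadWaker`, and `FakeRequest` are hypothetical names, and a thread that sleeps for 10 ms stands in for the server. As in the sequence diagram above, the first poll starts the work and returns Pending, the background thread calls the waker when it is done, and the next poll returns Ready with the response.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread running `block_on` when the future asks to be polled again.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll the future, and park the thread whenever it returns Pending.
/// Also reports how many times the future was polled.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return (output, polls),
            // Sleep until the waker unparks us (park may also return spuriously; we just poll again).
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between the future and the background thread doing the "request".
#[derive(Default)]
struct Shared {
    response: Option<String>,
    waker: Option<Waker>,
}

/// Hypothetical future standing in for `request()`: the first poll starts the work on a
/// background thread and returns Pending; the thread stores the result and calls the waker.
struct FakeRequest {
    shared: Arc<Mutex<Shared>>,
    started: bool,
}

impl FakeRequest {
    fn new() -> Self {
        FakeRequest { shared: Arc::new(Mutex::new(Shared::default())), started: false }
    }
}

impl Future for FakeRequest {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut shared = self.shared.lock().unwrap();
        if let Some(response) = shared.response.take() {
            return Poll::Ready(response); // the action has actually completed
        }
        // Keep the most recent waker so the background thread can ask to be polled again.
        shared.waker = Some(cx.waker().clone());
        drop(shared);

        if !self.started {
            self.started = true;
            let shared = Arc::clone(&self.shared);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10)); // stand-in for the server round trip
                let mut state = shared.lock().unwrap();
                state.response = Some("200 OK".to_string());
                if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            });
        }
        Poll::Pending
    }
}

fn main() {
    let (response, polls) = block_on(FakeRequest::new());
    println!("got {response:?} after {polls} polls");
}
```

Running it should report the response after (usually) two polls. `thread::park` is allowed to wake up spuriously, which just causes an extra poll that returns Pending again.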

To summarize this excursion into the world of futures, here is what is important: a future is polled at least once during its execution. It might need to be polled more than once, depending on how it works. But once it returns Poll::Ready(..), it is considered resolved, and the action it represents is expected to have either completed or failed with an error.

Strace

Back to the bug (that is not a bug) that I ran into. To figure out what was behind this strange behaviour, I reached for strace, because it lets me see exactly which system calls the program makes and when.

I used strace to capture a run where the test fails, and I was able to roughly reconstruct what is going on. Here is what seems to be happening (I can attach the entire trace later if you need it):

Thread 1: Main thread. The mkdir() comes from the tempfile crate's tempdir() function. The two clone3() calls spawn the other threads that do the actual file operations, and the futex() calls wait for results from those threads.

23:46:20.465462504 mkdir("/tmp/.tmpbIzp1b", 0777) = 0
23:46:20.465497229 rt_sigaction(SIGRT_1, {sa_handler=0x7f9a3c2d3540, sa_mask=[], sa_flags=SA_RESTORER|SA_ONSTACK|SA_RESTART|SA_SIGINFO, sa_restorer=0x7f9a3c288f90}, NULL, 8) = 0
23:46:20.465511275 rt_sigprocmask(SIG_UNBLOCK, [RTMIN RT_1], NULL, 8) = 0
23:46:20.465525071 mmap(NULL, 2101248, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c049000
23:46:20.465539949 mprotect(0x7f9a3c04a000, 2097152, PROT_READ|PROT_WRITE) = 0
23:46:20.465555739 rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
23:46:20.465569114 clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7f9a3c249990, parent_tid=0x7f9a3c249990, exit_signal=0, stack=0x7f9a3c049000, stack_size=0x1ffe80, tls=0x7f9a3c2496c0} => {parent_tid=[1360875]}, 88) = 1360875
23:46:20.465611694 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.465637282 futex(0x5624c9442b00, FUTEX_WAIT_BITSET_PRIVATE, 0, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.465905274 futex(0x5624c9442d38, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.465922296 mmap(NULL, 2101248, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a37dff000
23:46:20.465935781 mprotect(0x7f9a37e00000, 2097152, PROT_READ|PROT_WRITE) = 0
23:46:20.466069642 rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
23:46:20.466084160 clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7f9a37fff990, parent_tid=0x7f9a37fff990, exit_signal=0, stack=0x7f9a37dff000, stack_size=0x1ffe80, tls=0x7f9a37fff6c0} => {parent_tid=[1360876]}, 88) = 1360876
23:46:20.466137129 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.466153700 futex(0x5624c9442c90, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.466171944 futex(0x5624c9442b00, FUTEX_WAIT_BITSET_PRIVATE, 1, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466402036 write(1, "Metadata { file_type: FileType(F"..., 358) = 358
23:46:20.466419328 write(1, "len is: 0 but supposed to be 102"..., 34) = 34
23:46:20.466435408 write(2, "thread '", 8) = 8
23:46:20.466449264 write(2, "main", 4)  = 4
23:46:20.466462649 write(2, "' panicked at '", 15) = 15
23:46:20.466475463 write(2, "assertion failed: `(left == righ"..., 63) = 63

Thread 2: Opens the file and executes the write() syscall.

23:46:20.465617795 rseq(0x7f9a3c249fe0, 0x20, 0, 0x53053053) = 0
23:46:20.465641750 set_robust_list(0x7f9a3c2499a0, 24) = 0
23:46:20.465655245 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.465669732 sigaltstack(NULL, {ss_sp=NULL, ss_flags=SS_DISABLE, ss_size=0}) = 0
23:46:20.465683859 mmap(NULL, 12288, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c469000
23:46:20.465699588 mprotect(0x7f9a3c469000, 4096, PROT_NONE) = 0
23:46:20.465715518 sigaltstack({ss_sp=0x7f9a3c46a000, ss_flags=0, ss_size=8192}, NULL) = 0
23:46:20.465730336 prctl(PR_SET_NAME, "tokio-runtime-w"...) = 0
23:46:20.465745344 mmap(NULL, 134217728, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_NORESERVE, -1, 0) = 0x7f9a34049000
23:46:20.465760172 munmap(0x7f9a34049000, 66809856) = 0
23:46:20.465778967 munmap(0x7f9a3c000000, 299008) = 0
23:46:20.465794727 mprotect(0x7f9a38000000, 135168, PROT_READ|PROT_WRITE) = 0
23:46:20.465811819 sched_getaffinity(1360875, 32, [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]) = 8
23:46:20.465831285 getrandom("\x79\x15\x06\x09\x96\xca\x83\x44\x90\xd2\x09\xa6\x8f\x03\x20\xde", 16, GRND_INSECURE) = 16
23:46:20.465850892 openat(AT_FDCWD, "/tmp/.tmpbIzp1b/file", O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666) = 3
23:46:20.465886990 futex(0x5624c9442b00, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.465908881 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 0, {tv_sec=612048, tv_nsec=590130682}, FUTEX_BITSET_MATCH_ANY) = -1 EAGAIN (Resource temporarily unavailable)
23:46:20.465926524 futex(0x5624c9442c90, FUTEX_WAIT_BITSET_PRIVATE, 2, NULL, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466193254 futex(0x5624c9442c90, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.466209785 write(3, "\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30"..., 1024) = 1024
23:46:20.466397417 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590618627}, FUTEX_BITSET_MATCH_ANY) = 0

Thread 3: Spawned to call metadata() on the file (using the statx() syscall).

23:46:20.466127972 rseq(0x7f9a37ffffe0, 0x20, 0, 0x53053053) = 0
23:46:20.466149913 set_robust_list(0x7f9a37fff9a0, 24) = 0
23:46:20.466167426 rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
23:46:20.466182213 sigaltstack(NULL, {ss_sp=NULL, ss_flags=SS_DISABLE, ss_size=0}) = 0
23:46:20.466197302 mmap(NULL, 12288, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f9a3c466000
23:46:20.466214765 mprotect(0x7f9a3c466000, 4096, PROT_NONE) = 0
23:46:20.466230564 sigaltstack({ss_sp=0x7f9a3c467000, ss_flags=0, ss_size=8192}, NULL) = 0
23:46:20.466244891 prctl(PR_SET_NAME, "tokio-runtime-w"...) = 0
23:46:20.466259118 mmap(NULL, 134217728, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_NORESERVE, -1, 0) = 0x7f9a2fdff000
23:46:20.466272653 munmap(0x7f9a2fdff000, 2101248) = 0
23:46:20.466288643 munmap(0x7f9a34000000, 65007616) = 0
23:46:20.466303551 mprotect(0x7f9a30000000, 135168, PROT_READ|PROT_WRITE) = 0
23:46:20.466320773 sched_getaffinity(1360876, 32, [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]) = 8
23:46:20.466337966 getrandom("\x12\x16\x37\x40\xbd\xd7\x9a\x33\xc1\xb8\x1d\xbb\x2d\xb3\xa2\xce", 16, GRND_INSECURE) = 16
23:46:20.466355218 statx(AT_FDCWD, "/tmp/.tmpbIzp1b/file", AT_STATX_SYNC_AS_STAT, STATX_ALL, {stx_mask=STATX_ALL|STATX_MNT_ID, stx_attributes=0, stx_mode=S_IFREG|0644, stx_size=0, ...}) = 0
23:46:20.466379063 futex(0x5624c9442b00, FUTEX_WAKE_PRIVATE, 1) = 1
23:46:20.466392678 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590617816}, FUTEX_BITSET_MATCH_ANY) = 0
23:46:20.466870595 futex(0x5624c9442d38, FUTEX_WAKE_PRIVATE, 1) = 0
23:46:20.466885833 futex(0x5624c9443330, FUTEX_WAKE_PRIVATE, 1) = 1

What we can observe here is that the statx() syscall is issued before the write completes. It is a bit tricky to see, but the write happens in thread 2:

23:46:20.466209785 write(3, "\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30\30"..., 1024) = 1024
23:46:20.466397417 futex(0x5624c9442d38, FUTEX_WAIT_BITSET_PRIVATE, 1, {tv_sec=612048, tv_nsec=590618627}, FUTEX_BITSET_MATCH_ANY) = 0

The times shown before each syscall are the timestamps at which that syscall was issued. This means we know that the write() syscall was issued at 23:46:20.466209785 and had completed by 23:46:20.466397417, because that is when the next syscall on the same thread was issued.

At the same time, we can see that the statx() syscall was issued while the write was still in progress:

23:46:20.466355218 statx(AT_FDCWD, "/tmp/.tmpbIzp1b/file", AT_STATX_SYNC_AS_STAT, STATX_ALL, {stx_mask=STATX_ALL|STATX_MNT_ID, stx_attributes=0, stx_mode=S_IFREG|0644, stx_size=0, ...}) = 0

This was issued at 23:46:20.466355218, while the write is only known to have finished by 23:46:20.466397417. So the statx() syscall could not see the write that was still in progress, and it reported that the file is empty.
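The timing argument boils down to a little arithmetic on three timestamps from the trace. Here is a minimal sketch (the `parse_ts` helper is hypothetical, written just for this check) that parses the `HH:MM:SS.nnnnnnnnn` timestamps and confirms that statx() was issued inside the write() window. Keep in mind that strace timestamps mark when a syscall was entered, so the next syscall's timestamp is only an upper bound on when write() returned.

```rust
/// Convert an strace timestamp of the form `HH:MM:SS.nnnnnnnnn` into nanoseconds since midnight.
fn parse_ts(ts: &str) -> Option<u64> {
    let (hms, frac) = ts.split_once('.')?;
    let mut fields = hms.split(':').map(|f| f.parse::<u64>().ok());
    let (h, m, s) = (fields.next()??, fields.next()??, fields.next()??);
    // Reject anything that is not exactly three fields plus a 9-digit nanosecond part.
    if fields.next().is_some() || frac.len() != 9 || !frac.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    let nanos: u64 = frac.parse().ok()?;
    Some(((h * 60 + m) * 60 + s) * 1_000_000_000 + nanos)
}

fn main() {
    // Timestamps copied from the trace.
    let write_issued = parse_ts("23:46:20.466209785").unwrap(); // thread 2: write(3, ...)
    let write_done_by = parse_ts("23:46:20.466397417").unwrap(); // thread 2: next syscall (futex)
    let statx_issued = parse_ts("23:46:20.466355218").unwrap(); // thread 3: statx(...)

    // statx() falls inside the window in which the write may still be in progress.
    assert!(write_issued < statx_issued && statx_issued < write_done_by);
    println!(
        "statx() was issued {} ns after write() and {} ns before thread 2's next syscall",
        statx_issued - write_issued,
        write_done_by - statx_issued
    );
}
```

With the timestamps from the trace, this reports 145433 ns and 42199 ns respectively.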

Investigation

Okay, next question: why does this happen? I dug into the Tokio source code to find out how write works and to see whether I could spot where the issue arises.

Looking through the implementation, I think I have found the source of the issue: when poll_write() is called, it immediately returns Ready(Ok(n)) rather than Pending, even though the write has not actually been performed yet. I believe this to be an error.

Let us digest this a bit.

Remember when I said earlier, that when a future returns Poll::Ready(..), it is considered to be resolved?

What Tokio does here is return Poll::Ready(..) immediately and spawn the actual work off to be performed in the background. This means that the file.write().await call returns right away, because from Rust's point of view the future is done. But from Tokio's point of view, it has only finished handing the write off to the background, not completing it. To wait for the write to actually complete, I need to call:

file.flush().await?;

I find this behaviour to be somewhat unintuitive.
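To make the consequence concrete, here is a deliberately simplified, synchronous model of this "report success now, write in the background" strategy, using only the standard library. It is not Tokio's implementation: `WriteBehindFile`, `demo`, and the `gate` channel are hypothetical, and the gate only exists to hold the background write back so that the race happens every time instead of about 10% of the time. Under those assumptions, `write` reports 1024 bytes written while the file is still empty, and only `flush` guarantees that the bytes have reached the file.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified *model* of a write-behind file handle (not Tokio's code):
/// `write` copies the bytes, hands them to a background thread and reports success
/// immediately; only `flush` waits for the background write to finish.
struct WriteBehindFile {
    file: Option<File>,                              // Some(..) while no write is in flight
    in_flight: Option<JoinHandle<io::Result<File>>>, // background write, if any
}

impl WriteBehindFile {
    fn new(file: File) -> Self {
        WriteBehindFile { file: Some(file), in_flight: None }
    }

    /// Returns Ok(n) as soon as the write has been *scheduled*, not performed.
    /// `gate` exists only for this demo: the background thread waits on it before writing.
    fn write(&mut self, data: &[u8], gate: Receiver<()>) -> io::Result<usize> {
        self.flush()?; // finish any earlier write first
        let mut file = self
            .file
            .take()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "file lost after a failed write"))?;
        let buf = data.to_vec(); // copy: the caller's slice cannot outlive this call
        self.in_flight = Some(thread::spawn(move || -> io::Result<File> {
            let _ = gate.recv(); // demo only: hold the write back until the gate opens
            file.write_all(&buf)?;
            Ok(file)
        }));
        Ok(data.len())
    }

    /// Blocks until the background write (if any) has actually completed.
    fn flush(&mut self) -> io::Result<()> {
        if let Some(handle) = self.in_flight.take() {
            let file = handle.join().expect("writer thread panicked")?;
            self.file = Some(file);
        }
        Ok(())
    }
}

/// Returns the file length seen right after `write` and again after `flush`.
fn demo(path: &Path) -> io::Result<(u64, u64)> {
    let mut file = WriteBehindFile::new(File::create(path)?);
    let (open_gate, gate) = mpsc::channel();
    let written = file.write(&[24u8; 1024], gate)?;
    assert_eq!(written, 1024); // the write "succeeded"...
    let before = fs::metadata(path)?.len(); // ...but no bytes have reached the file yet
    open_gate.send(()).expect("writer thread is waiting on the gate");
    file.flush()?;
    let after = fs::metadata(path)?.len();
    Ok((before, after))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("write_behind_demo.bin");
    let (before, after) = demo(&path)?;
    println!("length before flush: {before}, after flush: {after}");
    fs::remove_file(&path)
}
```

Running it should print a length of 0 before the flush and 1024 after it. The model also shows why such a design has to copy the data: once `write` returns, the caller is free to reuse or drop its buffer.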