Understanding Tokio, pt. 1

I would like to understand how Tokio works. My interests run to the real-time and concurrent side of things but I don't know much about Tokio itself. Before the introduction of async/await and stable futures I more or less intentionally avoided learning it, not out of any sense that Tokio was wrong but because there's only a finite amount of time to learn things, and it's a rough business to learn a thing that is going to go out of date soonish.

Anyhow. These are my notes for learning Tokio. I don't have a plan for how to learn its internals, but, generally, I learn best when I have some kind of project to frame my reading around. Context really helps. I don't have a sense of what I want to build long-term, but an HTTP load generator that can scale itself to find the maximum requests per second a server can handle while still satisfying some latency constraint would be pretty neat. This does mean I need to combine my learning with another library -- hyper -- but I've used it before and think I can get away with leaving it as a black box.

I also like DNS. Maybe I'll build a server later.

A Load Generator

Here's what I'd like to build. Inspired by wrk2 I'd like to see an HTTP load generator that can avoid coordinated omission – see also this article and this issue on Gatling – but also probe systems for constraint violation. So, if I demand that my system cannot have a maximum response latency in excess of one second for five minutes, how many requests per second (RPS) can the system actually handle?

There are a few pieces here that need to be in place:

  • a feedback system to scale RPS up and down
  • time-windowed latency measurement
  • general HTTP client (programmable like Gatling)?

For a first cut, that's a lot, and getting each of those pieces in place really detracts from learning Tokio internals. I think this is a simpler set of goals to start with:

  • user specifies a constant-rate RPS
  • record only the maximum latency
  • HTTP client that only does GETs

Avoiding Coordination

What we really have to avoid in our little project is accidental coordination between the system under test and our load generator. For instance, let's say the user requires that we send GETs every 1 second but our algorithm for scheduling is:

  1. Open connection to server
  2. Once established, start the clock for the next request.
  3. Send request.
  4. Receive the response, stop the clock, and calculate the interval for the next request.
  5. Pause, if appropriate.

Do you see the problems? First, we're assuming that the time to establish a connection to the server is zero. This is never true, and that time has to count toward the interval delay. Second, say we fix the problem introduced in the first step, but the gap between steps three and four is greater than one second, maybe multiple seconds. We've failed to measure the system's latency in this high-latency period and have done so by accidentally allowing the server to dictate what the real request interval is. (This, incidentally, is the heart of "coordinated omission". Skipping measurements is the omission bit.) We need to be able to schedule GETs on a regular interval, no matter how the server behaves.
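One way to sidestep all of that is open-loop scheduling: fix the deadlines up front, at start + n * period, so a slow response can never shift a later deadline. A minimal sketch of that idea, std-only, with names that are mine and not tokio's or wrk2's:

```rust
use std::time::{Duration, Instant};

/// Open-loop scheduler sketch: deadlines are computed from absolute time
/// and never depend on how long the server takes to answer.
struct Schedule {
    start: Instant,
    period: Duration,
    next_tick: u32,
}

impl Schedule {
    fn new(start: Instant, period: Duration) -> Self {
        Schedule { start, period, next_tick: 0 }
    }

    /// Deadline for the next request: `start + n * period`, regardless of
    /// when (or whether) earlier responses arrived.
    fn next_deadline(&mut self) -> Instant {
        let deadline = self.start + self.period * self.next_tick;
        self.next_tick += 1;
        deadline
    }
}

fn main() {
    let start = Instant::now();
    let mut sched = Schedule::new(start, Duration::from_secs(1));
    let first = sched.next_deadline();
    let second = sched.next_deadline();
    // Even if the first request dawdled for three seconds, the second
    // deadline is still exactly one period after the first.
    assert_eq!(first, start);
    assert_eq!(second - first, Duration::from_secs(1));
    println!("ok");
}
```

If a deadline has already passed by the time we get to it, we send immediately and record the overdue gap as latency rather than silently resetting the clock: that's the omission part handled.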

Open questions for me right now:

  • How many outstanding requests can we stack up inside of tokio: Is there a
    finite queue somewhere or what?
  • How does time work in tokio?

The answer to each of these will determine how we avoid coordination. Let's, I think, focus on time first.

Time in Tokio

Just from breezing through the docs it seems like tokio::time is our first
area of interest. I'm poking around tokio's source code at 248bf21, which
corresponds to the 0.2.6 release. The module is located under
tokio/src/time pretty much like you'd expect. The module docs mention an Interval that "yield(s) a value at a fixed period". Cool. Interval is simply:

/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at).
#[derive(Debug)]
pub struct Interval {
    /// Future that completes the next time the `Interval` yields a value.
    delay: Delay,

    /// The duration between values yielded by `Interval`.
    period: Duration,
}

The headline public function on Interval is async fn tick(&mut self) -> Instant. Its implementation is:

pub async fn tick(&mut self) -> Instant {
    poll_fn(|cx| self.poll_tick(cx)).await
}

poll_fn is tokio::future::poll_fn. This function is public within the crate -- note the pub(crate) -- and exists to create a PollFn<F>:

pub(crate) fn poll_fn<T, F>(f: F) -> PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    PollFn { f }
}

There's not much to PollFn<F>:

pub(crate) struct PollFn<F> {
    f: F,
}

There's also a Future implementation for PollFn:

impl<T, F> Future for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        (&mut self.f)(cx)
    }
}

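To convince myself how this behaves, here's a stand-alone copy of PollFn polled by hand. The no-op waker plumbing is mine, just to drive poll without an executor; the PollFn pieces mirror the tokio source above:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

struct PollFn<F> {
    f: F,
}

// tokio has this impl too; it lets `poll` reach `self.f` through the Pin.
impl<F> Unpin for PollFn<F> {}

fn poll_fn<T, F>(f: F) -> PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    PollFn { f }
}

impl<T, F> Future for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        (&mut self.f)(cx)
    }
}

/// A waker that does nothing, so we can call `poll` directly.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let mut calls = 0;
    let mut fut = poll_fn(|_cx| {
        calls += 1;
        if calls < 3 { Poll::Pending } else { Poll::Ready(calls) }
    });
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Each `poll` just runs the closure; the future is Ready on call three.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(3));
    println!("ok");
}
```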
Okay, cool. When PollFn::poll is called the underlying F from poll_fn is called with cx passed to it. Jumping back to the implementation of
Interval::tick we see that the passed F is |cx| self.poll_tick(cx). What's poll_tick?

pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
    // Wait for the delay to be done
    ready!(Pin::new(&mut self.delay).poll(cx));

    // Get the `now` by looking at the `delay` deadline
    let now = self.delay.deadline();

    // The next interval value is `duration` after the one that just
    // yielded.
    let next = now + self.period;
    self.delay.reset(next);

    // Return the current instant
    Poll::Ready(now)
}

Okay. There's a lot here, even though there's only a few lines of code. Let's tear this down. ready!(Pin::new(&mut self.delay).poll(cx)); What's going on? We know from earlier that self.delay is a tokio::time::Delay. This type is a future and generally only created by delay_until(deadline: Instant) -> Delay and delay_for(duration: Duration) -> Delay. If we take a look at Delay's struct declaration we don't learn a lot:

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Delay {
    /// The link between the `Delay` instance and the timer 
    /// that drives it.
    ///
    /// This also stores the `deadline` value.
    registration: Registration,
}

Clearly whatever Registration is will be important but let's move on for now (side note: I don't have a concrete reason for moving on, other than that my personal stack of unknowns is too deep to dig into Registration) and look at the Future implementation of Delay:

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // `poll_elapsed` can return an error in two cases:
        //
        // - AtCapacity: this is a pathlogical case where far too many
        //   delays have been scheduled.
        // - Shutdown: No timer has been setup, which is a mis-use error.
        //
        // Both cases are extremely rare, and pretty accurately fit into
        // "logic errors", so we just panic in this case. A user couldn't
        // really do much better if we passed the error onwards.
        match ready!(self.registration.poll_elapsed(cx)) {
            Ok(()) => Poll::Ready(()),
            Err(e) => panic!("timer error: {}", e),
        }
    }
}

First off here, a Delay is a Future that returns nothing: that's the Output = (), which jibes with my sense that Delay is solely about controlling program flow. The macro ready! is defined in the futures-core crate and is:

#[macro_export]
macro_rules! ready {
    ($e:expr $(,)?) => (match $e {
        $crate::core_reexport::task::Poll::Ready(t) => t,
        $crate::core_reexport::task::Poll::Pending =>
            return $crate::core_reexport::task::Poll::Pending,
    })
}

So, ready! yields the inner value of a Poll::Ready, or else returns Poll::Pending from the enclosing function. If we kind of guess at what self.registration.poll_elapsed is doing there, we can see how poll_tick must work. That function polls our Delay until it passes a deadline -- inspection of delay_until shows that the deadline is held by the Registration -- and then returns with a Ready. Exactly what drives the polling, and the timing guarantees of that driver, are still unknown, but it seems likely a Delay will never be ready until after the deadline has passed. This is harder to get right than it first appears, which is why I say 'seems'. What happens when clocks skew backward, and how you treat many deadlines that terminate at roughly the same time, are hard topics.
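The control-flow effect of ready! is easy to demonstrate with std's own std::task::ready!, which expands the same way. The staged function here is a toy of my own, not tokio code:

```rust
use std::task::{ready, Poll};

// Stage two only runs once stage one is Ready; otherwise `ready!`
// early-returns Pending from the whole function, exactly like the
// match expansion shown above.
fn stage_two(stage_one: Poll<u32>) -> Poll<u32> {
    let v = ready!(stage_one);
    Poll::Ready(v * 2)
}

fn main() {
    assert_eq!(stage_two(Poll::Pending), Poll::Pending);
    assert_eq!(stage_two(Poll::Ready(21)), Poll::Ready(42));
    println!("ok");
}
```

This is why Delay::poll reads so cleanly: the ready! on poll_elapsed hides the Pending plumbing and leaves only the Ready path in view.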

Okay, that's Delay but now what is Registration? Well, it's in a submodule of tokio::time so its full name is tokio::time::driver::Registration and the type is not exposed outside of the crate. We're looking at internal implementation details here, which is pretty well where we want to be to answer the second of our outstanding questions. Registration is a small file:

use crate::time::driver::Entry;
use crate::time::{Duration, Error, Instant};

use std::sync::Arc;
use std::task::{self, Poll};

/// Registration with a timer.
///
/// The association between a `Delay` instance and a timer is done lazily in
/// `poll`
#[derive(Debug)]
pub(crate) struct Registration {
    entry: Arc<Entry>,
}

impl Registration {
    pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
        Registration {
            entry: Entry::new(deadline, duration),
        }
    }

    pub(crate) fn deadline(&self) -> Instant {
        self.entry.time_ref().deadline
    }

    pub(crate) fn reset(&mut self, deadline: Instant) {
        unsafe {
            self.entry.time_mut().deadline = deadline;
        }

        Entry::reset(&mut self.entry);
    }

    pub(crate) fn is_elapsed(&self) -> bool {
        self.entry.is_elapsed()
    }

    pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
        self.entry.poll_elapsed(cx)
    }
}

impl Drop for Registration {
    fn drop(&mut self) {
        Entry::cancel(&self.entry);
    }
}

A Registration is an std::sync::Arc surrounding some other tokio-internal type Entry and, in fact, the Registration::poll_elapsed that we were curious about above is a call to Entry::poll_elapsed. I'm pretty curious about why Registration is a thing and I'd be willing to bet we're going to find intrusive data structures somewhere. Intrusive data structures maintain the relationship of the structure they reside in, and they have a bummer downside: your data is now fully aware of the structure it's a part of and can't as easily be put into other structures, unless you wrap it -- which I bet is what Registration is doing. But intrusive data structures have some excellent upsides when you care about optimizing the memory allocation and cache efficiency of your structures, which a library like tokio absolutely should.
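Here's my guess at the wrapper's shape, as a sketch. Every name in it is hypothetical, and the real Entry is lock-free and intrusively linked rather than a Mutex behind a Vec; the point is only that the handle side and the driver side hold Arcs to the same entry, so a reset through one is visible to the other:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical sketch, not tokio's real types: one shared entry,
// referenced from both the user-facing handle and the timer driver.
struct Entry {
    deadline_ms: Mutex<u64>,
}

/// The `Registration`-ish side: what a `Delay` would hold.
struct Handle {
    entry: Arc<Entry>,
}

/// The timer-driver-ish side: the set of entries it must fire.
struct Driver {
    entries: Vec<Arc<Entry>>,
}

impl Handle {
    fn reset(&self, deadline_ms: u64) {
        *self.entry.deadline_ms.lock().unwrap() = deadline_ms;
    }
}

fn main() {
    let entry = Arc::new(Entry { deadline_ms: Mutex::new(100) });
    let handle = Handle { entry: Arc::clone(&entry) };
    let driver = Driver { entries: vec![entry] };

    // Resetting through the handle is immediately visible to the driver,
    // because both sides share the same allocation.
    handle.reset(250);
    assert_eq!(*driver.entries[0].deadline_ms.lock().unwrap(), 250);
    println!("ok");
}
```

If the real Entry is intrusively linked into the driver's wheel, the Arc here would be what keeps the entry alive while it sits in both worlds at once; that's the theory I want to check next time.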

I think this is actually a pretty good place to stop this article. We still
don't have a sense of how time works in tokio but we have a better sense of what we don't know. Next article I'll dig right into Entry and its relationship with Registration, then pop back up the stack to Interval since we've got Delay pretty well nailed down now, I do believe.