Understanding Tokio, pt. 2

This is my second post exploring the internals of tokio. You can find the first here. As a refresher, the last article left two open questions at its core:

  • How many outstanding requests can we stack up inside of tokio? Is there
    a finite queue somewhere, or what?
  • How does time work in tokio?

When I left off we were exploring the second: how does time work in tokio?

Exploring tokio::time

Let's pick back up. If you haven't read the first post I warmly recommend it. We were puzzling over tokio::time::driver::Entry and I'd guessed that we were going to run into some kind of intrusive data structure. Why? Well,
tokio::time::driver::Registration is just an std::sync::Arc<Entry> with some functions hung off it, and somewhere, somehow, tokio's going to have to start feeding it to the scheduler. Registration is very much not maintaining any kind of structured relationship between Entry instances, so Entry must be doing that itself. Maybe. Let's see.

Here's the Entry struct:

/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
pub(crate) struct Entry {
    /// Only accessed from `Registration`.
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shutdown
    /// without all `Delay` instances having completed.
    ///
    /// When `None`, the entry has not yet been linked with a timer instance.
    inner: Weak<Inner>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired.
    /// * Whether or not the entry transitioned to the error state.
    ///
    /// When an `Entry` is created, `state` is initialized to the instant at
    /// which the entry must be fired. When a timer is reset to a different
    /// instant, this value is changed.
    state: AtomicU64,

    /// Task to notify once the deadline is reached.
    waker: AtomicWaker,

    /// True when the entry is queued in the "process" stack. This value
    /// is set before pushing the value and unset after popping the value.
    ///
    /// TODO: This could possibly be rolled up into `state`.
    pub(super) queued: AtomicBool,

    /// Next entry in the "process" linked list.
    ///
    /// Access to this field is coordinated by the `queued` flag.
    ///
    /// Represents a strong Arc ref.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,

    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the State's linked list.
    ///
    /// This is only accessed by the timer
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the State's linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}

There's a lot here. Happily, it's well-commented. We are, in fact, taking a look at the internal state of an intrusive structure, two of them by the comment: "Internal state shared between a Delay instance and the timer." Okay, well. I wonder if this comment is a little out of date. We know that Delay actually makes use of Registration, so it only uses Entry indirectly. I'm also not sure what "the timer" is. Both sub-points in the intro comment mention a "timer thread" but I don't know what that is yet. Let's take the struct in smaller pieces.

/// Only accessed from `Registration`.
time: CachePadded<UnsafeCell<Time>>,

It looks like CachePadded is defined at the bottom of Entry's file, like so:

#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
#[derive(Debug)]
struct CachePadded<T>(T);

As advertised, CachePadded ensures that any type T is aligned to the cache-line size of the target architecture: 128 bytes on x86_64, 64 bytes elsewhere. These kinds of optimizations can be tricky to get right and keep right, especially as computing hardware changes. Reasonable bet here that this is a profiler-guided optimization. Anyway, no logic for us to concern ourselves with. UnsafeCell is std::cell::UnsafeCell. If you haven't run across this type before, it's how Rust achieves interior mutability. Explaining that in depth is beyond the scope of this post but I warmly encourage you to read UnsafeCell's
documentation, which has an excellent explanation. Suffice it to say, UnsafeCell should tip us off that while Entry presents an immutable interface -- & rather than &mut references -- its internals may actually be changing.
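To make interior mutability concrete, here's a minimal single-threaded sketch of the pattern Entry relies on: mutation behind a shared & reference. The Counter type here is mine, invented for illustration; it is not tokio code.

```rust
use std::cell::UnsafeCell;

// A counter that can be bumped through a shared `&` reference, mirroring how
// `Entry` mutates its internals behind an immutable-looking interface.
struct Counter {
    value: UnsafeCell<u64>,
}

impl Counter {
    fn new() -> Counter {
        Counter { value: UnsafeCell::new(0) }
    }

    // Takes `&self`, not `&mut self`, yet still mutates the value.
    fn bump(&self) -> u64 {
        // Safety: in this single-threaded sketch no other reference to
        // `value` exists while we hold this pointer. Tokio must uphold the
        // same guarantee with careful cross-thread coordination.
        unsafe {
            let v = self.value.get();
            *v += 1;
            *v
        }
    }
}

fn main() {
    let c = Counter::new();
    c.bump();
    assert_eq!(c.bump(), 2);
    println!("{}", c.bump()); // prints 3
}
```

The unsafe block is the whole bargain: UnsafeCell gives you the raw pointer, and you promise the compiler the aliasing rules still hold.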

So what's Time? It looks like a storage location for the information passed from Delay, through Registration and to Entry::new:

#[derive(Debug)]
pub(crate) struct Time {
    pub(crate) deadline: Instant,
    pub(crate) duration: Duration,
}

The Entry.time field is a cache-padded, internally mutable storage place for the Instant and Duration that originated from Delay. With this context it actually makes much more sense to me to do cache padding. There's no guarantee that Instant and Duration will be sized to play nicely with the CPU's cache, so why risk it? Good call, tokio.
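We can check the effect of those repr(align) attributes directly. This sketch reproduces the CachePadded shape from above and asserts its alignment; the surrounding main is mine, for demonstration only.

```rust
use std::mem;

// Same shape as tokio's `CachePadded`: force the wrapped value onto its own
// cache line. (128 bytes on x86_64, where adjacent-line prefetching makes
// the effective line size 128 bytes; 64 bytes elsewhere.)
#[cfg_attr(target_arch = "x86_64", repr(align(128)))]
#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))]
struct CachePadded<T>(T);

fn main() {
    // A bare u64 only promises 8-byte alignment...
    println!("u64 align: {}", mem::align_of::<u64>()); // prints 8
    // ...but the padded wrapper is cache-line aligned, so two padded fields
    // can never share a cache line and false sharing between them is
    // impossible.
    assert!(mem::align_of::<CachePadded<u64>>() >= 64);
}
```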

The next field in the Entry struct is inner: Weak<Inner>. Weak is
std::sync::Weak, a pointer that does not own the thing it points to. The use of Weak allows self-referential structures, and lets many structures share pointers to the same bit of memory, but you have to be careful that something, somewhere, still ultimately owns the memory or else you'll have a leak. Also, because Weak does not own the thing it points to, it's possible the thing will no longer exist to point at, meaning any read through a Weak could return nothing, but safely. Compare this to raw pointers: they also do not own the memory they point to, but dereferencing one might result in a memory fault. Now, what's Inner? Its proper name is tokio::time::driver::Inner and its definition is:
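This mirrors the doc comment on inner: when the timer shuts down and drops its state, every Entry observes a None rather than a dangling pointer. A tiny sketch of that behavior:

```rust
use std::sync::{Arc, Weak};

fn main() {
    // The "timer" owns the state; the "entry" holds only a weak reference,
    // just like `Entry.inner: Weak<Inner>`.
    let strong: Arc<String> = Arc::new("timer internals".to_string());
    let weak: Weak<String> = Arc::downgrade(&strong);

    // While the owner is alive, upgrade() hands back a fresh strong ref.
    assert!(weak.upgrade().is_some());

    // Drop the owner and the Weak observes the absence safely -- no
    // dereference of freed memory, just a None.
    drop(strong);
    assert!(weak.upgrade().is_none());
}
```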

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
pub(crate) struct Inner {
    /// The instant at which the timer started running.
    start: Instant,

    /// The last published timer `elapsed` value.
    elapsed: AtomicU64,

    /// Number of active timeouts
    num: AtomicUsize,

    /// Head of the "process" linked list.
    process: AtomicStack,

    /// Unparks the timer thread.
    unpark: Box<dyn Unpark>,
}

We haven't run into either Driver or Handle yet. AtomicStack is probably some kind of lockless stack -- I'll actually be curious to peek there and see which data structure tokio uses -- but I don't have a clue what Unpark is. Personally, my stack of unknowns has gotten too deep. Digging into unknowns is important but at some point I, at least, need to go broad to get some context. If I don't, while I might be able to understand code locally I end up not having a sense of what it's really doing, just that it is doing something.
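As a placeholder guess while we defer that dig: a common shape for a lockless "process" stack is a Treiber-style push with a drain-everything swap on the consumer side, which fits the "head of the 'process' linked list" comment. Everything below -- the names, the u64 payload, the whole design -- is my assumption, not tokio's actual AtomicStack.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::SeqCst};

struct Node {
    value: u64,
    next: *mut Node,
}

struct AtomicStack {
    head: AtomicPtr<Node>,
}

impl AtomicStack {
    fn new() -> AtomicStack {
        AtomicStack { head: AtomicPtr::new(ptr::null_mut()) }
    }

    // Producers push with a compare-exchange retry loop.
    fn push(&self, value: u64) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut head = self.head.load(SeqCst);
        loop {
            unsafe { (*node).next = head };
            match self.head.compare_exchange(head, node, SeqCst, SeqCst) {
                Ok(_) => return,
                Err(actual) => head = actual,
            }
        }
    }

    // The consumer detaches the entire list in one atomic swap, then walks
    // it without further synchronization -- no pop-one-at-a-time ABA hazard.
    // (A real implementation would also need a `Drop` impl to free any
    // nodes never drained.)
    fn drain(&self) -> Vec<u64> {
        let mut node = self.head.swap(ptr::null_mut(), SeqCst);
        let mut out = Vec::new();
        while !node.is_null() {
            let boxed = unsafe { Box::from_raw(node) };
            out.push(boxed.value);
            node = boxed.next;
        }
        out
    }
}

fn main() {
    let stack = AtomicStack::new();
    stack.push(1);
    stack.push(2);
    assert_eq!(stack.drain(), vec![2, 1]); // LIFO order
}
```

Whether tokio does something like this, we'll find out when we actually read AtomicStack.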

I still want to understand Inner -- it's clearly important -- as well as the other fields of Entry but we're in a mess of related things here with no sense of how they interact. Let's get some of the breadth by looking at how the functions on Entry work. Here's Entry::new:

pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> {
    let inner = Handle::current().inner().unwrap();
    let entry: Entry;

    // Increment the number of active timeouts
    if inner.increment().is_err() {
        entry = Entry::new2(deadline, duration, Weak::new(), ERROR)
    } else {
        let when = inner.normalize_deadline(deadline);
        let state = if when <= inner.elapsed() {
            ELAPSED
        } else {
            when
        };
        entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
    }

    let entry = Arc::new(entry);
    if inner.queue(&entry).is_err() {
        entry.error();
    }

    entry
}

This is promising. The type of inner there is Arc<Inner>, so whatever
Handle is, it looks like that's how we get Inners. The next important code fragment looks to be inner.increment().is_err(). Here's Inner.increment:

/// Increment the number of active timeouts
fn increment(&self) -> Result<(), Error> {
    let mut curr = self.num.load(SeqCst);

    loop {
        if curr == MAX_TIMEOUTS {
            return Err(Error::at_capacity());
        }

        let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst);

        if curr == actual {
            return Ok(());
        }

        curr = actual;
    }
}

If you'll recall from the struct definition, the num field is defined as the number of active timeouts, but I'm not sure if that count is global or per-Inner. MAX_TIMEOUTS is usize::MAX >> 1 so I'm going to guess that it's global and we're looking at a global singleton in Inner. Before we pop over to Handle::current to confirm or deny this, let's finish out the rest of increment. The loop in the rest of the function appears to do two things:

  1. If the current value of self.num is MAX_TIMEOUTS notify the caller that there's no more capacity available and
  2. increment the value of self.num.

Atomic programming is hard and I think this is a little overly conservative. The use of SeqCst means all threads see the same memory operations in the same order, which is a very harsh constraint to operate under: all CPUs have to synchronize. Secondly, this function is doing an atomic version of a capped increment -- a = a + 1 with a ceiling check, basically -- which I believe can be coded a little tighter. In fact, I made a PR to fix
this.
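For illustration, here's one tighter shape for this capped increment, using AtomicUsize::fetch_update from current std to package up the load/compare-exchange/retry dance. This is my sketch of the idea, not necessarily the change the PR made.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

const MAX_TIMEOUTS: usize = usize::MAX >> 1;

// Bounded atomic increment: bump `num` by one unless it has already reached
// MAX_TIMEOUTS, in which case report that we're at capacity. `fetch_update`
// runs the closure in a compare-exchange loop for us.
fn increment(num: &AtomicUsize) -> Result<(), &'static str> {
    num.fetch_update(SeqCst, SeqCst, |curr| {
        if curr == MAX_TIMEOUTS {
            None // abort the update: capacity reached
        } else {
            Some(curr + 1)
        }
    })
    .map(|_| ())
    .map_err(|_| "at capacity")
}

fn main() {
    let num = AtomicUsize::new(0);
    assert!(increment(&num).is_ok());
    assert_eq!(num.load(SeqCst), 1);

    let full = AtomicUsize::new(MAX_TIMEOUTS);
    assert!(increment(&full).is_err());
    assert_eq!(full.load(SeqCst), MAX_TIMEOUTS);
}
```

The semantics are identical to the hand-rolled loop above; the closure just makes the "check, then try to swap, then retry on contention" structure explicit.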

It's here I'm going to go on a wild tangent. In the PR, Eliza Weisman recommended introducing a loom test for the change I'd introduced. Loom is an implementation of CDSChecker: Checking Concurrent Data Structures Written with C/C++ Atomics by Brian Norris and Brian Demsky -- #allthebrians -- which I recall reading with great interest and thinking "Boy, I have no idea how you'd do that with Rust." But someone did it, and now I'm going to take a moment to understand how loom works internally, then pop back up and continue exploring tokio proper.

Next time, loom.