Hopper Rework: multiple senders

I wrapped up my last post by saying

I think, next steps, I'll try and get multiple senders going at once. That should be fun.

and that's what I've done! It turns out it was also pretty fun. You can see the full diff between the last post and this one in GitHub's compare mode, but we'll walk through the changes piece by piece.

The main driver of new lines in this change set is a rework of the experimental setup. In working with parallel programs – especially where atomics are present – it's quite useful to be able to run a program repeatedly, keeping a lookout each time for behaviour from the program that is counter to your desires. The ultimate aim of this work is to replace the internals of Hopper while still guaranteeing that:

  • messages must be read in the same order as they are written and
  • messages must be read exactly once.

It's easy enough for a parallel program to appear to work most of the time but, in actuality, be broken. This is especially true of parallel work done on a strongly-ordered CPU – like x86 – which is then run on a weakly-ordered CPU – like ARM. What I've done is to incorporate clap into mmap_comm, allowing me to set at runtime either how many experiments I wish to run or how long I want experiments to be run for; a sketch of the flag wiring follows the example run below. For instance, here's an hour-long run of experiments, 100,000 total messages spread across 10 sending threads.

> cargo run --release -- --total_writes 100000 --duration 3600  --total_senders 10
   Compiling mmap_comm v0.1.0 (file:///home/blt/projects/com/postmates/mmap_comm)
    Finished release [optimized] target(s) in 31.60 secs
     Running `target/release/mmap_comm --total_writes 100000 --duration 3600 --total_senders 10`
ELAPSED: Duration { secs: 3600, nanos: 326458305 }
TIMES (of 3670):
    MIN: 0.8145
    AVG: 0.9810
    0.75: 1.0247
    0.90: 1.0537
    0.95: 1.0689
    MAX: 1.2360
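
For the curious, the flag handling behind that command line is only a little clap boilerplate. Here's a sketch assuming clap 2.x – the flag names match the invocation above, but the rest of the wiring is mine for illustration and not necessarily what mmap_comm actually does:

extern crate clap;

use clap::{App, Arg};

fn main() {
    // Sketch only: flag names match the command line above, the rest is
    // illustrative rather than mmap_comm's actual setup.
    let matches = App::new("mmap_comm")
        .arg(Arg::with_name("total_writes")
                 .long("total_writes")
                 .takes_value(true)
                 .default_value("100000"))
        .arg(Arg::with_name("total_senders")
                 .long("total_senders")
                 .takes_value(true)
                 .default_value("1"))
        .arg(Arg::with_name("duration")
                 .long("duration")
                 .help("seconds to keep re-running experiments")
                 .takes_value(true))
        .get_matches();

    let total_writes: usize = matches.value_of("total_writes").unwrap().parse().unwrap();
    let total_senders: usize = matches.value_of("total_senders").unwrap().parse().unwrap();
    // When --duration is present the experiments loop until the clock runs out.
    let duration_secs: Option<u64> = matches.value_of("duration").and_then(|s| s.parse().ok());
    let _ = (total_writes, total_senders, duration_secs);
}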

This particular run was successful the whole way through – 3,670 total runs, to be exact – the fastest of which was 0.8145 seconds in duration, the slowest 1.2360 seconds and the 95th percentile run 1.0689 seconds. The timing statistics are computed with my CKMS implementation and lack niceties for understanding runtime behaviour like standard deviation or other statistics you'd find in a professional setup like criterion.rs. Not too bad for understanding the general swing of things, at least so far.
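
The summary itself is only a handful of lines. Here's a sketch assuming the quantiles crate's CKMS structure – the function and variable names are mine, not mmap_comm's:

extern crate quantiles;

use quantiles::ckms::CKMS;

// Sketch of the timing summary. Assumes the `quantiles` crate's CKMS;
// names here are illustrative.
fn summarize(run_times: &[f64]) {
    // 0.001 is the allowable error on the reported quantiles.
    let mut ckms = CKMS::<f64>::new(0.001);
    for &secs in run_times {
        ckms.insert(secs);
    }
    let avg: f64 = run_times.iter().sum::<f64>() / (run_times.len() as f64);
    println!("TIMES (of {}):", run_times.len());
    // query returns Option<(rank, value)>; we only want the value.
    println!("    MIN: {:.4}", ckms.query(0.0).unwrap().1);
    println!("    AVG: {:.4}", avg);
    println!("    0.75: {:.4}", ckms.query(0.75).unwrap().1);
    println!("    0.90: {:.4}", ckms.query(0.90).unwrap().1);
    println!("    0.95: {:.4}", ckms.query(0.95).unwrap().1);
    println!("    MAX: {:.4}", ckms.query(1.0).unwrap().1);
}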

The careful reader will notice that I've removed a chunk of the validation setup from the last post. In particular, we no longer emit Actions into queues for post-run analysis. While this was very useful, I found that pushing into the queues was making a certain failure very unlikely to show up.

My ambition for this post was to make multiple Senders workable. Previously the Sender would keep track of its own offset, perform a write, flush the mmap and then increase the number of writes available to be read by Receiver. This was slow on account of the regular flushing and – because offset is kept private to Sender – it left no possibility for coordination when we spin up new Sender threads: each Sender will race all the others, overwriting the same offsets. Not great. My first idea was to place offset in an atomic usize, guaranteeing that each Sender reserves a distinct offset, and then carry on with the normal flow. Say we have two Senders, call them A and B. The ideal situation looks like this:

[A:s1][SENDER] QWRITE 1 @ 0
[B:s1][SENDER] QWRITE 2 @ 1
[r1][RECEIVER] QREAD 1 @ 0
[r4][RECEIVER] QREAD 2 @ 1

The Receiver in this case has read out 1,2 as expected. Ignore for a minute that flushing on every write is unbearably slow. There's a correctness issue here if A and B interleave. Check this out:

[B:s1][SENDER] QWRITE 2 @ 1
[A:s1][SENDER] QWRITE 1 @ 0
[r1][RECEIVER] QREAD 0 @ 0

The trick here is that we're using VALUES_TO_READ as a flag to wake up the Receiver when there's information to be read, and that leaves us susceptible to thread interleaving. A reserved offset 0 for its write but B managed to perform its full flow first, signaling correctly that there was only one write available – B's – but implying incorrectly to the Receiver that the write could be found at offset 0. Oops.
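
In rough pseudo-Rust the racy flow looks like this – OFFSET, VALUES_TO_READ, write_at and flush are stand-ins for the real thing:

use std::sync::atomic::{AtomicUsize, Ordering};

static OFFSET: AtomicUsize = AtomicUsize::new(0);
static VALUES_TO_READ: AtomicUsize = AtomicUsize::new(0);

// Stand-ins for the real mmap write and flush.
fn write_at(_offset: usize, _val: u32) {}
fn flush() {}

fn send(val: u32) {
    // Reserving the offset atomically works fine: A gets 0, B gets 1.
    let offset = OFFSET.fetch_add(1, Ordering::SeqCst);
    write_at(offset, val);
    flush();
    // The race: if B gets here before A has performed its write, the
    // Receiver wakes up, starts reading at offset 0 and finds nothing.
    VALUES_TO_READ.fetch_add(1, Ordering::SeqCst);
}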

There are two things that have to be done in a transaction: reservation of the Sender's offset and the write to our mmap. Additionally, flushing with every write is a no-go as it's much too slow. What I've done is to pull in parking_lot's Mutex and protect the Sender offset and a flag called unflushed_writes with that mutex. I know from previous experience that parking_lot's Mutex is noticeably faster than the standard library Mutex in situations of resource contention, which this is. Now only one Sender can enter the critical section, the flow that kicks off with reserving an offset and ends with flushing the mmap. As an optimization, we use unflushed_writes to determine when flushing is appropriate, being very careful to increment VALUES_TO_READ only when the flush has completed. Receiver does not interact with the Sender Mutex at all, an improvement over the Big Dumb Lock approach used by hopper right now.
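
The shape of the new Sender path is sketched below. The field and function names are my own and the flush batch size is an arbitrary number I've made up for the example – the real mmap_comm code differs in its details:

extern crate parking_lot;

use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative state shared between Senders.
struct SenderState {
    offset: usize,
    unflushed_writes: usize,
}

static VALUES_TO_READ: AtomicUsize = AtomicUsize::new(0);
const FLUSH_EVERY: usize = 32; // assumed batch size, not the actual value

// Stand-ins for the real mmap write and flush.
fn write_at(_offset: usize, _val: u32) {}
fn flush() {}

fn send(shared: &Mutex<SenderState>, val: u32) {
    // parking_lot's lock() never poisons, so there's no unwrap dance.
    let mut state = shared.lock();
    let offset = state.offset;
    state.offset += 1;
    write_at(offset, val);
    state.unflushed_writes += 1;
    if state.unflushed_writes >= FLUSH_EVERY {
        flush();
        // Only after the flush has completed do we tell the Receiver
        // there is more to read.
        VALUES_TO_READ.fetch_add(state.unflushed_writes, Ordering::SeqCst);
        state.unflushed_writes = 0;
    }
    // The Receiver never touches this Mutex; it only watches VALUES_TO_READ.
}

Batching the flush is what keeps the per-write cost sane; the exact threshold is a tuning knob.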

I have walked back the only-atomics goal I set out in the first post a touch, without fully abandoning it longer term. Before I get to removing the Mutex and validating the approach across multiple CPU architectures I'd like to implement Senders rotating mmap'ed index files when the files grow too large and the Receiver deleting files that have been totally read.

One thing I might fully walk back by the end of this work, now that I've been thinking about it in more detail, is the use of mmap. Current hopper uses two in-memory buffers – one called memory, the other disk – to store up writes for reading, with potential flushing in the case of the disk buffer. The two buffers, in retrospect, are a little goofy for cache-locality purposes but the nice thing about this setup is that we're never storing all that much in memory. Mapping index files into memory space is going to cause hopper's memory use to jump noticeably when the allowable index file size is large. Which... that's kind of lame. Unifying the two buffers into a single VecDeque plus a values_to_read_from_disk counter is starting to look better all the time.
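
That unified structure would be something like the sketch below – purely speculative, nothing in hopper looks like this yet:

use std::collections::VecDeque;

// Speculative: a single queue plus a counter recording how many values
// have been spilled to the on-disk index file rather than held in memory.
struct UnifiedBuffer<T> {
    queue: VecDeque<T>,
    values_to_read_from_disk: usize,
}

impl<T> UnifiedBuffer<T> {
    fn new() -> UnifiedBuffer<T> {
        UnifiedBuffer {
            queue: VecDeque::new(),
            values_to_read_from_disk: 0,
        }
    }

    // In-memory path: just queue the value.
    fn push(&mut self, val: T) {
        self.queue.push_back(val);
    }

    // Disk path: the value goes to the index file instead and we only
    // remember that there's one more thing to read back from disk.
    fn note_disk_write(&mut self) {
        self.values_to_read_from_disk += 1;
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}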

Anyhow, next time: index file rotation.

Header image: KSC-20160826-PH_JBS01_0059, modified by the author.