Hopper Rework: reliably transmitting from sender to receiver

In the previous Hopper Rework post I introduced my ambition to get disk-backed MPSC Sender / Receiver going on top of

  • fine-grained, atomic synchronization
  • mmap'ing queue files

instead of the giant-struct-in-a-mutex nightmare we have now. Well! I did some fiddling on the mmap_comm prototype and have positive results. You can see the full diff in GitHub's compare mode, but we'll walk through things piece by piece.

As a first step, recall that the sender and receiver originally synchronized on two atomic variables: TOTAL_SENDERS and VALUES_TO_READ. The receiver would pull from the queue so long as VALUES_TO_READ was non-zero, then check that TOTAL_SENDERS was also non-zero before looping around for another crack at a queue that may or may not have anything in it. The original code called this out as "crummy and totally race-y", which was absolutely correct.
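
To make the shape of that loop concrete, here is a minimal sketch. It is not the prototype's code: the `Shared` struct, `new_shared`, `send` and `receive_all` are names invented for illustration, and a `Mutex<VecDeque<u64>>` stands in for the mmap'd queue file so the example stays self-contained. The `main` below runs the sender to completion before the receiver starts, so the race never gets a chance to fire; the comment marks where it would.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical shared state. The Mutex<VecDeque> is an assumption standing
/// in for the mmap'd queue file.
struct Shared {
    total_senders: AtomicUsize,
    values_to_read: AtomicUsize,
    queue: Mutex<VecDeque<u64>>,
}

fn new_shared(senders: usize) -> Shared {
    Shared {
        total_senders: AtomicUsize::new(senders),
        values_to_read: AtomicUsize::new(0),
        queue: Mutex::new(VecDeque::new()),
    }
}

fn send(shared: &Shared, value: u64) {
    shared.queue.lock().unwrap().push_back(value); // QWRITE
    shared.values_to_read.fetch_add(1, Ordering::SeqCst); // INCR
}

/// The original receiver shape: drain while VALUES_TO_READ is non-zero, then
/// go around again only if TOTAL_SENDERS is still non-zero.
fn receive_all(shared: &Shared) -> Vec<u64> {
    let mut seen = Vec::new();
    loop {
        while shared.values_to_read.load(Ordering::SeqCst) > 0 {
            if let Some(value) = shared.queue.lock().unwrap().pop_front() {
                seen.push(value); // QREAD
                shared.values_to_read.fetch_sub(1, Ordering::SeqCst); // DECR
            }
        }
        // Race window: a sender can write, increment and exit between the
        // check above and the check below, stranding its last value.
        if shared.total_senders.load(Ordering::SeqCst) == 0 {
            return seen;
        }
    }
}

fn main() {
    let shared = new_shared(1);
    send(&shared, 1);
    send(&shared, 2);
    shared.total_senders.fetch_sub(1, Ordering::SeqCst); // sender exits
    println!("{:?}", receive_all(&shared)); // prints [1, 2]
}
```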

Consider this sequence of events:

[s0][SENDER]   QWRITE 1
[s1][SENDER]   INCR 'VALUES_TO_READ'  := 1
[r0][RECEIVER] READ 'VALUES_TO_READ'  == 1
[r1][RECEIVER] QREAD 1
[r2][RECEIVER] DECR 'VALUES_TO_READ'  := 0
[r3][RECEIVER] READ 'TOTAL_SENDERS'   == 1
[s2][SENDER]   QWRITE 2
[s3][SENDER]   INCR 'VALUES_TO_READ'  := 1
[s4][SENDER]   DECR 'TOTAL_SENDERS'   := 0
[s5][SENDER]   EXIT
[r4][RECEIVER] QREAD 2
[r5][RECEIVER] DECR 'VALUES_TO_READ'  := 0
[r6][RECEIVER] READ 'TOTAL_SENDERS'   == 0
[r7][RECEIVER] EXIT

This is a little dense so let's pull out a single line and dissect it.

[r5][RECEIVER] DECR 'VALUES_TO_READ'  := 0

Here we see the receiver thread [RECEIVER] perform a decrement operation, denoted DECR, on the atomic variable VALUES_TO_READ. Exactly how the DECR operation is performed we leave intentionally vague in the diagram for now, but you'll recall from the code that it's a sequentially consistent fetch_sub. Anyway, more on that in a later post. The final mark := 0 means that the operation DECR has set the atomic integer to 0. (READ operations are denoted by ==.) The [r5] at the head of the line marks this as the receiver thread's conceptual action number 5, counting from zero. We can order the actions inside a thread but not between threads. In the above example we see the sender and receiver threads playing nicely, rendezvousing back and forth as the values '1' and then '2' get written into the queue. But it's just as possible for the threads' independent operations to be ordered differently with respect to one another. Say:

[s0][SENDER]   QWRITE 1
[s1][SENDER]   INCR 'VALUES_TO_READ'  := 1
[r0][RECEIVER] READ 'VALUES_TO_READ'  == 1
[r1][RECEIVER] QREAD 1
[r2][RECEIVER] DECR 'VALUES_TO_READ'  := 0
[s2][SENDER]   QWRITE 2
[s3][SENDER]   INCR 'VALUES_TO_READ'  := 1
[s4][SENDER]   DECR 'TOTAL_SENDERS'   := 0
[s5][SENDER]   EXIT
[r3][RECEIVER] READ 'TOTAL_SENDERS'   == 0
[r4][RECEIVER] EXIT

Oops. As knowledgeable observers, we know that a '2' has been left on the queue: the receiver hung up too quickly. The global sequence of events will change depending on how the operating system decides to schedule our threads. What we have to do is devise a synchronization method that can survive re-ordering of actions and still reach the same conclusion.
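
Because the failure depends on scheduling, it is awkward to reproduce with real threads. One way to convince yourself is to replay the bad schedule by hand on a single thread, one step per line. The sketch below does that; plain integers stand in for the atomics (only one step runs at a time) and a `VecDeque` stands in for the queue file. Both are assumptions made for illustration, as is the `replay_bad_schedule` name.

```rust
use std::collections::VecDeque;

/// Replays the re-ordered schedule step by step and returns whatever is
/// still on the queue, plus the final VALUES_TO_READ, when the receiver exits.
fn replay_bad_schedule() -> (Vec<u64>, usize) {
    let mut queue: VecDeque<u64> = VecDeque::new();
    let mut values_to_read: usize = 0;
    let mut total_senders: usize = 1;

    queue.push_back(1); //                      [s0] QWRITE 1
    values_to_read += 1; //                     [s1] INCR := 1
    assert_eq!(values_to_read, 1); //           [r0] READ == 1
    assert_eq!(queue.pop_front(), Some(1)); //  [r1] QREAD 1
    values_to_read -= 1; //                     [r2] DECR := 0
    queue.push_back(2); //                      [s2] QWRITE 2
    values_to_read += 1; //                     [s3] INCR := 1
    total_senders -= 1; //                      [s4] DECR := 0, then [s5] EXIT
    assert_eq!(total_senders, 0); //            [r3] READ == 0, then [r4] EXIT

    (queue.into_iter().collect(), values_to_read)
}

fn main() {
    let (leftover, counter) = replay_bad_schedule();
    // prints: left on the queue: [2], VALUES_TO_READ = 1
    println!("left on the queue: {:?}, VALUES_TO_READ = {}", leftover, counter);
}
```

The counter still says there is one value to read, but nobody is left to read it.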

Doing so is hard. To that end, the new commits introduce an Action whose sole purpose is to make debugging this type of thing possible. Action notes whether a read or a write was done, corresponding to QREAD or QWRITE from the above, and, with proper storage, gives us the ability to confirm that every write is matched to a read. Tremendously useful.
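
As an illustration of the idea, and not the prototype's actual definition, an action log and a matching check might look something like the following. The enum layout, the in-memory slice used as "storage" and the `unread_writes` helper are all assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical shape for the debugging record; the real `Action` in the
/// commits may carry different data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    Write(u64), // a QWRITE of this value
    Read(u64),  // a QREAD of this value
}

/// Returns every value that was written more often than it was read,
/// sorted ascending. An empty result means each write found its read.
fn unread_writes(log: &[Action]) -> Vec<u64> {
    let mut balance: HashMap<u64, i64> = HashMap::new();
    for action in log {
        match *action {
            Action::Write(value) => *balance.entry(value).or_insert(0) += 1,
            Action::Read(value) => *balance.entry(value).or_insert(0) -= 1,
        }
    }
    let mut unread: Vec<u64> = balance
        .into_iter()
        .filter(|&(_, count)| count > 0)
        .map(|(value, _)| value)
        .collect();
    unread.sort_unstable();
    unread
}

fn main() {
    // The log a recorder would produce for the bad schedule: 2 is never read.
    let log = [Action::Write(1), Action::Read(1), Action::Write(2)];
    println!("unread: {:?}", unread_writes(&log)); // prints unread: [2]
}
```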

What's the result? Well, it turns out in our first commit we goofed a touch on reading and writing into the queue. The sender wasn't properly setting the bytes of our serialized u64 because we didn't clear the write buffer. D'oh. Further, the receiver wasn't correctly reading because we weren't seeking back to the front of the read buffer. Double d'oh. (Just goes to show, don't set up a concurrency experiment without confirming its sequential behaviour first.) These problems resolved, we still have the re-ordering issue identified above. What'd we do? Get rid of TOTAL_SENDERS and sleep!
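
Before moving on to the sleep, here is a small sketch of the two buffer slips just described. It assumes a reusable `Vec<u8>` as the write buffer, a `Cursor<Vec<u8>>` as the read buffer and little-endian encoding; none of these details come from the prototype, and `encode`, `decode` and `round_trip` are made-up names.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};

/// Serializes `value` into a reused buffer.
fn encode(write_buf: &mut Vec<u8>, value: u64) {
    // Without this clear, the previous value's bytes stay at the front and
    // the new bytes pile up behind them.
    write_buf.clear();
    write_buf.extend_from_slice(&value.to_le_bytes());
}

/// Deserializes a u64 from the front of the read buffer.
fn decode(read_buf: &mut Cursor<Vec<u8>>) -> io::Result<u64> {
    // Without this rewind, the read starts wherever the last operation left
    // the cursor, which here is the end of the buffer.
    read_buf.seek(SeekFrom::Start(0))?;
    let mut bytes = [0u8; 8];
    read_buf.read_exact(&mut bytes)?;
    Ok(u64::from_le_bytes(bytes))
}

/// Pushes each value through encode and decode, reusing both buffers.
fn round_trip(values: &[u64]) -> io::Result<Vec<u64>> {
    let mut write_buf = Vec::new();
    let mut read_buf = Cursor::new(vec![0u8; 8]);
    let mut out = Vec::new();
    for &value in values {
        encode(&mut write_buf, value);
        // Stand-in for the bytes travelling through the queue file.
        read_buf.seek(SeekFrom::Start(0))?;
        read_buf.write_all(&write_buf)?;
        out.push(decode(&mut read_buf)?);
    }
    Ok(out)
}

fn main() -> io::Result<()> {
    println!("{:?}", round_trip(&[1, 2, 3])?); // prints [1, 2, 3]
    Ok(())
}
```

Drop the `clear` and every decode returns the first value; drop the rewind in `decode` and the read fails with an unexpected end-of-file error.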

We know that the receiver is, ultimately, going to be an infinite poll because that's what the hopper receiver does. It hurts us not at all, right now, to allow the sender to zip along as quickly as possible while the receiver takes a nap. Our sequence of events could now look like:

[s00][SENDER]   QWRITE 1
[s01][SENDER]   INCR 'VALUES_TO_READ'  := 1
[r00][RECEIVER] READ 'VALUES_TO_READ'  == 1
[r01][RECEIVER] QREAD 1
[r02][RECEIVER] DECR 'VALUES_TO_READ'  := 0
[r03][RECEIVER] READ 'VALUES_TO_READ'  == 0
[r04][RECEIVER] SLEEP 
[s02][SENDER]   QWRITE 2
[s03][SENDER]   INCR 'VALUES_TO_READ'  := 1
[s04][SENDER]   EXIT
[r05][RECEIVER] READ 'VALUES_TO_READ'  == 1
[r06][RECEIVER] QREAD 2
[r07][RECEIVER] DECR 'VALUES_TO_READ'  := 0
[r08][RECEIVER] READ 'VALUES_TO_READ'  == 0
[r09][RECEIVER] SLEEP 
[r10][RECEIVER] READ 'VALUES_TO_READ'  == 0
[r11][RECEIVER] SLEEP 
[r12][RECEIVER] READ 'VALUES_TO_READ'  == 0
[r13][RECEIVER] SLEEP 
[r14][RECEIVER] EXIT
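
A minimal sketch of that polling receiver follows. As before, a `Mutex<VecDeque<u64>>` stands in for the mmap'd queue file, and the names are invented for illustration. The bound on consecutive empty polls (`max_idle_polls`) and the 10 ms nap are assumptions added so the example terminates, mirroring the final EXIT in the diagram; a real hopper receiver would keep polling.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical shared state: note that TOTAL_SENDERS is gone.
struct Shared {
    values_to_read: AtomicUsize,
    queue: Mutex<VecDeque<u64>>,
}

fn new_shared() -> Shared {
    Shared {
        values_to_read: AtomicUsize::new(0),
        queue: Mutex::new(VecDeque::new()),
    }
}

/// Writes 1..=count, bumping the counter after each write, then just exits.
fn sender(shared: &Shared, count: u64) {
    for value in 1..=count {
        shared.queue.lock().unwrap().push_back(value); // QWRITE
        shared.values_to_read.fetch_add(1, Ordering::SeqCst); // INCR
    }
}

/// Polls VALUES_TO_READ and sleeps whenever it reads zero. Gives up after
/// `max_idle_polls` consecutive empty polls (an assumption, see above).
fn receiver(shared: &Shared, max_idle_polls: usize) -> Vec<u64> {
    let mut seen = Vec::new();
    let mut idle = 0;
    while idle < max_idle_polls {
        if shared.values_to_read.load(Ordering::SeqCst) > 0 {
            // The sender increments only after writing, so with a single
            // receiver a non-zero counter guarantees a value is present.
            let value = shared.queue.lock().unwrap().pop_front()
                .expect("counter was non-zero but queue was empty");
            seen.push(value); // QREAD
            shared.values_to_read.fetch_sub(1, Ordering::SeqCst); // DECR
            idle = 0;
        } else {
            idle += 1;
            thread::sleep(Duration::from_millis(10)); // SLEEP
        }
    }
    seen
}

fn main() {
    let shared = Arc::new(new_shared());
    let tx = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || sender(&shared, 100))
    };
    let rx = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || receiver(&shared, 50))
    };
    tx.join().unwrap();
    let seen = rx.join().unwrap();
    println!("received {} values", seen.len());
}
```

The receiver no longer needs to know whether any sender is alive, so the sender's exit cannot strand a value the way it did with TOTAL_SENDERS.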

For next steps, I think I'll try to get multiple senders going at once. That should be fun.


Header image: KSC-20160829-PH_KLS01_0029, modified by the author.