True Story Follows
So I’m on the white board with Dmitriy “Kermit” Bryndin, and we’re writing all over the windows like it’s a scene out of A Beautiful Mind. The application we work on is one in which we have hundreds of asynchronous workers that process events in the Uber ecosystem, and the output of our application may or may not have a near real-time requirement (maybe like 10 seconds). Another large chunk of the output has no real-time requirement (and anything within a few hours is acceptable). In order to satisfy both, we must ensure that we have enough workers to keep up with incoming events at all times. This can become fairly inefficient though since traffic patterns vary wildly by time of day and day of the week, so the result is that our system is overprovisioned during non-peak hours. Moreover, at the time of this erudite discussion, we had not yet eruditely mastered our database query patterns, and so there was a point at which adding too many workers would exceed the max connection limits to the database (I just want to make clear though, this problem has since been solved).
Hence, some of the issues above can be mitigated by processing incoming tasks in different queues, if you can figure out which are high priority and which are low priority.
My above statement, though, is a conclusion that was reached based on a number of constraints. We are using celery, and so the standard queueing mechanism is just a standard first in, first out queue. The general strategy for separating tasks into important and not-as-important involves creating multiple queues and then provisioning a number of workers dedicated to each queue in proportion to how high that queue’s priority is. So for example, we might have 100 low priority workers that consume from “high” and “low” and then an extra 25 workers dedicated to consuming from “high”. Now the “high” queue is permanently overprovisioned, and if we fall behind on incoming events, the dedicated high priority workers would need to become completely saturated before high priority items fall behind.
The other key word here is “mitigate”. The setup of multiple queues is beneficial only to the extent that it helps mitigate disasters, but it certainly doesn’t help with resource utilization as long as the low priority queues are at least intended to be processed within a reasonable timeframe. Ideally, for some input task, I could use some heuristic to be able to say “ideally this task gets processed before X timestamp”. If I have workers that could then process events in the order of their priority, it doesn’t actually matter if we’re not keeping up with real time events as long as the current timestamp is less than the “process by” timestamp of an input task. Now the system could store tasks in a queue that are just sitting and waiting to be executed by a smaller pool of worker machines. Only when the current timestamp starts to exceed the “process by” timestamps do we need to start provisioning more workers, but as an added benefit, you would be able to anticipate when this threshold is about to be reached. With only one (priority) queue, there’s also no complicated management of worker allocation to different queues and no probable overprovisioning of every single queue.
Now for celery, we could of course use a redis sorted set to create a priority queue, which uses skip lists under the hood, but that’s totally lame. Only losers write priority queues that have logarithmic enqueue and constant time dequeues. Winners write constant time enqueues and logarithmic dequeues. So we should write our own.
The Use Case
The use case for the resultant code I’ll elaborate on below stems from the use case above. Asynchronous workers executing tasks is a concrete use case, but any priority queue that involves a heuristic to assign a priority to an item that later gets processed is valid. Clarifying that a heuristic correlates with the use case is an important distinction: in essence, I can create a datastructure that is fast with no locks and scales to the number of cores in the processor, at the expense of not being able to guarantee that a dequeued item is the absolute highest priority item. For use cases that fit, this should not matter in practice.
If I care about a lock-free datastructure, that must mean that I have multiple workers. If I’m not able to get the absolute highest priority item from a dequeue operation, this is effectively a result of contention, meaning that dequeues (which are writes to the datastructure) are saturated. Therefore, if the highest priority item isn’t returned to the caller, it just means that another caller will get the highest priority item shortly after the contention is resolved.
So while I might not get a result that is pure and correct from a dequeue operation, as an individual caller I would never know that and I would never care because in the context of the larger system, everything is healthy.
Moreover, if the queue is indeed scored by a heuristic, then we already know that a heuristic cannot be 100% accurate. So an out of order result could be reasoned about as added variance to whatever scoring mechanism, and the performance benefits of allowing this leeway clearly outweigh the negligible cost of an absence of purity (I say clearly, but if your use case requires dequeues in perfect order, then your use case doesn’t fit anyway).
All of the code referenced in this discussion can be found on GitHub. Also, I was at GitHub’s headquarters last night, so let’s just throw in a picture of that as well:
Based solely off of that picture, things must get pretty crazy at GitHub.
So up front, here’s the general idea at a high level:
Create a lock-free linked list of priority queues ordered by their respective root values.
I implemented both the priority queues and the lock-free linked list with my own 2 hands. You can read my post on Bootstrapped Skew Binomial Queues in Go and my post on Thread Safe Lock-Free Linked Lists in Go. But don’t read too hard. There are actually bugs in both code examples. I may or may not go back and address those bugs in the blog posts, but we’ll see, that kind of sounds like a lot of work. And if you copy and paste some random code you found on a blog and turn it into production code when there’s not even corresponding test coverage, then that’s your fault, not mine.
The priority queues are based off of the critically acclaimed white paper here, hosted on my Alma Mater’s web server, and the lock-free linked lists are based off of Tim Harris’s gripping white paper: A Pragmatic Implementation of Non-Blocking Linked-Lists
I won’t go into detail about how the priority queues work, but the only important take-away in the context of what I’m talking about now is the runtime complexity of each operation against the queue. This is enumerated below:

- Enqueue: O(1)
- Meld: O(1)
- Peek at the minimum: O(1)
- Dequeue: O(log N)
Now, the only truly expensive operation is dequeue’ing, and at this point it’s worth mentioning how the dequeue operation works (again, just at a high level). If we pop a node from a priority queue, we now have children and siblings of that node that need to be rejoined together to restructure the queue. That rejoining process is logarithmic, but the simple act of popping is constant. So, another way to think of the runtime complexity of the various operations is as follows:

- Pop the root: O(1)
- Rejoin the root’s children and siblings: O(log N)
The crux of the idea now is that we can return a result to our caller of what item should be dequeued, and then asynchronously execute the expensive merge operation.
Another way of reasoning about this idea is that you could feasibly just not even merge the queue back together if you have back to back repeat dequeue operations, or even a back to back to back threepeat or even fourpeat dequeue operation, because you can just continue to consume from the now free nodes that are not part of a unified data structure. At least initially, this will end up being faster than rejoining the queue structure and making additional malloc calls and what not. Once you start getting into sixpeat or sevenpeat back to back to back to back to back to back to back to back operations, things will get slower.
With these ideas in mind though, this is where multiprocessing comes into play. If I have a single priority queue that I just popped the root from, and now I need to merge child nodes, it’s a cheaper operation to just throw these nodes into a lock-free thread safe linked list (assume for the sake of this explanation that we have a thread safe lock-free linked list). Now you’ll also notice that Meld() and Enqueue() are both constant time operations, and the logarithmic dequeue aspect of the priority queues just involves those constant time operations log N number of times. In the background then, I can bring all cores of the processor into play and merge priority queues together that exist in the thread safe linked list.
Whenever I need to merge queues in the linked list together, I can pop those items from the list (instead of locking anything), merge the items together, and then re-insert the merged item back into the linked list as a single node. While priority queues are actively being merged together, if the absolute highest priority item happens to exist in one of those queues, that item will not be available to be dequeued, and the datastructure as a whole will return impure results.
To still keep results as pure as possible, we try to merge lower priority queues together before higher priority queues. To increase parallelization from the get-go, we set a target minimum length of the linked list equal to the number of cores on the processor (so on my dev machine, the target linked list size is 32, no big deal or anything). Dequeues will consume from the head of the linked list (pop the head of the list, then pop the root of the priority queue, then throw that queue’s children back into the linked list in sorted order asynchronously). Background merges, the most expensive operation and the most likely to create unavailability, pop from the Nth node from the head, where N is the number of cores on the processor (riding on 32’s, NBD). In this way, the nodes made temporarily unavailable to the core datastructure are low priority items relative to the rest, and for the expensive case, that unavailability has no negative impact in the context of a priority queue.
I should first point out that for my tests, these were the resulting data values for every Nth dequeue. Clearly, it’s not perfectly sorted, but this would be a valid query pattern for the use case of a priority queue we described. Notice that while we may dequeue low priority elements too early, we never dequeue high priority elements too late!
Dequeue 200,000 elements on N number of cores
We’ll start with a marginally impressive graph. You might notice that 1 core is not pictured. Running the program on only one core made things tremendously slow and effectively useless, so I didn’t add those results in because they dwarfed the other Y values.
The take-away, then, is that there is indeed room for improvement. Clearly, we’re spending a significant amount of time spinning in a goroutine that’s waiting for progress to occur. Since 1 core would time slice program execution and no actual parallel execution would happen, it’s clear that the current implementation absolutely relies on parallel execution. It’s also clear that I can find those blocks that rely on external progress to happen and make smarter decisions about how work is distributed.
We can generally see that execution speed correlates to the number of cores, but it also appears that there’s only a marginal benefit to additional processors.
Enqueue 200,000 elements on N number of cores
Next, we can look at a fairly boring graph. Additional cores do not appear to help enqueue’ing, but this is already a significantly cheap operation. It may well be that the bottleneck here is actually the single threaded calling code used in this test.
Dequeue N elements on 8 cores
This to me is the most interesting test result. Creating a priority queue and subsequently dequeueing everything from it is effectively a sort operation, which will directly correlate in time complexity to any efficient sorting algorithm (where O(N log N) is the fastest possible runtime complexity for a comparison-based sort). Hence, a reasonable expectation would be to see that this graph looks like an O(N log N) line.
Instead, it looks like we have close to linear efficiency. This might explain why increasing the number of cores only had a marginal benefit in dequeue’ing speed because the background operations quickly became less expensive than the primary synchronous path needed to return an element to the caller.
This is also indicative that the synchronous code path involved in popping an element has the most room for improvement in trying to eliminate bottlenecks.
Enqueue N elements on 8 cores
As expected, run time correlated directly with the number of enqueues, since each enqueue is already a constant time operation.
Interesting Code Quirks
One of the interesting validations after testing my code was following up on the size of the linked list. If we have just one node, we won’t get very good parallelization, and if we have too many nodes, it will become increasingly inefficient to insert elements into the list. So somewhere in between lies a happy medium. I set this to the number of processors based on the idea that any real progress would be the result of a single processor, so there would not be any contention beyond the number of processors. Therefore, the minimum sized list with no contention was equal to the number of processors.
Whether or not what I just hypothesized is correct, it turns out that the optimal sized list is, in fact, equal to the number of processors.
The other interesting learning experience from writing this was that you can end up with starvation for a single goroutine in the middle of a block of logic. So if you ever have code in the first half of a function that creates a state change that would prevent other concurrent executions from making progress, it’s not guaranteed that the second half of the function will be called to correct the immediately preceding state change.
In my case, this can happen with how I first implemented the (not so) thread-safe non-blocking deletes on a linked list. A node is deleted by first soft deleting it with a compare and swap on a marker, and then a subsequent compare and swap updates the previous node’s next pointer to hard delete the current node. If the 2nd compare and swap fails, we roll back the first change. However, I would eventually stop making any progress when I ran tests on large datasets, and it wasn’t clear at that point that the deletes were even necessarily the problem.
This problem ended up being so obscure that not even Roger “Grim Reaper” Hu knew how to solve this problem when I offhandedly asked him as I saw him board the BART train earlier this week.
The problem ended up being what I described; even though my code was perfectly logical, at a certain point a particular piece of logic was never reached as non-progressing goroutines dominated the system scheduler. The fix ended up being to change the logic so that it was apathetic to whether the 2nd compare and swap for a hard delete succeeded or failed, and always treated a delete as successful if the initial compare and swap on the marker succeeded. In addition, during any traversal of the list, any goroutine that observes a soft delete now goes ahead and tries comparing and swapping for a hard delete, again ignoring the result of that particular operation.
Future Work

- Optimize remaining bottlenecks
- Begin persisting data to the hard drive in conjunction with a commit log
- Add periodic checkpoints to persist up to a line in the commit log, add appropriate locks to make this possible
- Take advantage of multi-version concurrency control for persisting to the database with minimal locking when traversing through the reads of a datastructure
- Serialize data into pages of 4096 bytes so that they can be flushed to the hard drive efficiently
- Manage page reclamation
- Implement a recovery mechanism
- Turn it into a web server
- Implement RAFT