True Story Follows
In my time as a software engineer, I’ve always enjoyed writing performant code. In the context of a web application, the general implication is faster page load times and thus a better user experience. In the context of a background task, it generally means fewer required resources. But not until recently have I encountered the case in which my code MUST be incredibly performant in order to handle the input load. This becomes especially prevalent working at Uber when your service cannot simply be scaled out by adding workers because of one or more bottlenecks.
In my case, the service being worked on equates to a workflow problem where all users at Uber are taken in as input. Every user has a particular state, and I need to be able to ingest hundreds of events per second as new input in addition to processing the workflow problem for each user currently in the system.
When it became quite clear that my code as it stood would not stand up to the load thrown at it, the first thought I had was to check out the latest and greatest techmologies in lieu of using MySQL. The spoiler of the story is that I ended up using none of these. But either way, here were the considerations:
- Amazon DynamoDB: Using DynamoDB would mean that I could effectively query the data by timestamp with a range scan. One drawback was that Dynamo was “eventually consistent”. This meant that I had no way of knowing if data was stale, and therefore if I used this I would need to use it in conjunction with something like Redis. Also, doing append-only operations would simplify the usage which would have the added benefit of keeping historical data. However, the biggest drawback here was that at Uber, we’re not using AWS services so Dynamo was effectively out.
- Riak: Riak is based on the Dynamo paper and was therefore in theory just as feasible as Dynamo. Seemed like a strong candidate.
- HBase: HBase also seemed to fit my use case well, and I could query rows by using range scans, but after a discussion with Chase “The Cyborg” Seibert, it sounded like HBase was really optimal for batch operations and not good if you had to open lots of concurrent connections. So HBase was out.
- RethinkDB: RethinkDB is so hot right now. And if you go to their meetups, they give out the coolest shirts with the softest material. RethinkDB is a strongly consistent query-able JSON store with all sorts of fancy-pants map-reduce. However, RethinkDB is designed and optimized for the use case of pushing updates to observers about changes in state for the database, which I had no need for and assumed then that my concerns were not necessarily optimized. Whether or not it fit my use case well I actually don’t know, but since there’s not immediate support at Uber with someone else maintaining RethinkDB, I moved on in considerations.
- Cassandra: Cassandra is the alleged new hotness with Facebook. But they don’t support range scans and it’s purely a key-value store. So Cassandra was out.
- Hadoop: Hadoop seemed like the weapon of choice as I pondered the problem for days. However, if I used Hadoopaloop at Uber, I would be limited to basically running jobs in the magnitude of about every hour, and I would lose the hope of doing operations in real-time. Adding support for real time events seemed like a really cool addition for what I was working on, so Hadoop was out as well.
- Redis: I use Redis heavily all the time in all sorts of random cases, but the question here was if I could use it as my primary datastore. The obvious concern was that if a Redis server went down I could lose all my data, which for my case might be acceptable. However, for my use case it was entirely possible to have millions of values stored in a single set, and the word on the street is that Redis can start to fall over in those cases. So in addition to the obvious cost of a lack of durability, Redis was out.
Eventually, rather than replace MySQL with something else, I opted to optimize MySQL for my use case specifically so that I could process all of the input data. What I ended up doing it outlined below.
Constrain the Problem to Your Use Case
Any engineering decision you make incurs trade-offs. The trick is to find the trade-offs that exist where the negative outcomes are irrelevant for your particular problem, and you’re left with nothing but the positives. In my case, here were some of the situations I took advantage of.
- The system should be real time. Sort of. A lag in the realm of 1 minute was acceptable.
- Much of the data, i.e. timestamps, do not need to be precise. We can pigeonhole a lot of that data in order to enable batch updates
- Data in some of my database tables is more than 99% reads.
- In a worst case scenario where some piece of the system went awry, it was acceptable to drop users from the system. Although not something to strive for, it was acceptable to happen.
Write Clean Code
From my perspective this goes without saying, and everyone reading this will likely make the best attempt at writing clean code from their perspective in all cases, but it’s worth nothing that clean code enables making the changes necessary to maximize throughput. A quote that I like that I find to be completely true is that “All problems in computer science can be solved by another level of indirection.” For the work that I did, I ended up adding levels of indirection in all sorts of whacky places, but I was able to make those invasive changes because of a thorough test suite. This was especially useful because adding things like buffering layers didn’t end up changing the eventual result of a particular task, so many of the tests didn’t actually change as more code was added.
In addition, one of the basic strategies for increasing throughput was to add caching layers in the right place. Caching is easy. Busting the cache is hard. And if you have a program that’s going to be malleable and constantly changing, you can not reliably have a cache that’s properly busted unless your code is clean with single points of entry and exit.
Set up Proper Indexes on Your Tables
Even though the database is capable of doing heavy lifting, it is a shared resource and the eventual bottleneck of a program if workers are scaled up. This can be a very deep topic, but the idea here can be covered with broad strokes with some basic ideas:
- Ensure every database query is covered by an index. If the queries have any degree of complexity you’ll likely need a composite index.
- Remember that the primary key determines the physical layout of your database rows on disk. Choose a good primary key and try to keep that key size small in bytes
Batch Write to the Database
As I mentioned earlier, one constraint of the problem I took advantage of earlier was that a lot of data could be pigeonholed. Therefore, rather than update thousands of rows individually, I could batch update those same rows with the same values in the case that those rows had at least similar pieces of data. In a similar way, insertions of new rows could be batched as well by doing a single operation that wrote thousands of new rows rather than one at a time. Because of the overhead of establishing a connection to the database, the batch operations could be completed about 10x faster than the non-batch operation.
A lot of work went into making it so that data could be batched, but from a tactical perspective, this was the general course of action:
- Use Redis in conjunction with MySQL.
- For every batch operation you intend, set up a redis buffer where the key space involves the new, possibly pigeonholed values (i.e. a key for workflow ID, pigeonholed timestamp in 1 minute buckets)
- Add the rows that you intend to update to that buffer (i.e. add user ID’s to that Redis buffer)
- Now when the buffer is drained, you can SELECT all users in the buffer and UPDATE them with the new values that were part of your Redis keyspace
- For each buffer, establish a maximum size where the buffer is drained and written to the database if it reaches a certain size (i.e. 1000 records)
- For each buffer, you can also gaurantee that it’s drained within a certain amount of time (i.e. 30 seconds). I did this by setting up a distributed lock around some resource ID, enqueueing a celery task with a countdown of 30 seconds, and then not releasing the lock until 30 seconds later.
- Now that buffer will be written to disk every 30 seconds or when it gets to a certain size, whichever comes first
If you’re an astute observer, then you recognize that the above approach creates immediate problems because operations are no longer consistent. In order to avoid double processing a user that’s currently awaiting write to disk, you can just use a high watermarker of sorts. This is basically a strategy that means that I annotate the time at which I’m processing jobs, and in my context I’m querying users by time, so I only query users that fall in the range of my time differential for a given task. This takes advantage of one of the constraints of the problem where if something goes wrong, it’s reasonable to simply not process some users. Also, if Redis goes down and the buffer is wiped away, I can also avoid double processing users because the high watermarker falls back to NOW as the last annotated crawl time.
Batch Your Celery Tasks
I discovered this on accident even though it should be intuitively obvious. I initially naively approached the problem by kicking off a single task per user. This had some lazy advantages. For example, if there’s a runtime exception for a task, it only affects the 1 user in the task and not a whole group of users, and it was therefore easy to add retry logic. In theory, this also fans out your workload to all of your workers evenly and effectively. However, if you have, say, 32 workers and 100,000 tasks, it doesn’t really matter that you’ve fanned out the workload by so much.
Every task incurs overhead. I didn’t get the exact metrics, but I found a dramatic increase in performance ever since I batched my tasks in groups of 100 users instead of just 1. Since doing that, not a single task has been queued in my system (it’s been consumed by a worker immediately).
Make Your Network Calls Asynchronous
This was surprisingly easy to do. If you want to make your network calls asynchronous, the kneejerk reaction is to look for the latest hot new techmology. Both gevent and eventlet are supported by celery out of the box, and these both work by monkey patching any operation that blocks and yielding to another green thread. What could possibly go wrong? In my case, I’ll tell you what went wrong. Everything stopped working.
My understanding is quite possibly incorrect, but I believe Python 3 has built in support for working with futures. In Python 2.7, which I’m using, you can use the concurrent.futures module. A future is an object that is basically a promise of a result. Once a future is instantiated, you can think of a separate thread running in the background that’s doing work. Your program will not actually block until you ask the future for its result.
For example, this is a totally useless operation:
However, this is an incredibly powerful operation:
This particular case is easily manifested in network calls. While we await a response from a very expensive network call, we can easily go and do something else. In the simplest optimization we can just go and make other network calls while we wait on previous network calls. In the context of my program, I’m making network calls to adjacent services at Uber. Rather than use something like gevent or eventlet, I just wrote my own basic context manager that had “tasks” in the form of functions, and a single entry point to get the result of a future would instead yield to another enqueued task in the context manager. This effectively meant that I processed a batch of tasks by recursively moving on to the next task while a future processed.
This was why I ended up batching my celery tasks together in groups of users and made the accidental discovery of the large overhead cost of kicking off tasks. So I never really even got a chance yet to see how much of a difference it made to make network calls asynchronous. But I digress.
Graveyard Database Table
Another basic strategy I’ve seen on the streets is to move stale data off of your database. This might mean that archive records for a particular year can be moved to a different table or database, and you just do this every year.
In my case, given that we have a workflow problem, if you will (and I will), users will reach a terminal state for a particular workflow ID, and after that, the row will never again be updated. This also means that the query pattern for that row has changed since the row has outlasted its usefulness for the other more dynamic queries.
So rather than have one massive database table with user state, I created a second massive table for user state that was optimized for just this one use case of stale data. When users reach a terminal state, the row will be picked up by a background task that movers the user into “the graveyard table,” as it were, and that table is queried for a single use case where we check if a user exists on the table, which is light and fast. Moreover, the graveyard table is much more compact because all of the previous concepts of state no longer matter and we don’t need the same columns on the table (hence a smaller table size).
Meanwhile the more active table that’s frequently written to and read from has fewer rows to deal with and scan over as it’s queried.
Batch the Input
An earlier constraint I could take advantage of here is that I could sacrifice total real time for sort of real time. Similar to the buffering strategy for writing to the database and batching celery tasks, I could do the same thing for ingesting real time data. You can imagine some of the use cases that might exist around Uber and the things that might happen as you request a ride, get into the car, and go to a location. In that context, a lot of things don’t really matter whether an event happens instantaneously or several seconds later.
And for my use case, when hundreds of real world events can be processed every second, batching those events together resulted in a huge and worthwhile performance gain.
If you read my riveting story, you’ll notice that there’s nothing super earth shattering and nothing that was solved by a single piece of technology or some insane hardware configuration. This was really just a series of arranging simple tools widely available to most software developers.