True Story Follows
I started writing this entry in November of 2016, and with the results of the most recent state ballot measures, marijuana will soon be completely legal in California. With that in mind, one thing is for sure: event sourcing will become more popular as a means to model the world in an application. I was introduced to this approach to persisting data while perusing YouTube videos, but only recently did I make an attempt to bring it to the streets of production. I wanted to take a moment to document the approach and considerations. At this point in time, the system has not been fully tested in production over a long duration, so that will have to wait for another day.
Scalability is a typical concern in application development, and a typical architecture involves object relational mapping, where the entities used to model the world map 1:1 to database tables. To process these entities, a network request must be made to the database to ascertain the current state of the world, and adding workers increases load on the database. Responsible engineers will add caching layers that add acceptable complexity. But with both a database and a caching server (e.g. Redis), the required network calls inherently create the possibility of failure, which must be accounted for in any calling code. Queries to each of these sets of servers will be optimized for remote network calls, which differs from code that’s contained entirely within a single process. Function calls to an API inside a process generally don’t require awkward query patterns optimized for network access (i.e. I can call a function thousands of times with no concern for adverse performance consequences), and those calls will be entirely deterministic with no possibility of failure. The point, then, is that as a distributed application grows, the amount of object distribution in a typical setup increases, which proportionally increases the opportunities for failure.
Unbeknownst to me, object relational mapping is simply one way to model the world in a persistence layer, and it’s possible to represent state in fundamentally different manners. In particular, event sourcing is the idea that state can be obtained by applying an ordered series of all state changes that have ever occurred. No destructive updates happen; instead, all writes to the database are append-only. Replay those events in order to arrive at the current state of the world.
I won’t focus too much on what event sourcing is or why it’s beneficial because there is an abundance of papers and videos on the subject (mostly by Greg Young and Martin Fowler). Example implementations, however, were not as abundant.
High Level Overview
A core concept of event sourcing is that we’re effectively taking the idea of a commit log, which might typically exist inside a database anyway, and bubbling it up to the application level. The fundamental idea in my design is that we want to control as much as possible on the application side, including object schemas, timestamp generation, and concurrency control. In this way, I’m explicitly not tied to any particular database, and it would be trivial to change the persistence layer should I desire.
The most basic premise is that we’re writing to an append-only database table. In my case, I achieved this by using a timestamp rather than an auto-increment ID. In my opinion, this is simpler: there are no cross-data-center concerns to worry about, and it eliminates any dependency on a particular database.
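As a sketch, application-side key generation might look like this in Python (the function name is my own for illustration, not from the original code):

```python
import time
import uuid


def new_event_key():
    """Generate the write-time ordering key on the application side:
    a microsecond timestamp plus a 16-byte aggregate UUID, rather than
    relying on a database auto-increment ID."""
    timestamp_us = int(time.time() * 1_000_000)
    return timestamp_us, uuid.uuid4().bytes
```

Because the timestamp is generated in the application, the database is just a dumb append target, and nothing here is MySQL-specific.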
Typical examples of event sourcing, at least to my knowledge, assume eventual consistency so that database changes can be published. In my use case, however, consistency was a must and therefore a conventional read query is made whenever an up to date state of the world is needed.
Here’s where things start to get a little bit bananas. The core advantage of an event sourced setup is that I don’t need to query a large volume of data to ascertain the state of the world. I only need to fetch the differential from when I last read from the database. In the general case, this is simple and straightforward. However, an edge case exists in which timestamps generated at write time are persisted to the database out of order. If I continuously only fetch a differential based on my last read timestamp, it’s possible that my application could miss a single event in the system, which over time could create some extremely obscure bugs.
I chose to deal with the aforementioned problem in a manner similar to how VOIP phones work, with what’s called a jitter buffer. The idea is that there’s an arbitrary time range used to buffer incoming events so that we can re-order them if packets arrive out of order. For a VOIP phone, this is something like 300 milliseconds, and the delay is not discernible to a human. What this means in the design here for event sourcing is that we allot some buffer range (in my case an arbitrary and conservatively high value of 5 seconds) and always set the last read timestamp to the current time minus the buffer range. Subsequent queries will then read from that timestamp forward. This will result in duplicate rows being fetched, but it also guarantees that we have a complete list of all events that have happened, assuming it takes less than 5 seconds for a write to land after its timestamp is generated on the application side. This can certainly become confusing on the application side, and it’s probably the biggest drawback of the approach: the mental barrier to understanding the system will be higher for onboarding teammates. The implication of replaying events over and over is that we need to be able to reset the state of the world to a previous state at a point in time. Therefore, code in your state machine will end up looking something like this:
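(A minimal Python sketch; the class and method names are illustrative assumptions, not the original code.)

```python
import copy
from collections import namedtuple

# Illustrative event record; the field names are assumptions for this sketch.
Event = namedtuple("Event", ["timestamp", "aggregate_id", "payload"])


class UserStateMachine:
    """Toy state machine that can rewind to an earlier point in time,
    so that overlapping (jitter-buffered) reads can safely be re-applied."""

    def __init__(self):
        self.world = {}           # current in-memory state
        self._checkpoints = []    # [(timestamp, prior_state)], oldest first

    def apply_events(self, events):
        for event in events:
            # Snapshot *before* mutating so we can rewind to this point.
            # (With copy-on-write structures this copy becomes cheap.)
            self._checkpoints.append(
                (event.timestamp, copy.deepcopy(self.world)))
            self.world[event.aggregate_id] = event.payload

    def reset_to_timestamp(self, timestamp):
        # Roll back any state derived from events at or after `timestamp`;
        # those events will be re-fed on the next overlapping read.
        while self._checkpoints and self._checkpoints[-1][0] >= timestamp:
            _, self.world = self._checkpoints.pop()
```

The `deepcopy` per event is obviously wasteful as written; it exists only to make the rewind semantics concrete, and is exactly the cost that the copy-on-write structures below are meant to eliminate.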
I’m still not done making things weird. The above scenario, where I need to be able to mutate a data structure and then return it to its original state, brings me to a situation where I finally have a practical reason to bring purely functional data structures to a production environment. At least the concepts behind them: copy-on-write operations are used for every update to an object. This effectively means that old references to an object are unchanged and still valid, and all new changes are purely non-destructive.
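A minimal illustration of the idea (a production implementation would use a structurally shared tree, such as a HAMT, so that each update isn’t an O(n) copy; this sketch only demonstrates the semantics):

```python
class World:
    """Immutable-style container: every 'update' returns a new World that
    shares unchanged values with the old one (copy-on-write)."""

    def __init__(self, entries=None):
        self._entries = entries or {}

    def set(self, key, value):
        # Shallow-copy the top-level dict; existing values are shared,
        # not copied, and the original World is never mutated.
        new_entries = dict(self._entries)
        new_entries[key] = value
        return World(new_entries)

    def get(self, key):
        return self._entries.get(key)
```

Old references stay valid: code holding the pre-update `World` still sees the pre-update state, which is exactly what the rewind logic above needs.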
The Database Table
Again, Greg Young outlines the basic strategy for how to create a database table for event sourcing, but I made some changes to his approach based on some assumptions I was able to make that he could not. Namely, data needs to be denormalized to optimize reads, but while Greg accounts for this in the database table design, the use case I’m putting together assumes that data is denormalized directly in memory on a worker machine.
Therefore, the table is made up of a timestamp in microseconds (BIGINT), an aggregate UUID (16 bytes), and an aggregate version (an integer), which together form a composite primary key. This is intentionally the only index on the table. Next there’s a column for the event type ID (an integer), which allows the application to discern what kind of event happened and how to interpret the additional data. The next column is a blob of bytes that’s just a serialized version of the event that happened; the data can be de-serialized based on the event type ID. Finally, there’s an additional column for event metadata (another binary blob of some serialized object). Here we can include things that might help troubleshoot a problem down the road. In my case, I’m including the user ID of the person that took the action, the git hash of the version of the code running when the write happened, the process ID of the application that made the write, and the host machine.
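The schema described above might look roughly like this (shown here against in-memory SQLite as a stand-in for MySQL, so the types are approximate; MySQL would use BIGINT, BINARY(16), and BLOB):

```python
import sqlite3

DDL = """
CREATE TABLE events (
    timestamp_us      INTEGER NOT NULL,  -- microseconds since epoch
    aggregate_uuid    BLOB    NOT NULL,  -- 16-byte UUID
    aggregate_version INTEGER NOT NULL,  -- per-aggregate version counter
    event_type_id     INTEGER NOT NULL,  -- how to deserialize event_data
    event_data        BLOB    NOT NULL,  -- serialized event payload
    event_metadata    BLOB    NOT NULL,  -- user id, git hash, pid, host...
    PRIMARY KEY (timestamp_us, aggregate_uuid, aggregate_version)
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL)
```

The composite primary key is the only index, which keeps appends cheap.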
While the metadata piece may sound a little aggressive, in my first instance of having to debug an issue that came up, it was the most luxurious debugging experience I’ve ever had. For example, the version of the code was known, so I knew exactly where to start looking if it was a recent change that introduced the problem, and I was able to quickly infer that two separate but related rows were being written from separate hosts entirely, which in the context of the problem I was debugging was enough to isolate exactly what the problem was.
Writes are fast because appends are the single access pattern the table is optimized for, and there are no additional indices to slow writes down. While writes are completely separate from reads in event sourcing, in my code I accounted for this inconvenience by adding a “read your own writes” feature. Concurrency control is already implemented during writes. I won’t elaborate on this because Greg Young already has an in-depth and well-written explanation that I won’t do justice, but it is worth noting that while Greg advocates using stored procedures for optimistic concurrency control, I chose to opt for a round trip to the database so that I wasn’t dependent on MySQL. But I digress. The point is, if an application instance writes a change, and it’s been determined that no conflicting writes happened, then it’s safe to apply my own writes to the local state of the world even though we may have a stale picture of the remainder of the world.
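The round-trip flavor of the check might be sketched like this (this is my illustration, not Greg Young’s stored-procedure version; function and column names are assumptions, and SQLite again stands in for MySQL):

```python
import sqlite3

# In-memory stand-in for the events table described above.
conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE events (
    timestamp_us INTEGER, aggregate_uuid BLOB, aggregate_version INTEGER,
    event_type_id INTEGER, event_data BLOB, event_metadata BLOB,
    PRIMARY KEY (timestamp_us, aggregate_uuid, aggregate_version)
)""")


def write_event(timestamp_us, aggregate_uuid, expected_version,
                event_type_id, data, metadata):
    """Insert the event, then make a second round trip to detect whether
    a concurrent writer claimed the same aggregate version.  Returns True
    only if this write is the sole claimant; a real system would add a
    deterministic tie-break and a retry path on conflict."""
    conn.execute("INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)",
                 (timestamp_us, aggregate_uuid, expected_version,
                  event_type_id, data, metadata))
    claimants = conn.execute(
        "SELECT COUNT(*) FROM events "
        "WHERE aggregate_uuid = ? AND aggregate_version = ?",
        (aggregate_uuid, expected_version)).fetchone()[0]
    return claimants == 1
```

When `write_event` returns `True`, it’s safe to fold that event into the local in-memory world immediately: that’s the “read your own writes” part.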
The beauty of event sourcing is that writes are maximally optimized with an append only pattern, and reads are also maximally optimized because we read with the same high locality in a linear fashion, and we only need to read the differential of the data. The events are applied to an in-memory representation of the world on each worker machine, and now actual queries to the in-memory representation are also maximally optimized because the data should be denormalized into an appropriate data structure for the intended query patterns (additionally, the objects are entirely in memory, so it’s also fast).
As noted briefly above, the true luxury comes from the fact that any queries to the in-memory state of the world have no possible failure cases, and results are entirely deterministic. If you want to impress the ladies or the gentlemen (based on whatever you’re into) by showing off a complete absence of runtime exceptions, this is how you do it!
Finally, the reads to the database are against rows that are inherently immutable. We can also take advantage of this by making all of our queries transaction isolation level READ UNCOMMITTED. What this means for MySQL is that the rows returned to you could possibly be dirty and some of the columns in the data might very well be stale. But for our use case, we already know that no destructive updates happen, so querying in this manner is perfectly acceptable. The actual performance enhancement here is that MySQL will read rows without locking anything. So you can forget about those 60 nanoseconds to acquire a lock! But also, you can read without blocking writes, and throughput should be maximized if multiple workers are reading and writing.
Now, our API can end up looking something like this:
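(A sketch in Python; the entities and method names are illustrative, not the actual application.)

```python
class UserWorld:
    """Hypothetical in-memory, denormalized view fed by the event reader.
    Queries are plain function calls: no network, no failure cases, and
    the data structures match the intended query patterns."""

    def __init__(self):
        self._users_by_id = {}
        self._user_ids_by_team = {}  # denormalized for team lookups

    def apply_user_added(self, event):
        # Called by the state machine as user-added events are ingested.
        self._users_by_id[event["user_id"]] = event
        self._user_ids_by_team.setdefault(
            event["team_id"], set()).add(event["user_id"])

    def get_user(self, user_id):
        return self._users_by_id.get(user_id)

    def get_users_on_team(self, team_id):
        return [self._users_by_id[user_id]
                for user_id in self._user_ids_by_team.get(team_id, ())]
```

Calling `get_users_on_team` thousands of times in a loop is perfectly fine here, which is exactly the awkward-query-pattern problem from the ORM world disappearing.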
Tying It All Together
Again, there are more details to the implementation that I won’t go into because I’d just be restating what’s written in the fancy pants white papers about event sourcing. Namely, you still need to implement a background cron job that periodically takes snapshots of the world at a particular timestamp and then saves those serialized versions in Amazon S3. Without that, startup for an individual worker would require reading the entire database table of all events for all time. Based on the design outlined here, though, you’ll need to serialize the world as of some point in the not-so-distant past to avoid the possibility of another write coming in, as you’re serializing the world, with a timestamp identical to the last read timestamp. Then, once you load a snapshot from a particular timestamp, you only need to query for rows whose timestamps are greater than the snapshot timestamp.
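The startup path and the buffered read cursor can be sketched together like this (the callables are assumptions standing in for the S3 and MySQL clients):

```python
import time

BUFFER_US = 5 * 1_000_000  # the 5-second jitter buffer, in microseconds


def now_us():
    return int(time.time() * 1_000_000)


def startup(load_snapshot, fetch_events_after):
    """Worker startup: restore the most recent snapshot, then read only
    the differential.  `load_snapshot` returns (world, snapshot_ts_us);
    `fetch_events_after` queries rows with timestamp_us > its argument."""
    world, snapshot_ts_us = load_snapshot()
    events = fetch_events_after(snapshot_ts_us)
    return world, events


def next_read_timestamp():
    """After each read, pull the cursor back by the buffer so a write
    whose timestamp was generated up to 5 seconds before it landed in
    the table can't be missed; duplicates are deduped downstream."""
    return now_us() - BUFFER_US
```

The snapshot job itself should snapshot the world as of `now_us() - BUFFER_US` for the same reason the read cursor lags behind real time.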
In my implementation, the developers should have a reasonably straightforward experience in implementing new features around event sourcing by simply subclassing an abstract class and implementing the abstract methods. These classes are “state machines” and they’re responsible for ingesting events that have already been ordered and deduped (for optimistic concurrency control) by the event reader class. The state machine specifies what event types to listen for, how to serialize and deserialize a snapshot of its section of the state of the world, and how to apply incoming events to “the world”.
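The abstract class might look something like this (the method names are my illustration of the contract described above, not the original interface):

```python
import abc
import json


class StateMachine(abc.ABC):
    """What a developer subclasses to add a feature.  Events arrive
    already ordered and deduped by the event reader."""

    @abc.abstractmethod
    def event_types(self):
        """Return the event type IDs this machine listens for."""

    @abc.abstractmethod
    def apply_event(self, event):
        """Fold one event into this machine's slice of 'the world'."""

    @abc.abstractmethod
    def serialize_snapshot(self):
        """Serialize this machine's state for the periodic snapshot."""

    @abc.abstractmethod
    def deserialize_snapshot(self, blob):
        """Restore this machine's state from a snapshot blob."""
```

A feature then is just a subclass plus the new event types it cares about; the event reader, snapshotting, and concurrency machinery are shared.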
“The world” is the in-memory representation of all objects that should be queryable by the rest of your application code. Since each application instance only needs to be concerned with its own state relative to the stream of events, it’s also trivial to add things like caching decorators where the cache is busted in between every state change, which in practice should be infrequent enough to make the caching worth it. Caching in this case simply means an additional query optimization on what should already be an extremely fast in-memory query.
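One way such a decorator could be written (a sketch; the `state_version` attribute, assumed to advance on every state change, is my own convention):

```python
import functools


def cached_until_state_change(method):
    """Memoize a query method on the world object; the cache is busted
    whenever `self.state_version` advances, i.e. on any state change."""
    @functools.wraps(method)
    def wrapper(self, *args):
        if getattr(self, "_cache_version", None) != self.state_version:
            self._query_cache = {}          # bust everything on change
            self._cache_version = self.state_version
        key = (method.__name__, args)
        if key not in self._query_cache:
            self._query_cache[key] = method(self, *args)
        return self._query_cache[key]
    return wrapper
```

Since state changes are relatively infrequent compared to queries, most calls hit the memo, and correctness never depends on cache expiry heuristics.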
Probably the last note to make is a bug I traced down based on this implementation. Events are read in batches of 10,000 and are fed to each individual state machine. If one state machine depends on another state machine, you’ll run into errors if the events aren’t fed to the dependent state machine first. Therefore, in your initial setup you’ll need to create a dependency tree of state machines and establish the order in which all state machines should be fed events.
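Establishing that feed order is a textbook topological sort; with Python 3.9+ the standard library handles it (the machine names below are hypothetical):

```python
from graphlib import TopologicalSorter  # stdlib as of Python 3.9

# Hypothetical dependency map: each state machine lists the machines
# whose state it reads, and which must therefore be fed events first.
DEPENDENCIES = {
    "TeamStateMachine": set(),
    "UserStateMachine": {"TeamStateMachine"},     # users reference teams
    "BillingStateMachine": {"UserStateMachine"},  # billing reads users
}

# The order in which each batch of 10,000 events is fed to the machines.
FEED_ORDER = list(TopologicalSorter(DEPENDENCIES).static_order())
```

`TopologicalSorter` also raises on cycles, which doubles as a startup-time sanity check that no two state machines depend on each other.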
Event sourcing is pretty hot right now. As long as your data set is finite, event sourcing is, in my opinion, quite a reasonable approach. In terms of development time, my experience so far is that the up-front cost of creating the system is fairly high, but the cost of adding new events and new features is much lower.