Event Sourcing and Append-Only Databases

True Story Follows

My latest obsession at work has been to avoid the use of assignment operators as much as humanly possible, even in an imperative language (Python). Despite the occasional “this code is brain@#$%ed” and “hey bro, your CPU usage is too high, you need to settle that down,” I can say that the absence of side effects aside from IO has produced generally bug-free results and easy maintenance.

This obsession started from watching a few talks by Robert Martin, where he points out that the most valuable programming book he’s read is “Structure and Interpretation of Computer Programs.” If you read it, or just do a command+F (as I did), you find that the very idea of assignment operators and local state isn’t introduced until page 194 of 588. This line of thinking picked up right where I left off 10 years ago in college, where I remember my professor dedicating an entire lesson to demonstrating that it was possible to create a programming language composed entirely of functions. After a little bit of effort deliberately using nothing but functions, immutable class instances, and memoization decorators, I mentioned to my old teacher how insightful it all was, and he only casually responded, “I’m surprised it took you this long.” I was waiting for him to cap it off with “idiot.” But he didn’t.

Along these lines, I’ve been interested in the idea of immutable data structures, because you can end up caching absolutely everything across multiple machines, but so far I’ve found that this idea, while interesting, is for the most part impractical on a large scale. However, if the use case fits (generally where hard drive space is abundant relative to the value of your data), certain problems can be dramatically simplified by storing all system events in an append-only database; all corresponding state can then be ascertained by reading that database linearly from beginning to end.

This is essentially how distributed databases work anyway: writes go to a commit log that can be synchronized across multiple nodes with something like Paxos or its hipster cousin, Raft, and by replaying those events in order, every node is guaranteed to arrive at an identical state. This idea just bubbles the commit log up into application code. Whatever state we want, in whatever way we want to represent it, at whatever scale we desire, can be generated by replaying an event log. Since that replay is deterministic, we can cache the state at whatever point along the commit log we want.

The interest here is further compounded when you don’t view this sort of idea as a replacement for conventional databases, but instead as a simple mechanism to re-create the state of one or more databases. And by the way, this is more of a jostling of thoughts based on a video on Event Sourcing I watched at 3 AM, not based on a production application I’ve actually written. But given that no one’s ever been fired for writing an application that mutates a conventional database with destructive updates, you have to be a little crazy to try something like this, and you have to start somewhere.

Some Code

I say “some” rather than “the” since this is clearly just an example. But let’s say you have a series of events that compose the transaction history of a bank account:

transactions = [
    "DEPOSIT 1235.50",
    "WITHDRAW 46.30",
    "WITHDRAW 10.11",
    "WITHDRAW 7.67",
    "WITHDRAW 8.94",
    "DEPOSIT 100.0",
]

In a real-world scenario, I guess you’d have more than 6 transactions. This is the kind of data you could store in an append-only database.

From here, we want our application to hold some sort of state. In this case, the final state is really just a dollar amount that’s nothing but a sum of the input values, so the class below is clearly overkill, but the point is that state can be encapsulated in something that can be arbitrarily complex.

class CurrentState(object):

    def __init__(self, new_balance=None):
        if new_balance is None:
            self.balance = 0
        else:
            self.balance = new_balance

    def deposit(self, value):
        return CurrentState(new_balance=self.balance + value)

    def withdraw(self, value):
        return CurrentState(new_balance=self.balance - value)

In the above class, I made a point to keep things immutable. In keeping with that idea, any operation against a class instance that would alter state instead returns a new instance of the class, leaving the original unchanged. There’s no immediately clear benefit in this example, but the important piece is that each method returns a new instance of the same class rather than mutating the existing one.
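
As a quick illustration of what that buys us (nothing fancy here, just showing that the original instance survives untouched):

state = CurrentState()
next_state = state.deposit(100.0)

assert state.balance == 0           # the original instance is unchanged
assert next_state.balance == 100.0  # the "mutation" lives in a new instance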

Now, if we want to go ahead and tie the above two concepts together and replay events along the transaction history, we just need a function for every transaction type in the original event log, plus a little pattern matching to pick the corresponding function:

def deposit(value, current_state):
    return current_state.deposit(value)


def withdraw(value, current_state):
    return current_state.withdraw(value)


def apply_transaction(transaction_string, current_state):
    # e.g. "WITHDRAW 46.30" -> operation "WITHDRAW", value 46.30
    parts = transaction_string.split()
    return {
        "DEPOSIT": deposit,
        "WITHDRAW": withdraw,
    }[parts[0]](float(parts[1]), current_state)
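
Just to sanity check the dispatch (using nothing beyond the functions already defined):

state = apply_transaction("DEPOSIT 1235.50", CurrentState())
state = apply_transaction("WITHDRAW 46.30", state)
print(state.balance)  # 1189.2, give or take float rounding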

So now, if we wanted to hardcode the steps to re-create the final state from the event log, we could write this:

final_state = apply_transaction(
    transactions[5],
    apply_transaction(
        transactions[4],
        apply_transaction(
            transactions[3],
            apply_transaction(
                transactions[2],
                apply_transaction(
                    transactions[1],
                    apply_transaction(
                        transactions[0],
                        CurrentState(),
                    ),
                ),
            ),
        ),
    ),
)

And now, whenever we see this sort of pattern, we can replace it with a reduce function:

final_state = reduce(
    lambda accum, transaction: apply_transaction(transaction, accum),
    transactions,
    CurrentState(),
)

Notice that in both cases, we still need to initialize an empty state. I also probably could have cleaned up the code a little more by defining “apply_transaction” with its parameters swapped, so that it maps directly onto a standard “reduce()” call and the anonymous parameter-shuffling function goes away.
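
If we went that route, it would look roughly like this (just a sketch of the swapped-parameter variant; the rest of the post keeps the original argument order, and on Python 3 reduce would need to be imported from functools):

def apply_transaction(current_state, transaction_string):
    # accumulator first, element second: the argument order reduce() expects
    parts = transaction_string.split()
    return {
        "DEPOSIT": deposit,
        "WITHDRAW": withdraw,
    }[parts[0]](float(parts[1]), current_state)


final_state = reduce(apply_transaction, transactions, CurrentState())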

Obviously though, we don’t want to replay all the events all the time. Instead, we can just memoize each individual step, and in doing so we can be blatantly careless about how we go about persisting cached data, since it’s so easy to recreate. On a brief tangent, I’ve found that local key-value stores have some distinct advantages for caching in particular. With a central database cluster, we typically have to care about bottlenecks, shared disk usage, disk capacity, and so on. But for an application that needs to scale with many workers involved, it’s not uncommon for those worker machines to have reasonably large hard drives that sit mostly unused because the workers rely on a remote database server. You can use a local key-value store like LMDB and, for the most part, not worry about being a noisy neighbor; while not necessarily the fastest caching mechanism possible, it scales with conventional hardware and gives you an opportunity to reduce the load on the central bottlenecks in your system.
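
As a rough sketch of what that local snapshot cache could look like (the database name, the helper names, and serializing the balance as a string are all just assumptions for illustration):

import lmdb

# a per-worker snapshot cache; trivially safe to throw away and rebuild
snapshot_env = lmdb.open("state_snapshots.db", map_size=1024 ** 3)


def cache_state(transaction_id, state):
    # persist the balance keyed by the transaction's unique ID
    with snapshot_env.begin(write=True) as txn:
        txn.put(transaction_id.encode("utf-8"), str(state.balance).encode("utf-8"))


def cached_state(transaction_id):
    # rebuild a CurrentState from the snapshot, or return None on a cache miss
    with snapshot_env.begin() as txn:
        raw = txn.get(transaction_id.encode("utf-8"))
    if raw is None:
        return None
    return CurrentState(new_balance=float(raw))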

Anyway, I digress. If I pick apart the reduce() call above and instead write a standard recursive function that traces backward to generate state, the recursion can keep backtracking until it hits a point in time where a cached result already exists.

import time
import uuid

# lru_cache ships with functools on Python 3; on Python 2 the functools32
# backport provides the same decorator
from functools import lru_cache


class HeadReached(Exception):
    pass


# tag every transaction with a unique ID so that results can be memoized per event
transactions = ["%s %s" % (t, uuid.uuid4()) for t in transactions]

def _get_previous_transaction_string(transaction_string):
    # pretend this is not as naive and inefficient as the current
    # implementation
    def non_negative_index_lookup(index):
        if index < 0:
            raise HeadReached("Initial state reached")
        return transactions[index]
    return non_negative_index_lookup(transactions.index(transaction_string) - 1)


@lru_cache(10000)
def get_current_state(transaction_string):

    def _previous_state():
        try:
            return get_current_state(
                _get_previous_transaction_string(transaction_string),
            )
        except HeadReached:
            return CurrentState()

    return apply_transaction(
        transaction_string,
        _previous_state(),
    )
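
Reconstructing the latest state is now just a matter of asking for it by its transaction; every intermediate state gets memoized along the way, so subsequent lookups for earlier points in the log are cache hits rather than full replays:

# replay (or fetch from cache) everything up to and including the last event
final_state = get_current_state(transactions[-1])
print(final_state.balance)  # 1262.48 for the sample transactions, give or take float rounding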

And now, for whatever persistence layer ends up getting used, write throughput is significantly better with an append-only structure. Just as an example, in my non-sterile and not very scientific test script below, I was able to write about 60,000 key-value pairs per second with LMDB when the keys were monotonically increasing. If I change the keys to something randomly generated (like a UUID), the performance is only about half as fast.

import lmdb
import uuid

DATABASE_NAME = "append_only.db"
MAP_SIZE_GIGABYTES = 100
MAP_SIZE_BYTES = MAP_SIZE_GIGABYTES * 1024 * 1024 * 1024

lmdb_environment = lmdb.open(DATABASE_NAME, map_size=MAP_SIZE_BYTES)


def _next_key(current_key):
    # zero-padded, monotonically increasing keys; passing None bootstraps the
    # sequence (int(None) raises TypeError, so we recurse starting from "0")
    try:
        return "%.9d" % (int(current_key) + 1)
    except TypeError:
        return _next_key("0")


with lmdb_environment.begin(write=True) as transaction:
    key = _next_key(None)
    for _ in xrange(100000):
        key = _next_key(key)
        # append=True tells LMDB the key sorts after everything already stored,
        # which is what keeps the sequential write path fast
        transaction.put(
            key,
            str(uuid.uuid4()),
            append=True,
        )

The takeaway here is hardly that monotonically increasing keys are more performant for write throughput, but rather that we’re not stuck with using a conventional relational database to store the ultimate source of truth. I can conduct the same test with SQLite (again, this isn’t super scientific, I don’t want the trolls to come for me), and in the sample below I can only get about 25,000 writes per second. If I use random keys, this number decreases to about 18,000 writes per second.

import sqlite3
import uuid

DATABASE_NAME = "sqlite.db"
TABLE_NAME = "test_db"

connection = sqlite3.connect(DATABASE_NAME)
cursor = connection.cursor()
query = """
    CREATE TABLE IF NOT EXISTS {table_name}
        (key TEXT UNIQUE, value NONE);
""".format(
    table_name=TABLE_NAME,
)
cursor.execute(query)

def put(key, value):
    # parameterized query; only the table name needs string formatting
    query = "INSERT OR REPLACE INTO {table_name} (key, value) VALUES (?, ?)".format(
        table_name=TABLE_NAME,
    )
    cursor.execute(query, (key, value))


def get(key):
    query = "SELECT value FROM {table_name} WHERE key=? LIMIT 1".format(
        table_name=TABLE_NAME,
    )
    cursor.execute(query, (key,))
    return cursor.fetchone()[0]

def _next_key(current_key):
    try:
        return "%.9d" % (int(current_key) + 1)
    except TypeError:
        return _next_key("0")


key = _next_key(None)
for _ in xrange(100000):
    key = _next_key(key)
    put(key, str(uuid.uuid4()))
connection.commit()

The End