Optimizing Batch Database Writes Through Entity Diffing

True Story Follows

So my manager, High Voltage Arun, gives me the go-ahead to work on this insane feature on the project I’m working on. Effectively, I need to take multiple steps that are spread out across a pipeline of asynchronous tasks and combine the related steps into a single, synchronous code path. The problem is much easier to reason about under the current model with separate asynchronous jobs, but that model could also be described as the initial, naive approach to producing the currently generated output. By combining the multiple steps needed for a given business entity, we can process that entity much faster (and, as a positive side effect, use less compute), and in doing so I can more easily isolate a critical path for a mutex, which will then allow our team to automatically handle a particular race condition instead of forcing our end users to understand and account for that case themselves. As an added benefit, the window for that race condition shrinks dramatically if the business entity can be processed in a much shorter time.

The Problem

In a nutshell, the problem is this: in my multi-step pipeline, where inputs can come from every which way and outputs can go every which way, the last processing step for each sub-task writes its changes to disk (efficiently…don’t you judge me) using pre-defined update queries I’d written and optimized for each particular use case.

Under the new design, issuing multiple writes per step for a single entity is sub-optimal, but more importantly, for my use case all writes are heavily optimized with write-ahead buffers that are batched, buffered, and flushed for each operation (don’t worry, this is safe in my application). Given that complicated setup, plus cross-datacenter concerns (i.e. I’m only writing to one datacenter), I don’t have immediate consistency, so continuing to write at each individual step itself introduces a race condition.

The Solution

Up front, my end solution to the problem was to generically batch all database writes by finding the common differentials between multiple corresponding business entities. Arriving at this conclusion requires a decent amount of context and indirection, so describing the entire process linearly would leave you bored and unlikely to leave my blog enlightened. The full context is laid out below.

The Code Evolution

The example code here reflects some recent production code I’ve worked on, which I’ll go ahead and consider “good code”, and I hope you’ll agree with that assessment.

Initial Premise: Use Business entities

If you get started with a web framework like Django or Flask, you’ve probably seen some example code on the streets where ORM objects are fetched from the database and passed around to functions to be processed to do something.

This can be problematic for a few reasons. The entirety of your application now effectively knows about the database, and it becomes increasingly likely that your business logic and view code are tightly coupled to the database. Work in a code base like this for a year, and then randomly decide you want to swap out the database for another persistence layer…it’s a painstaking process, if not impossible. If the slippery slope continues, then not only will database objects be referenced in random places, but random queries will exist in different places, and the project maintainers can quickly lose sight of what kinds of queries are being created, a number of which are quite possibly not optimized.

If we instead draw a distinct border between the database and the application, where the application intentionally knows nothing about the database, it becomes trivial to isolate possible queries to only those that are optimized, swap out persistence layers as desired, add caching as necessary, and add reassurance throughout the code that the actual objects being worked with are read-only items.

In Python, I typically use Schematics, which brings type safety to a type unsafe world and allows you to defer decision making about persistence layers for as long as possible. As an added bonus, it makes serialization trivial, and makes fast tests much easier to write by providing a simple mechanism to create mock objects (and avoid having to use the database in tests as much as possible).

Example usage is below:

from schematics.models import Model
from schematics.types import (
    IntType,
    FloatType,
    StringType,
)
from sqlalchemy import (
    Float,
    Column,
    Integer,
    String,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import PrimaryKeyConstraint

Base = declarative_base()


class _ArbitraryDatabaseModel(Base):
    """
    Do not let your entire application 'know' about this
    class
    """

    __tablename__ = 'arbitrary_table'

    id = Column(Integer)

    arbitrary_string = Column(String)
    arbitrary_int = Column(Integer)
    arbitrary_timestamp = Column(Float)

    __table_args__ = (
        PrimaryKeyConstraint('id'),
    )


class ArbitraryEntity(Model):
    """
    A representation of a business entity that in practice
    corresponds to a database row (but doesn't have to)
    """

    id = IntType(required=True)

    arbitrary_string = StringType(required=True)
    arbitrary_int = IntType(required=True)
    arbitrary_timestamp = FloatType(required=True)
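
As a quick illustration of the serialization and mock-object points above, here is a small sketch that reuses the ArbitraryEntity just defined and the same get_mock_object helper that appears later in this post:

# serialization is trivial: entities round-trip through plain dicts
entity = ArbitraryEntity({
    "id": 1,
    "arbitrary_string": "hello",
    "arbitrary_int": 42,
    "arbitrary_timestamp": 1234567890.0,
})
as_dict = entity.to_primitive()
same_entity = ArbitraryEntity(as_dict)

# fast tests: build a valid entity without ever touching the database
mock_entity = ArbitraryEntity.get_mock_object(overrides={"id": 2})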

Naive Batch Updates

The most naive approach to updating a batch of records is to iterate over your business entities (which we now assume we have) and update all values for each particular primary key. If we’re marginally better than naive, we wait to commit until after the iteration is complete.

Example code below runs in about 13 seconds for updating 10,000 records:

from schematics.models import Model
from sqlalchemy.ext.declarative import declarative_base

from .sql_alchemy_utils import Session

db_session = Session()

Base = declarative_base()


class _ArbitraryDatabaseModel(Base):
    # defined previously in this example
    pass


class ArbitraryEntity(Model):
    # defined previously in this example
    pass


class ArbitraryRepository(object):

    """
    Define all of the allowable queries that can be made
    to a particular database / persistence layer.  In this way,
    the use cases are isolated, queries can be optimized as such,
    and reasonable database queries are baked into the code

    For this example, we'll skip a lot of queries we might care
    about and just look at updates
    """

    @classmethod
    def update_from_entities_v1(cls, arbitrary_entities):
        """ Finished in 13 seconds for 10,000 records. """
        for entity in arbitrary_entities:
            db_session.query(_ArbitraryDatabaseModel).filter_by(
                id=entity.id,
            ).update(
                entity.to_primitive(),
                synchronize_session="fetch",
            )
        db_session.commit()


if __name__ == "__main__":
    entities_to_update = []

    for id in range(10000):
        entities_to_update.append(
            ArbitraryEntity.get_mock_object(
                overrides={
                    "id": id,
                },
            ),
        )
    ArbitraryRepository.update_from_entities_v1(entities_to_update)

A Little Better: Batch update columns

In the same way that we should create individual methods that correspond to individual queries, each optimized with the proper indexes and so forth, the case can be made for baking a particular update use case into your code. In my particular application, for many cases I’m just updating an enum value across multiple rows, so it makes sense to make some method like:

update_enum_record_for_ids(ids, new_enum_value)

For the same database setup, it now takes only half a second (compared to 13 seconds) to update the value of a particular column across 10,000 rows.

from schematics.models import Model
from sqlalchemy.ext.declarative import declarative_base
import time

from .sql_alchemy_utils import Session

db_session = Session()

Base = declarative_base()


class _ArbitraryDatabaseModel(Base):
    # defined previously in this example
    pass


class ArbitraryEntity(Model):
    # defined previously in this example
    pass


class ArbitraryRepository(object):

    @classmethod
    def update_timestamp_for_entities(cls, arbitrary_entities, new_timestamp):
        """ Finishes in 0.41 seconds for 10,000 records. """
        db_session.query(_ArbitraryDatabaseModel).filter(
            _ArbitraryDatabaseModel.id.in_(
                [entity.id for entity in arbitrary_entities],
            ),
        ).update(
            {
                "arbitrary_timestamp": new_timestamp,
            },
            synchronize_session="fetch",
        )
        db_session.commit()


if __name__ == "__main__":
    entities_to_update = []

    for id in range(10000):
        entities_to_update.append(
            ArbitraryEntity.get_mock_object(
                overrides={
                    "id": id,
                },
            ),
        )
    ArbitraryRepository.update_timestamp_for_entities(entities_to_update, time.time())

Moar: Optimize for your use case

Pigeonhole values to enable batch writes: In our application, in order to take advantage of the aforementioned approach, I needed to ensure that common update values existed. Enums are easy, but values like timestamps are inherently distinct. Optimizing for our exact use cases helped here because timestamps, for example, could be represented as very coarse values without adverse effects. So, before every update, we rounded all of the timestamps to the nearest 15 seconds. Now we could take advantage of batch writes, since many rows shared the same update value.
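
A sketch of that rounding looks something like the following (the 15-second bucket is just the value from our use case; any sufficiently coarse bucket works):

import time


def round_timestamp(timestamp, bucket_seconds=15):
    """ Round a unix timestamp to the nearest bucket so that many rows end up
    with the same update value and can share a single batched write.
    """
    return round(timestamp / bucket_seconds) * bucket_seconds


# all writes within roughly the same 15 second window collapse to one value
coarse_now = round_timestamp(time.time())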

Buffer writes before flushing: The batching, of course, relies on the idea that you have a batch of things to work with. So, for our use case again, we ensured that operations were reproducible and safe to retry, which then enabled us to safely write to Redis before writing to the database. Redis, in this setup, offers no durability guarantees, and you should assume everything it contains can be lost at any moment. For our case, we ended up writing to Redis buffers and then flushing them every few seconds or whenever the batch size reached a particular threshold, whichever came first.
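
A minimal sketch of that buffer, assuming redis-py and a hypothetical flush_entities callback that performs the actual batched database write (the key name and thresholds here are made up for illustration):

import json
import time

import redis

redis_client = redis.Redis()

BUFFER_KEY = "entity_update_buffer"  # hypothetical key name
MAX_BATCH_SIZE = 500
MAX_BATCH_AGE_SECONDS = 5


def buffer_update(update_attributes):
    """ Push a pending update onto the Redis buffer instead of writing it
    straight to the database.
    """
    redis_client.rpush(BUFFER_KEY, json.dumps(update_attributes))


def flush_if_needed(last_flush_time, flush_entities):
    """ Flush the buffer when it is big enough or old enough, whichever comes
    first.  Updates must be reproducible and safe to retry, since anything
    sitting in Redis can be lost.
    """
    buffer_age = time.time() - last_flush_time
    if redis_client.llen(BUFFER_KEY) >= MAX_BATCH_SIZE or buffer_age >= MAX_BATCH_AGE_SECONDS:
        pending = [json.loads(raw) for raw in redis_client.lrange(BUFFER_KEY, 0, -1)]
        redis_client.delete(BUFFER_KEY)
        flush_entities(pending)
        return time.time()
    return last_flush_time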

Enlightenment: Generically Batch Updates Through Entity Diffing

If we have an application doing a lot of things, the steps thus far would leave us with code where multiple methods exist to write update values. In my predicament, I wanted to combine all of these methods into a single, generic one, which would allow me to make some higher level code more generic and, in turn, clean up the code enough to write the actual feature I’m interested in.

If I can take a business entity and ensure it’s never mutated, I can instead track changes to that entity with new objects that reflect the new values. If I compare those two entities, it’s easy enough to find the differential and write the appropriate database query to update the corresponding record.

If I have thousands of entities and all of their corresponding differentials, I can map each individual attribute update to the set of primary keys it’s supposed to be applied to. With set arithmetic, I can then find sets of keys that intersect across attribute updates and merge those attributes together. That leaves me with batches of update attributes, each with a corresponding set of keys to apply them to, so all of my differentials collapse into the smallest possible number of batched updates.

from schematics.models import Model
from sqlalchemy.ext.declarative import declarative_base

from .sql_alchemy_utils import Session

db_session = Session()

Base = declarative_base()


class _ArbitraryDatabaseModel(Base):
    # defined previously in this example
    pass


class ArbitraryEntity(Model):
    # defined previously in this example
    pass


def generate_batched_differentials(from_entity_states, to_entity_states):
    # this function is left empty as an exercise to the reader
    pass


def get_all_current_entities():
    # this function would presumably return a bunch of business entities
    pass


class ArbitraryRepository(object):

    @classmethod
    def update_from_differentials(cls, from_entity_states, to_entity_states):
        update_generator = generate_batched_differentials(
            from_entity_states,
            to_entity_states,
        )

        for primary_key_list, update_attributes in update_generator:
            db_session.query(_ArbitraryDatabaseModel).filter(
                _ArbitraryDatabaseModel.id.in_(
                    primary_key_list,
                ),
            ).update(
                update_attributes,
                synchronize_session="fetch",
            )
        db_session.commit()


if __name__ == "__main__":

    initial_entities = get_all_current_entities()

    final_entities = []

    for entity in initial_entities:
        final_entities.append(
            entity.copy_with_override(
                arbitrary_int=5,
            )
        )

    # will create a single UPDATE query with SET arbitrary_int=5
    ArbitraryRepository.update_from_differentials(initial_entities, final_entities)
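
For reference, here is one possible sketch of the generate_batched_differentials function stubbed out above. It is simpler than the full set-intersection merging described earlier, in that it only groups primary keys whose differentials are identical, but it satisfies the same (primary_key_list, update_attributes) contract, assuming Schematics entities keyed by id:

from collections import defaultdict


def generate_batched_differentials(from_entity_states, to_entity_states):
    """ Yield (primary_key_list, update_attributes) pairs, merging every
    primary key whose differential produced the same update dict.
    """
    updates_by_key = {}

    for from_entity, to_entity in zip(from_entity_states, to_entity_states):
        old_values = from_entity.to_primitive()
        new_values = to_entity.to_primitive()
        differential = {
            attribute: new_value
            for attribute, new_value in new_values.items()
            if attribute != "id" and old_values.get(attribute) != new_value
        }
        if differential:
            updates_by_key[to_entity.id] = differential

    # group primary keys that share an identical set of update attributes
    keys_by_update = defaultdict(list)
    for primary_key, update_attributes in updates_by_key.items():
        keys_by_update[tuple(sorted(update_attributes.items()))].append(primary_key)

    for update_items, primary_keys in keys_by_update.items():
        yield primary_keys, dict(update_items)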

Conclusion

Now I can combine all of my business logic in arbitrarily complex ways, since it can be isolated to a single module. The business logic takes an input business entity and spits out an output business entity. Do this for any number of business entities, and you can safely, generically, and efficiently write those updates to the database. My own code is now cleaner as well, since a larger separation exists between business logic and persistence layer changes.

import time

from schematics.models import Model

from models.arbitrary_model import ArbitraryRepository


class ArbitraryEntity(Model):

    # entity already defined in previous example, just make note of method
    # below

    def copy_with_override(self, **override_attributes):
        """ Provides a new instance of an ArbitraryEntity with the input attributes
        overridden.
        """
        object_attributes = self.to_primitive()
        object_attributes.update(override_attributes)
        return type(self)(object_attributes)


def do_some_crazy_business_logic(arbitrary_entity):
    ArbitraryRepository.update_from_differentials(
        [arbitrary_entity],
        [
            _update_entity_with_business_logic_applied(
                arbitrary_entity,
            )
        ]
    )


def _update_entity_with_business_logic_applied(initial_entity_state):
    # just an example...we could get arbitrarily complex with whatever
    # differentials are applied to a new, immutable entity
    return initial_entity_state.copy_with_override(
        arbitrary_string=_some_crazy_business_logic(),
        arbitrary_timestamp=time.time(),
    )


def _some_crazy_business_logic():
    return "Pretend I return some dynamically calculated, important string"