adrianhesketh.com

Event sourced DynamoDB design with TypeScript - Part 1

This is part 1 of a 2-part series.

Introduction

Most applications deal with entities, “a thing that exists”, and the processing of events that effect change in that entity, such as a user wanting to update a field, a new bank transaction changing a bank account balance, or the final digit being entered on a combination lock resulting in checking the combination.

Within a warehouse, an order may become “ready for packing” when all order items have been picked from the shelves and dropped in the packing area. It’s tempting to put this logic into your database:

UPDATE order_items SET picked = true WHERE order_id = 1 AND item_id = 12;
UPDATE orders SET status = 'ready_for_packing' WHERE order_id IN (
    SELECT order_id FROM order_items WHERE order_id = 1
    GROUP BY order_id
    HAVING COUNT(*) = SUM(CASE WHEN picked THEN 1 ELSE 0 END)
);

This has the advantage of being simple to get started with, but can result in the logic that describes the underlying state machine ending up spread across many database statements, rather than in more easily testable program code. Martin Fowler calls this style of database interaction a Transaction Script (https://martinfowler.com/eaaCatalog/transactionScript.html), as opposed to a Domain Model, where program code models both the data and the behaviour.

I’ve built several systems using this style of development, and it’s worked well at small scale, but can be a hassle to test due to complex set-up and tear down of database structures, and the time taken to run those tests.

Many projects I work on can’t completely define all of the possible states of entities, the allowed transitions, and the triggers of those transitions at the start of the project. This leads to a lot of requirements churn as we work things out, which in turn causes database structure churn, and the spread of logic across the SQL statements can introduce some fairly unpleasant-to-work-with SQL statements.

One pattern that I’ve found to work well for a number of use cases is to use an Event Sourced (https://martinfowler.com/eaaDev/EventSourcing.html) database design within DynamoDB instead. This design keeps a record of every event related to a particular entity, and, optionally, a “HEAD” record, or “STATE” record that represents the current state of that entity (you might think of it as a materialized view, a cached store of some processing).

This means that it includes an audit trail by default, since all events are stored, and the latest state can be calculated by reading all of the events in sequence, and passing them into program code that does the state calculation. This has the useful effect of making it possible to change the program code that dictates the current state of the entity at any point, and being able to add new states later very easily.

If there aren’t many events per entity, it’s practical to query DynamoDB for all of the events related to an entity, and to then process those events in code to work out the state of that entity. For example, a sales order might only have 20 events associated with it (“Placed”, “Fulfilled”, “Delivered” etc.).

To group related event data together, it’s possible to use the same partition key for each of the events, and differentiate each event within that partition by giving it a unique sort key. See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.CoreComponents.html

If those are called id and rng, it might look like this:

id         rng              typ        facet  item
order/123  event/created    created    order  { "id": 123 }
order/123  event/itemAdded  itemAdded  order  { "itemId": 456, "desc": "Rubber duck", "cost": 3000 }
order/123  event/purchased  paid       order  { "amt": 3000, "method": "visa", "transactionId": "674" }

This database structure allows a single DynamoDB Query operation to get all of the events under the partition key in a single operation (assuming that there’s not > 1MB of event data, in which case multiple operations would be required).
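If the event data could exceed the 1MB Query limit, the paging loop might be sketched as below. This is an illustrative sketch, not AWS SDK code: the `query` function is injected (in real code it would wrap the SDK’s Query operation), and the record and key shapes are assumptions for the example.

```typescript
// The shape of one page of Query results, and a key for resuming.
type Key = Record<string, unknown>;

interface QueryPage {
  Items: Record<string, unknown>[];
  LastEvaluatedKey?: Key;
}

// Injected so the paging logic is testable without an AWS client.
type QueryFn = (partitionKey: string, exclusiveStartKey?: Key) => Promise<QueryPage>;

// Keep issuing Query operations until DynamoDB stops returning a
// LastEvaluatedKey, i.e. until every page of events has been read.
async function getAllEvents(query: QueryFn, id: string): Promise<Record<string, unknown>[]> {
  const events: Record<string, unknown>[] = [];
  let startKey: Key | undefined;
  do {
    const page = await query(id, startKey);
    events.push(...page.Items);
    startKey = page.LastEvaluatedKey;
  } while (startKey);
  return events;
}
```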

The other fields are there to make it easier to work with. The typ field stores the programming language type name, to understand the structure of the item field. The facet defines the type of entity being managed under the id (DynamoDB tables usually store multiple types of entity), in this case an order. The item is a JSON blob of data corresponding to a given typ.

These fields make it possible for code to loop over the records, work out what type of data each record is, then update a view of the final item:

{
    "id": 123,
    "items": [
        { "itemId": 456, "desc": "Rubber duck", "cost": 3000 }
    ],
    "status": "paid",
    "payment": {
        "method": "visa",
        "transactionId": "674"
    }
}
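That loop might be sketched as a fold over the event records. This assumes the typ and item fields shown in the table above; the OrderState shape is made up for the example.

```typescript
// One event record, as stored in the table above.
interface EventRecord {
  typ: string;
  item: string; // JSON blob corresponding to the typ
}

interface OrderItem {
  itemId: number;
  desc: string;
  cost: number;
}

// An illustrative view of the final item.
interface OrderState {
  id?: number;
  items: OrderItem[];
  status: "created" | "paid";
  payment?: { method: string; transactionId: string };
}

// Loop over the records in sequence, using typ to decide how each
// event updates the view of the entity.
function calculateState(records: EventRecord[]): OrderState {
  const state: OrderState = { items: [], status: "created" };
  for (const r of records) {
    const data = JSON.parse(r.item);
    switch (r.typ) {
      case "created":
        state.id = data.id;
        break;
      case "itemAdded":
        state.items.push(data);
        break;
      case "paid":
        state.status = "paid";
        state.payment = { method: data.method, transactionId: data.transactionId };
        break;
    }
  }
  return state;
}
```

Because the calculation lives in code, changing how the state is derived (or adding new event types) doesn’t require restructuring stored data.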

What if you’ve got thousands of events?

Since a bank account may have tens of thousands of transactions, it would be impractical to page through every single transaction from start to finish to work out the current balance, due to the cost and time implications. In SQL databases, it’s possible to dump the work onto the database server itself:

SELECT SUM(amount) as balance FROM transactions WHERE accountNumber = '123';

Depending on your indexing strategy, this SQL query may have to read every record (a “scan”) to calculate the final amount. On a SQL database, this data access and computation happens on the database server, not within your compute node (Lambda / container etc.). Databases are often licensed per CPU core, so you may be using your most expensive CPU cores to add up some numbers. The alternative isn’t attractive either: moving all of that data from the database server to the compute node so it can be summed there is slow and CPU intensive too.

Ultimately, this strategy will only get you so far before you need to restructure your database to support greater amounts of concurrency. You might then use a database transaction to keep a balance table in sync, so that a SUM operation isn’t required. After all, you’re probably going to want to know your current balance a lot.

BEGIN TRANSACTION;
INSERT INTO transactions (accountNumber, amount) VALUES ('123', 4500);
UPDATE balances SET balance = balance + 4500 WHERE accountNumber = '123';
COMMIT TRANSACTION;

You can get the same effect here by using a single table design with DynamoDB transactions to keep the balance up-to-date:

id        rng              typ      facet  item
acct/123  balance          balance  acct   200
acct/123  transaction/123  tx       acct   { "amt": 500, "name": "Pay" }
acct/123  transaction/456  tx       acct   { "amt": -300, "name": "The Pub" }

A DynamoDB transaction updates the balance amount, and adds the transaction to the account in one operation. Here’s some code generated by the AWS NoSQL Workbench that does just that. I’ve cut out some of the extra comments and stuff from it.

const AWS = require('aws-sdk');
const region = 'us-east-1';
const dynamoDbClient = createDynamoDbClient(region);

function createDynamoDbClient(regionName) {
  AWS.config.update({ region: regionName });
  return new AWS.DynamoDB();
}

function createTransactWriteItemsInput() {
  return {
    "TransactItems": [
      {
        "Put": {
          "TableName": "events",
          "Item": {
            "_id": { "S": "acct/123" },
            "_rng": { "S": "transaction/789" },
            "item": { "S": "{ \"amt\": -100, \"name\": \"The Kebab Shop\" }" }
          }
        }
      },
      {
        "Update": {
          "TableName": "events",
          "Key": {
            "_id": { "S": "acct/123" },
            "_rng": { "S": "balance" }
          },
          "UpdateExpression": "SET #amt = #amt + :amt",
          "ExpressionAttributeValues": {
            ":amt": { "N": "-100" }
          },
          "ExpressionAttributeNames": {
            "#amt": "item"
          }
        }
      }
    ]
  };
}

async function executeTransactWriteItems(dynamoDbClient, transactWriteItemsInput) {
  return await dynamoDbClient.transactWriteItems(transactWriteItemsInput).promise();
}

const transactWriteItemsInput = createTransactWriteItemsInput();

executeTransactWriteItems(dynamoDbClient, transactWriteItemsInput).then(() => {
  console.info('TransactWriteItems API call has been executed.');
});

So here, we’ve got two sets of records in the same DynamoDB table collected under the same partition key. One set is the transactions, the other is a single record that represents the current balance of the account (the state of the entity). However, we’ve still kept all of the events in the table in case we need to recalculate the balance, or check that the balance has been calculated correctly.

If we don’t want to let a transaction happen if it would make the account go overdrawn (have a balance of < 0), or if the state can only change after something else has happened first, we can add "ConditionExpression": "#bal > :bal" to the Update operation, and the transaction will only complete if there’s enough money in the account:

function createTransactWriteItemsInput() {
  return {
    "TransactItems": [
      {
        "Put": {
          "TableName": "events",
          "Item": {
            "_id": { "S": "acct/123" },
            "_rng": { "S": "transaction/789" },
            "item": { "S": "{ \"amt\": -100, \"name\": \"The Kebab Shop\" }" }
          }
        }
      },
      {
        "Update": {
          "TableName": "events",
          "Key": {
            "_id": { "S": "acct/123" },
            "_rng": { "S": "balance" }
          },
          "UpdateExpression": "SET #amt = #amt + :amt",
          "ConditionExpression": "#bal > :bal",
          "ExpressionAttributeValues": {
            ":amt": { "N": "-100" },
            ":bal": { "N": "100" }
          },
          "ExpressionAttributeNames": {
            "#amt": "item",
            "#bal": "item"
          }
        }
      }
    ]
  };
}

DynamoDB also supports adding a ConditionCheck within a transaction, which can be used to cancel the transaction if a DynamoDB record that you’re not updating doesn’t pass the check. In contrast, a ConditionExpression cancels the transaction if a record you are updating doesn’t pass the check.
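As a sketch, a transaction that gates a new record on the state of a record it doesn’t modify might be shaped like this. The accountStatus attribute, key values, and transaction id are illustrative assumptions, not taken from the examples above.

```typescript
// Builds a TransactWriteItems input where a ConditionCheck on the account's
// state record guards a Put of a new transaction record. The shapes follow
// the DynamoDB TransactWriteItems API; the attribute names are illustrative.
function createGuardedTransactWriteItemsInput() {
  return {
    TransactItems: [
      {
        // Cancels the whole transaction if the account record isn't open,
        // even though this record isn't being written to.
        ConditionCheck: {
          TableName: "events",
          Key: {
            _id: { S: "acct/123" },
            _rng: { S: "state" },
          },
          ConditionExpression: "#status = :open",
          ExpressionAttributeNames: { "#status": "accountStatus" },
          ExpressionAttributeValues: { ":open": { S: "open" } },
        },
      },
      {
        // The write that only happens if the check above passes.
        Put: {
          TableName: "events",
          Item: {
            _id: { S: "acct/123" },
            _rng: { S: "transaction/790" },
            item: { S: JSON.stringify({ amt: -50, name: "Coffee" }) },
          },
        },
      },
    ],
  };
}
```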

This shows that Transaction Scripts are practical in DynamoDB, and can take us a long way.

Moving from Transaction Scripts to Domain Models

While it’s possible to use DynamoDB expressions and transactions to execute complex logic, you’re likely to hit a point where something is still possible in DynamoDB (or another database), but is becoming really hard to reason about. For example, you might be modelling a user who has an overdraft facility, so a transaction should be allowed to take the balance below zero, but only as far as the overdraft limit. Or you might want to work out whether a particular transaction was the one that took the user into their overdraft, so that you can send the user a text message.

At some point, you might find that it’s just easier to use a programming language, but to do this work in your favourite programming language, you’ve got to take a few steps:

1. Read the current events (or state) for the entity from the database.
2. Run the business logic in program code to produce the updated state.
3. Write the new event and the updated state back to the database.

This has a few downsides. One is that it requires a network round-trip: one network hop to get the data, another to update it. It also means two database operations (a read, then a write) instead of one (just a write).

Another problem is that it introduces a situation where another transaction could have updated the database record and changed it during that processing time. Let’s use a bank account as an example.

Process A: Gets record. { "balance": 100 }
Process B: Gets record. { "balance": 100 }
Process A: Adds 50 to balance.
Process B: Adds 100 to balance.
Process B: Writes update to record. { "balance": 200 }
Process A: Writes update to record. { "balance": 150 }

This scenario resulted in the loss of 100 from the balance field - it should have been 250, but the final balance was the last written value of 150.

This same scenario would play out with state calculations too. Let’s use a retail order as an example:

Process A: Receives order shipped notification.
Process A: Gets record. { "status": "created" }
Process B: Receives order delivered notification.
Process B: Gets record. { "status": "created" }
Process A: Calculates status as "shipped".
Process B: Calculates status as "delivered".
Process B: Writes record. { "status": "delivered" }
Process A: Writes record. { "status": "shipped" }

Here, we’ve accidentally overwritten the “delivered” status with “shipped”, losing the fact that the order has been received by the customer.

One way to deal with this is to add a version number to each record, and to check it on write, so that the code can’t overwrite another change. In DynamoDB, this can be done with a ConditionExpression as shown in the example above.

This is called “optimistic concurrency”. There’s a chance that someone else modified the record, but how much of a chance depends on the use case - if we’re dealing with a single user’s mobile shopping order, then there’s going to be a few updates, but for any given window of 100ms or so (the window of getting the data, processing it in code, and attempting to write the updated record back), the chance of a conflict is very, very low.
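The versioned write described above might be sketched as an UpdateItem input, assuming a hypothetical _version attribute on the state record; the key values are illustrative.

```typescript
// Builds an UpdateItem input that only succeeds if the stored version still
// matches the version that was read, bumping the version on success.
// The _version attribute and key values are illustrative assumptions.
function createVersionedUpdateInput(expectedVersion: number, newItemJson: string) {
  return {
    TableName: "events",
    Key: {
      _id: { S: "order/123" },
      _rng: { S: "state" },
    },
    // Bump the version and replace the state in one write...
    UpdateExpression: "SET #v = :next, #item = :item",
    // ...but only if nobody else has written since we read the record.
    ConditionExpression: "#v = :expected",
    ExpressionAttributeNames: { "#v": "_version", "#item": "item" },
    ExpressionAttributeValues: {
      ":expected": { N: String(expectedVersion) },
      ":next": { N: String(expectedVersion + 1) },
      ":item": { S: newItemJson },
    },
  };
}
```

If another writer got in first, the stored version no longer matches :expected, the condition fails, and the write is rejected rather than silently overwriting.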

If another process has modified the data, the code that’s attempting to make the update would need to get the latest version of the record and try to apply its updates again. Typically, the updating code would apply a random delay (“jitter”) to reduce the chance of multiple processes retrying their transactions at the same time:

Process A: Gets record. { "version": 1, "value": 100 }
Process B: Gets record. { "version": 1, "value": 100 }
Process A: Adds 50 to the value.
Process B: Adds 100 to the value.
Process B: Attempts update to record with ConditionExpression
           to check the record version is 1. { "version": 2, "value": 200 }
Process B: Write successful.
Process A: Attempts update to record with ConditionExpression
           to check the record version is 1. { "version": 2, "value": 150 }
Process A: Write unsuccessful.
Process A: Waits for random time between 0ms and 200ms.
Process A: Gets record. { "version": 2, "value": 200 }
Process A: Adds 50 to the value.
Process A: Attempts update to record with ConditionExpression
           to check the record version is 2. { "version": 3, "value": 250 }
Process A: Write successful.
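The retry-with-jitter loop in the trace above might be sketched like this. The wrapper shape, attempt limit, and delay bounds are illustrative choices, not a prescribed implementation.

```typescript
// A random wait between 0 and maxDelayMs reduces the chance that two
// conflicting processes retry at exactly the same moment.
function jitteredDelayMs(maxDelayMs: number): number {
  return Math.floor(Math.random() * (maxDelayMs + 1));
}

// Runs `attempt` (get the latest record, re-apply the logic in code, then
// conditionally write) and retries with jitter when the caller-supplied
// predicate says the failure was a version conflict.
async function withRetries<T>(
  attempt: () => Promise<T>,
  isRetryable: (e: unknown) => boolean,
  maxAttempts = 5,
  maxDelayMs = 200,
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (e) {
      if (i >= maxAttempts || !isRetryable(e)) {
        throw e;
      }
      await new Promise((resolve) => setTimeout(resolve, jitteredDelayMs(maxDelayMs)));
    }
  }
}
```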

This can result in a lower system throughput, due to the conflicts, but if the logic is simpler for engineers to think about, and you get lower numbers of defects because it’s easier to unit test the logic, it’s a good choice.

However, it’s critically important that the code that processes the data doesn’t assume that its changes will be successfully persisted to the database, because the data operation might need to be retried in the case of a transaction conflict. Also, the faster the operation is, the smaller the size of the conflict window, and the less likely conflicts are to happen.

In the bank transaction example, retrying the transaction is relatively straightforward, because the operation is non-conflicting (https://www.geeksforgeeks.org/conflict-serializability-in-dbms/) - but in the case of conflicting updates (e.g. replacing a user’s email address, updating the status of an order), we need alternative strategies.

The simplest coding strategy is “last write wins” - i.e. just overwrite the data with whatever you want to write anyway. This gives us one benefit - being able to write our logic in program code - but this leaves us in the same position of possibly losing state transitions, or overwriting important data. Despite this, for some entities (e.g. user profile - their email, phone number etc.), it’s a valid strategy.

In recent projects I’ve worked on, the Domain Model code was focussed on handling conflicting updates, and out-of-order events. For example, if we receive events in this order:

Order Created
Order Being Processed
Order Delivered
Order Dispatched

When we receive the Order Delivered event, we’d want our code to set the status to Delivered.

When we then get the Order Dispatched event, we wouldn’t want our code to modify the status at all.

But, if our code had received Order Dispatched first, then Order Delivered, we’d probably want it to send a text message to the customer telling them their order was on its way once the database transaction was written.
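One way to sketch the status handling described above is to rank the statuses and ignore any event that would move the status backwards. The set of statuses and their ordering here are illustrative.

```typescript
// Each status gets a rank; events may only move the status forwards.
const statusRank: Record<string, number> = {
  created: 0,
  processing: 1,
  dispatched: 2,
  delivered: 3,
};

// Returns the new status, ignoring out-of-order events: "dispatched"
// arriving after "delivered" leaves the status as "delivered".
function applyStatusEvent(current: string, incoming: string): string {
  return statusRank[incoming] > statusRank[current] ? incoming : current;
}
```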

Sending notifications reliably

We’ve just described a situation when we want to send a message after a transaction has been committed to the database.

At the point we’re 100% sure that our database changes have been saved, we need to let other systems know about the status change so that they can carry out their actions, e.g. notifying the customer based on their communication preferences, or preparing a delivery of the order.

We might typically inform other systems by sending a message on a message bus or stream (EventBridge / Kinesis / SNS), allowing other systems to subscribe to the events and carry out appropriate actions.

There’s just one complication: the network is unreliable, and so is the messaging system. It’s impossible to guarantee that the message will be sent - EventBridge might go down, or your code might crash after the database write but before the message is published - so we need to be able to handle that scenario.

One way to handle this is to write the outbound messages to a log file in the case of failure, then have a process that reads through the logs and processes any failures, but this has plenty of problems of its own, since you’d need to keep track of which failed messages had been successfully sent later.

I think a better way is to write the outbound message as part of the same database transaction that persists the inbound event (the event that caused the status change) and the current state of the item, and then to use a DynamoDB Stream hooked up to a Lambda function to subscribe to the table’s changes and send the messages.

This ensures that a copy of the outbound message is persisted, while DynamoDB Streams combined with Lambda gives automatic retries and dead-letter capabilities for handling terminal failures; see https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/ for more details.
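A sketch of that combined write: the inbound event, the updated state, and the outbound message go into a single TransactWriteItems input, so the message record only exists if the state change committed. The record names (state, outbox/...) and shapes are illustrative assumptions.

```typescript
// Builds a TransactWriteItems input that persists the inbound event, the
// updated state, and an "outbox" record in one atomic write. A stream-
// triggered Lambda would later pick up outbox records and publish them.
function createOutboxTransactWriteItemsInput(
  orderId: string,
  stateJson: string,
  messageJson: string,
) {
  return {
    TransactItems: [
      {
        // The inbound event that caused the status change.
        Put: {
          TableName: "events",
          Item: {
            _id: { S: `order/${orderId}` },
            _rng: { S: "event/dispatched" },
            item: { S: stateJson },
          },
        },
      },
      {
        // The current state of the entity.
        Update: {
          TableName: "events",
          Key: { _id: { S: `order/${orderId}` }, _rng: { S: "state" } },
          UpdateExpression: "SET #item = :item",
          ExpressionAttributeNames: { "#item": "item" },
          ExpressionAttributeValues: { ":item": { S: stateJson } },
        },
      },
      {
        // The outbound message, to be sent by the stream consumer.
        Put: {
          TableName: "events",
          Item: {
            _id: { S: `order/${orderId}` },
            _rng: { S: "outbox/dispatched" },
            item: { S: messageJson },
          },
        },
      },
    ],
  };
}
```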

Summary

Part 2 demonstrates implementing an Event Sourced database design in TypeScript.