
Event sourced DynamoDB design with TypeScript - Part 2

This is part 2 of a two-part series.

Full example: [0]

Part 2

In part 1, we looked at the difference between Transaction Script and Event Sourced database designs, and at the DynamoDB features that make Event Sourced designs possible.

In this post, we’ll look at a way of structuring an application to do this. The application will be made up of modules to deal with each responsibility:

  • Record management - something to get records into and out of the database.
  • State management - something to process events and calculate the state of an entity.
  • Entity management - something that deals with the record, state and event management.

Finally, we’ll define the rules that determine how events are interpreted into state.

For this example, we’ll create a ledger for a bank account.

Record management

A DynamoDB record must have a partition key, and can optionally have a sort key. All other fields are optional. I’ve used _id for the partition key, and _rng for the sort key. Prefixing the field names with an underscore isn’t required by DynamoDB, but it helps to differentiate the fields used for database metadata from the data fields.

Here’s how to create the “ledger” DynamoDB table:

aws dynamodb create-table \
  --table-name ledger \
  --attribute-definitions AttributeName=_id,AttributeType=S AttributeName=_rng,AttributeType=S \
  --key-schema AttributeName=_id,KeyType=HASH AttributeName=_rng,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST

And define a type that represents what will be stored in DynamoDB:

export interface Record {
  // Identifier of the record group.
  _id: string;
  // Event sort key.
  _rng: string;
  // Facet of the event.
  _facet: string;
  // Type of the event.
  _typ: string;
  // Timestamp of the record.
  _ts: number;
  // ISO date.
  _date: string;
  // Item data.
  _itm: string;
  // Sequence of the record.
  _seq: number;
}

The _id and _rng fields represent the partition and sort keys respectively.

The _facet field stores the name of the entity type that is being stored. The term “facet” comes from the DynamoDB NoSQL Workbench [1]. For consistency, I’ve adopted it as a helpful way to differentiate between multiple entity types stored within a single DynamoDB table.

The _typ field stores the name of the event type, e.g. “TRANSACTION” or “ACCOUNT_UPDATE”. This allows our program code to understand the type of data stored in the _itm field and handle it appropriately.

The _ts and _date fields store the time that the record was added to the database, as a Unix timestamp and in RFC 3339 / ISO 8601 format respectively.
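
For example, both values could be derived from a single Date at write time. This is just a sketch (the library’s actual implementation may differ), and note that getTime() returns milliseconds rather than seconds:

const now = new Date();
const _ts = now.getTime(); // Unix timestamp (milliseconds).
const _date = now.toISOString(); // RFC 3339 / ISO 8601.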

The _itm field stores a JSON blob of the event data. Having a JSON blob makes it easy to see the structure of the data at a glance, compared to splitting it across multiple DynamoDB fields. It also clearly delineates the fields used by DynamoDB metadata from the data itself.

Not splitting _itm into individual DynamoDB fields makes it impossible to create a global secondary index on a field within _itm, but for this use case that’s not a problem. More on that later.

Finally, the _seq field stores an incrementing number that represents the sequence of the event in the list of events that make up an individual entity’s state.

We’re going to be storing three types of record overall:

  • Inbound events - external events that may change the state of an entity.
  • Outbound events - events to be published due to state changes within the entity.
  • State - the current calculated state of the entity.

Regardless of the type of record, the _id field will always be calculated by concatenating the “facet” (the type of entity being stored in the table) with its unique ID. The “facet” represents a namespace within the table.

const facetId = (facet: string, id: string) => `${facet}/${id}`;

The _rng field is then used to differentiate between the record types.

For any given entity, there will only ever be one “STATE” record representing the current state, so the _rng field is set to STATE. The _seq field defines the version of the state; each change to the state increments the _seq value.
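
Since there’s exactly one state record per entity, its sort key is just the constant STATE. For symmetry with the other key helpers shown below, it could be written as (an illustrative helper, not part of the linked code):

const stateRecordRangeKey = () => "STATE";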

| `_id`            | `_rng` | `_facet`     | `_typ`       | `_seq` | `_itm`                                                                     |
|------------------|--------|--------------|--------------|--------|----------------------------------------------------------------------------|
| BANK_ACCOUNT/123 | STATE  | BANK_ACCOUNT | BANK_ACCOUNT | 1      | `{ "balance":0, "minBalance":-1000, "id":"123", "owner":"John Brown" }` |

There will be multiple inbound events, so the _rng key includes the type and sequence number to differentiate the events:

const inboundRecordRangeKey = (type: string, seq: number) => `INBOUND/${type}/${seq}`;

This gives us a database structure that looks something like this - a list of various events, including account updates and transactions, and a “STATE” record that’s the result of those events.

| `_id`              | `_rng`                           | `_typ`                 | `_seq` | `_itm`                                                                    |
|------------------|--------------------------------|----------------------|------|--------------------------------------------------------------------------|
| BANK_ACCOUNT/123 | INBOUND/ACCOUNT_CREATION/2     | ACCOUNT_CREATION     | 2    | `{"id":"123"}`                                                        |
| BANK_ACCOUNT/123 | INBOUND/ACCOUNT_UPDATE/3       | ACCOUNT_UPDATE       | 3    | `{"ownerFirst":"John","ownerLast":"Brown"}`                              |
| BANK_ACCOUNT/123 | INBOUND/TRANSACTION_ACCEPTED/4 | TRANSACTION_ACCEPTED | 4    | `{"desc":"Transaction A","amount":-200}`                                 |
| BANK_ACCOUNT/123 | STATE                          | BANK_ACCOUNT         | 4    | `{"balance":-200,"minBalance":-1000,"id":"123","owner":"John Brown"}` |

All that’s left to discuss is the outbound events: messages sent to third parties. Since a given event may result in multiple outbound messages being sent, an “index” is appended to the end of the range key to differentiate between those messages, while the seq value shows which inbound event caused the outbound event to be sent.

const outboundRecordRangeKey = (type: string, seq: number, index: number) =>
  `OUTBOUND/${type}/${seq}/${index}`;

In the example above, the TRANSACTION_ACCEPTED event with sequence number 4 resulted in the account becoming overdrawn. If we updated our processing code to send an outbound message when this happens, we’d want the result to look like this:

| `_id`              | `_rng`                           | `_typ`                 | `_seq` | `_itm`                                                                 |
|------------------|--------------------------------|----------------------|------|-----------------------------------------------------------------------|
| BANK_ACCOUNT/123 | INBOUND/ACCOUNT_CREATION/2     | ACCOUNT_CREATION     | 2    | `{"id":"123"}`                                                        |
| BANK_ACCOUNT/123 | INBOUND/ACCOUNT_UPDATE/3       | ACCOUNT_UPDATE       | 3    | `{"ownerFirst":"John","ownerLast":"Brown"}`                           |
| BANK_ACCOUNT/123 | INBOUND/TRANSACTION_ACCEPTED/4 | TRANSACTION_ACCEPTED | 4    | `{"desc":"Transaction A","amount":-200}`                              |
| BANK_ACCOUNT/123 | OUTBOUND/accountOverdrawn/4/0  | accountOverdrawn     | 4    | `{"accountId":"123"}`                                                 |
| BANK_ACCOUNT/123 | STATE                          | BANK_ACCOUNT         | 4    | `{"balance":-200,"minBalance":-1000,"id":"123","owner":"John Brown"}` |

Note the additional accountOverdrawn outbound message. To actually send the message, a DynamoDB Stream would be set up to receive new records added to the table and, if the _rng field starts with OUTBOUND/, send the message to EventBridge or another messaging tool.
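
As a sketch, such a stream handler might look like the following. It assumes the stream is configured to emit the new image of each record, and the EventBridge bus name and source are illustrative:

import * as AWS from "aws-sdk";
import { DynamoDBStreamEvent } from "aws-lambda";

const eventBridge = new AWS.EventBridge();

export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    // Only newly inserted records can be outbound messages.
    if (record.eventName !== "INSERT" || !record.dynamodb?.NewImage) {
      continue;
    }
    // Convert the DynamoDB attribute map back into a plain object.
    const item = AWS.DynamoDB.Converter.unmarshall(
      record.dynamodb.NewImage as AWS.DynamoDB.AttributeMap,
    );
    if (typeof item._rng !== "string" || !item._rng.startsWith("OUTBOUND/")) {
      continue;
    }
    // Forward the outbound event to EventBridge.
    await eventBridge
      .putEvents({
        Entries: [
          {
            EventBusName: "default", // Illustrative bus name.
            Source: "ledger",
            DetailType: item._typ,
            Detail: item._itm, // _itm is already a JSON string.
          },
        ],
      })
      .promise();
  }
};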

A complete example of this database code can be seen at [2], and the integration tests alongside it at [3].

State Management

With a database design and code in place, we can move on to thinking about how we take “events” and calculate the current state. We need to be able to loop over the events and update the state.

We need to write functions that take the past state and an event, and return the updated state. These are defined as StateUpdater functions. If you’re familiar with “reducers” in Redux, then this will feel very familiar indeed.

A StateUpdater function receives a rich input of type StateUpdaterInput that allows it to publish events (i.e. store them in the DB) via the publish function, and allows it access to all past events.

// StateUpdater<TState, TCurrent> defines a function used to update the state given the current incoming event.
export type StateUpdater<TState, TCurrent> = (input: StateUpdaterInput<TState, TCurrent>) => TState;

// StateUpdaterInput is the input to the StateUpdater function.
export interface StateUpdaterInput<TState, TCurrent> {
  // state of the facet.
  state: TState;
  // current event that is modifying the state.
  current: TCurrent;
  // events that already exist.
  pastInboundEvents: Array<Event<any>>;
  // events that are being added.
  newInboundEvents: Array<Event<any>>;
  // all allows access to all of the events, new and old.
  all: Array<Event<any>>;
  // current index within the events.
  currentIndex: number;
  // The index of the latest event within all events.
  stateIndex: number;
  // publish an outbound event.
  publish: (name: string, event: any) => void;
}

// An Event can be an inbound event that makes up the facet state, or an outbound event emitted
// due to a state change. Reading through all the inbound events, and applying the rules creates
// a materialised view. This view is the "STATE" record stored in the database.
export class Event<T> {
  type: string;
  event: T | null;
  constructor(type: string, event: T | null) {
    this.type = type;
    this.event = event;
  }
}

So, to process events, we need to take a list of events, an optional starting state, and a map (dictionary) that decides which StateUpdater function should be used based on the type of the event.

There’s no runtime type introspection (reflection) in TypeScript, which is why each database record stores the name of the record’s type in the _typ field as a string. This makes it possible to have a Map<RecordTypeName, StateUpdater<TState, any>> - a map from record type (actually a string) to a function that takes the current state (TState) and an event of any type, and returns the updated state (of type TState).
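
To give an idea of the mechanics, a simplified processor could fold events into state like this. It’s only a sketch: the real Processor (linked below) also collects published outbound events, among other things.

const processEvents = <TState>(
  rules: Map<RecordTypeName, StateUpdater<TState, any>>,
  initial: () => TState,
  events: Array<Event<any>>,
): TState =>
  events.reduce((state, e, i) => {
    const updater = rules.get(e.type);
    if (!updater) {
      throw new Error(`no rule found for event type "${e.type}"`);
    }
    return updater({
      state,
      current: e.event,
      pastInboundEvents: events.slice(0, i),
      newInboundEvents: [e],
      all: events,
      currentIndex: i,
      stateIndex: events.length - 1,
      publish: () => undefined, // Outbound publishing is elided in this sketch.
    });
  }, initial());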

So a bank account’s rules can be defined as follows:

// The rules define how the BankAccount state is updated by incoming events.
// For example, an incoming "TRANSACTION" event modifies the "ACCOUNT_BALANCE" state.
// The function must be pure: it must not carry out IO (e.g. network requests, or disk
// access), and it should execute quickly. If it does not, it is more likely that in
// between the transaction starting (reading all the previous events) and completing (updating
// the state), another event will have been inserted, resulting in the transaction
// failing and needing to be executed again.
const rules = new Map<RecordTypeName, StateUpdater<BankAccount, any>>();
rules.set(
  "ACCOUNT_CREATION",
  (input: StateUpdaterInput<BankAccount, AccountCreation>): BankAccount => {
    input.state.id = input.current.id;
    return input.state;
  },
);
rules.set(
  "TRANSACTION_ACCEPTED",
  (input: StateUpdaterInput<BankAccount, Transaction>): BankAccount => {
    const previousBalance = input.state.balance;
    const newBalance = input.state.balance + input.current.amount;

    // If they don't have sufficient overdraft, cancel the transaction.
    if (newBalance < input.state.minimumBalance) {
      throw new Error("insufficient funds");
    }

    // If this is the transaction that takes the user overdrawn, notify them.
    if (previousBalance >= 0 && newBalance < 0) {
      const overdrawnEvent = { accountId: input.state.id } as AccountOverdrawn;
      input.publish(AccountOverdrawnEventName, overdrawnEvent);
    }

    input.state.balance = newBalance;
    return input.state;
  },
);
rules.set(
  "ACCOUNT_UPDATE",
  (input: StateUpdaterInput<BankAccount, AccountUpdate>): BankAccount => {
    input.state.ownerFirst = input.current.ownerFirst;
    input.state.ownerLast = input.current.ownerLast;
    return input.state;
  },
);

That is, in the case of an event with the name ACCOUNT_CREATION, the id field is set on the state.

In the case of an event with the name TRANSACTION_ACCEPTED, the logic is more complex. If there’s not enough balance to take the payment, the transaction is rejected by throwing an error. This logic also makes sure that if this transaction is the one that takes someone overdrawn, an outbound event is published.

Finally, the rules can change the name of the account holder in the case of an ACCOUNT_UPDATE event.

These rules are very easy to test in isolation, and it’s straightforward to write tests that replicate receiving events in an unexpected order.
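
For example, the TRANSACTION_ACCEPTED rule can be exercised directly by constructing a StateUpdaterInput by hand. A minimal sketch using Node’s built-in assert module:

import * as assert from "assert";

const rule = rules.get("TRANSACTION_ACCEPTED")!;
const published = new Array<{ name: string; event: any }>();
const updated = rule({
  state: {
    id: "123",
    ownerFirst: "John",
    ownerLast: "Brown",
    balance: 100,
    minimumBalance: -1000,
  },
  current: { desc: "Test transaction", amount: -150 },
  pastInboundEvents: [],
  newInboundEvents: [],
  all: [],
  currentIndex: 0,
  stateIndex: 0,
  publish: (name, event) => published.push({ name, event }),
});
// The balance is updated, and crossing zero publishes an overdrawn event.
assert.strictEqual(updated.balance, -50);
assert.strictEqual(published.length, 1);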

The processor code is available over at [4].

Entity management

Next, we need some code to hook these two components together and to abstract the complexity of the internal details from the API consumer, so that they can focus on their application logic.

This is where the Facet class steps in, providing simple get, append, and recalculate methods.

The get method gets the current state of the entity.

The append method appends a new event to the database, and executes the state management rules to update the state of the entity. This function only gets the latest state record to pass to the StateUpdater function, rather than all past events, and is suitable for use when a state machine doesn’t need to look back through all previous events to calculate the next state.

The recalculate method is the same as the append method, except that it issues a query to the database to retrieve all past records. This allows the state management rules to look at previous events, at the cost of potentially more database read operations.
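
Based on how the Facet is used in the example later in this post, its API has roughly the following shape. This is a simplification; see the linked code for the real definition:

interface GetOutput<T> {
  item: T;
  seq: number;
}
interface ChangeOutput<T> {
  item: T;
  seq: number;
  newOutboundEvents: Array<Event<any>>;
}
interface FacetShape<T> {
  get(id: string): Promise<GetOutput<T> | null>;
  append(id: string, ...events: Array<Event<any>>): Promise<ChangeOutput<T>>;
  // appendTo skips the initial read by trusting a previously read state and sequence number.
  appendTo(id: string, state: T, seq: number, ...events: Array<Event<any>>): Promise<ChangeOutput<T>>;
  recalculate(id: string, ...events: Array<Event<any>>): Promise<ChangeOutput<T>>;
}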

This code is available at [5].

Bringing it all together

Finally, we’re ready to bring all the components together.

First, import all of the required libraries.

import * as AWS from "aws-sdk";
import { Facet } from "../../src";
import { EventDB } from "../../src/db";
import {
  Processor,
  RecordTypeName,
  StateUpdater,
  StateUpdaterInput,
  Event,
} from "../../src/processor";

Next, define the state type and the facet name:

// The account Facet has multiple records.
// State: The current "BankAccount".
// Inbound Events: The account is made up of "Transaction" and "AccountUpdate" records.
// Outbound Event 1: An "AccountOverdrawn" event is emitted when the balance becomes < 0, and this is allowed due to an overdraft.
// Outbound Event 2: A "TransactionFailed" event is emitted when a transaction cannot be completed, due to insufficient funds.
interface BankAccount {
  id: string;
  ownerFirst: string;
  ownerLast: string;
  balance: number;
  minimumBalance: number;
}
const BankAccountRecordName = "BANK_ACCOUNT";

Next, define the inbound and outbound event types:

// Inbound events must have a name.
interface AccountCreation {
  id: string;
}
const AccountCreationEventName = "ACCOUNT_CREATION";
interface AccountUpdate {
  ownerFirst: string;
  ownerLast: string;
}
const AccountUpdateEventName = "ACCOUNT_UPDATE";
interface Transaction {
  desc: string;
  amount: number;
}
const TransactionEventName = "TRANSACTION_ACCEPTED";

// Outbound events don't need to be named up front, since they're named when they're
// sent, but it's still a good idea to set the name.
interface AccountOverdrawn {
  accountId: string;
}
const AccountOverdrawnEventName = "accountOverdrawn";

Use the rules we defined earlier, and create the Facet:

const demonstrateLedger = async () => {
  // Create a DynamoDB document client (the table was created earlier with the CLI).
  const client = new AWS.DynamoDB.DocumentClient({
    region: "eu-west-2",
  });
  const tableName = "ledger";
  const db = new EventDB(client, tableName, BankAccountRecordName);

  // New accounts start with a balance of zero.
  const initialAccount = (): BankAccount =>
    ({
      balance: 0,
      minimumBalance: -1000, // Give the user an overdraft.
    } as BankAccount);

  // Create the processor that handles events.
  // Use the rules defined earlier.
  const processor = new Processor<BankAccount>(rules, initialAccount);

  // We can now create a ledger "Facet" in our DynamoDB table.
  const ledger = new Facet<BankAccount>(BankAccountRecordName, db, processor);

Now, we’re ready to start appending events to our entity:

  // Let's create a new account.
  const accountId = Math.round(Math.random() * 1000000).toString();

  // Create the account. The creation event carries no data other than the account ID.
  await ledger.append(
    accountId,
    new Event<AccountCreation>(AccountCreationEventName, {
      id: accountId,
    }),
  );

  // Update the name of the owner.
  await ledger.append(
    accountId,
    new Event<AccountUpdate>(AccountUpdateEventName, {
      ownerFirst: "John",
      ownerLast: "Brown",
    }),
  );

  // Add a couple of transactions in a single operation.
  const result = await ledger.append(
    accountId,
    new Event<Transaction>(TransactionEventName, {
      desc: "Transaction A",
      amount: 200,
    }),
    new Event<Transaction>(TransactionEventName, {
      desc: "Transaction B",
      amount: -300,
    }),
  );
  result.newOutboundEvents.forEach((e) => console.log(`Published event: ${JSON.stringify(e)}`));

  // Another separate transaction.
  const transactionCResult = await ledger.append(
    accountId,
    new Event<Transaction>(TransactionEventName, {
      desc: "Transaction C",
      amount: 50,
    }),
  );

  // If we've just read the STATE, we can try appending without doing
  // another database read. If no other records have been written in the
  // meantime, the transaction will succeed.
  await ledger.appendTo(
    accountId,
    transactionCResult.item,
    transactionCResult.seq,
    new Event<Transaction>(TransactionEventName, {
      desc: "Transaction D",
      amount: 25,
    }),
  );

  // Get the final balance.
  const balance = await ledger.get(accountId);
  if (balance) {
    console.log(`Account details: ${JSON.stringify(balance.item)}`);
  }

  // Verify the final balance by reading all of the transactions and re-calculating.
  const verifiedBalance = await ledger.recalculate(accountId);
  console.log(`Verified balance: ${JSON.stringify(verifiedBalance.item)}`);

  // The re-calculation can also take data to modify the result.
  const finalBalance = await ledger.recalculate(
    accountId,
    new Event<Transaction>(TransactionEventName, {
      desc: "Transaction E",
      amount: 25,
    }),
  );
  console.log(`Final balance: ${JSON.stringify(finalBalance.item)}`);
};

With that demonstration function in place, it can be executed:

demonstrateLedger()
  .then(() => console.log("complete"))
  .catch((err: Error) => {
    console.log(`stack: ${err.stack}`);
  });

And the expected balances are seen on the console:

% npx ts-node ledger.ts
Published event: {"type":"accountOverdrawn","event":{"accountId":"476118"}}
Account details: {"balance":-25,"minimumBalance":-1000,"id":"476118","ownerFirst":"John","ownerLast":"Brown"}
Verified balance: {"balance":-25,"minimumBalance":-1000,"id":"476118","ownerFirst":"John","ownerLast":"Brown"}
Final balance: {"balance":0,"minimumBalance":-1000,"id":"476118","ownerFirst":"John","ownerLast":"Brown"}
complete

Summary

Building on the DynamoDB primitives, we can create an easy-to-use API to deliver an Event Sourced database design that supports asynchronous message delivery.

The full code and set of unit tests are available at [6].