adrianhesketh.com

Event Sourced DynamoDB

A year ago, I wrote up and open sourced an event sourced database design that uses TypeScript to update the state of an entity and store it in DynamoDB [0].

The approach is particularly useful for systems that receive a mixture of synchronous (e.g. REST API calls) and asynchronous (e.g. EventBridge messages) events, where each event can update the state of an entity and cause external events to be published.

I’m currently working on a Go project and needed the same capability, so I ported the library across. It’s not compatible with the previous design at the database record level, but it uses a very similar database design.

I’ve made use of Go’s type system to simplify the design compared to the TypeScript version.

To demonstrate how it works, I made a complete example using CDK modelled around a simple “one-armed bandit” slot machine.

Core state machine

To gamble your money away, you need to:

  • Insert a coin
  • Pull the handle

The wheels will spin, and you’ll either win, in which case the machine returns to start via paying_out, or you’ll lose, in which case it goes straight back to start.

This can be represented by the following state machine.

digraph G {
    start -> coin_inserted [ label="insert coin" ];
    coin_inserted -> spinning_wheels [ label = "pull handle"];
    spinning_wheels -> paying_out [ label="win" ];
    spinning_wheels -> start [ label="lose" ];
    paying_out -> start [ label="paid out" ];
}
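The diagram can also be written down as a transition table. Here is a minimal sketch in Go, assuming hypothetical State and Trigger string types that are not part of the original code. Each state maps the triggers it accepts to the next state, so any trigger missing from the table is rejected.

```go
package main

import "fmt"

// State and Trigger are hypothetical names for the nodes and edge labels
// in the diagram above.
type State string
type Trigger string

const (
	Start          State = "start"
	CoinInserted   State = "coin_inserted"
	SpinningWheels State = "spinning_wheels"
	PayingOut      State = "paying_out"
)

// transitions maps each state to the triggers it accepts and the state
// each trigger leads to, mirroring the edges of the digraph.
var transitions = map[State]map[Trigger]State{
	Start:          {"insert coin": CoinInserted},
	CoinInserted:   {"pull handle": SpinningWheels},
	SpinningWheels: {"win": PayingOut, "lose": Start},
	PayingOut:      {"paid out": Start},
}

// Next returns the state reached by applying t in state s, or an error if
// the diagram has no such edge (e.g. pulling the handle before inserting a coin).
func Next(s State, t Trigger) (State, error) {
	next, ok := transitions[s][t]
	if !ok {
		return s, fmt.Errorf("cannot %q in state %q", t, s)
	}
	return next, nil
}
```

The SlotMachine struct below doesn’t store an explicit state name. IsCoinInSlot is enough to tell start apart from coin_inserted, while spinning_wheels and paying_out are passed through within a single PullHandle call, with a win signalled to other systems as an outbound Payout event.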

There are some implied restrictions on what you can do, depending on which state the slot machine is in. For example, you can’t pull the handle if you haven’t inserted a coin, and you can’t insert another coin if one is already in the slot.

This can be defined as a Go struct with a couple of methods attached: one for inserting a coin, the other for pulling the handle.

package models

import (
	"errors"
	"math/rand"
)

func NewSlotMachine(id string) *SlotMachine {
	return &SlotMachine{
		ID:        id,
		Balance:   0,
		Payout:    4,
		WinChance: 0.18,
	}
}

var ErrCannotInsertCoin = errors.New("cannot insert coin")
var ErrCannotPullHandle = errors.New("cannot pull handle")

type SlotMachine struct {
	ID      string `json:"id"`
	Balance int    `json:"balance"`
	// How much is paid out if you win.
	Payout int `json:"payout"`
	// How likely you are to get paid out.
	WinChance    float64 `json:"winChance"`
	Games        int     `json:"games"`
	Wins         int     `json:"wins"`
	Losses       int     `json:"losses"`
	IsCoinInSlot bool    `json:"isCoinInSlot"`
}

func (s *SlotMachine) InsertCoin() (ok bool) {
	if s.IsCoinInSlot {
		return false
	}
	s.IsCoinInSlot = true
	return true
}

func (s *SlotMachine) PullHandle() (win bool, ok bool) {
	// Complain if we can't take the coin.
	ok = s.IsCoinInSlot
	if !ok {
		return
	}
	s.IsCoinInSlot = false

	// See if we win.
	win = rand.Float64() <= s.WinChance

	// Update the stats.
	s.Games++
	if win {
		s.Wins++
		s.Balance -= (s.Payout - 1)
	} else {
		s.Losses++
		s.Balance++
	}
	return
}
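With the defaults in NewSlotMachine, the Balance arithmetic in PullHandle gives the machine an edge. A losing game adds the inserted coin to Balance (+1), while a winning game keeps the coin but pays out Payout, a net change of -(Payout - 1). A small sketch of the expected change per game follows; the function name is hypothetical and not part of the original code.

```go
package main

// ExpectedBalanceChange is a hypothetical helper that returns the average
// change in SlotMachine.Balance per game, following PullHandle's rules:
// a loss adds the coin (+1); a win keeps the coin but pays out payout,
// a net change of -(payout - 1).
func ExpectedBalanceChange(payout int, winChance float64) float64 {
	gainOnLoss := 1.0
	lossOnWin := float64(payout - 1)
	return (1-winChance)*gainOnLoss - winChance*lossOnWin
}
```

This simplifies to 1 - WinChance * Payout, so with the defaults (Payout 4, WinChance 0.18) the machine gains about 0.28 coins per game on average, and it would only break even with a WinChance of 1/Payout (0.25 here).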

From method calls to events

With the processing logic modelled, we can move on to storing state. At the moment, the state only lives in RAM, which is where many state machine examples end [2] [3].

However, for many business use cases, such as tracking the state of an order or an invoice, storing data only in RAM is unworkable. If the server restarts or crashes, the state is lost, and you lose track of orders, of whether invoices have been paid, or of bank balances.

One way around this is to save the state of the object to a persistent store, where it can be reloaded when required.

DynamoDB is a great choice for state storage, because it is fault tolerant, provides data encryption at rest and in transit, is scalable (capable of providing extremely high read/write performance), and supports point-in-time recovery / backups.

However, if our program logic has a bug, storing only the current state might not give us enough information to correct it. For example, we might not know what money was added to, or removed from, a bank balance.

If we store all of the actions that happened to the specific state machine in DynamoDB, as well as its state, we’re able to replay the actions through an updated state machine and work out what the state should have been. It’s also handy as an audit trail.
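Replaying is a left fold over the stored events: start from a fresh state and apply each event in order. Here is a minimal, self-contained sketch using the bank balance example, assuming hypothetical Event and Account types; these are not types from github.com/a-h/stream.

```go
package main

import "fmt"

// Event is a hypothetical stored action: a name plus the data it carried.
type Event struct {
	Name   string
	Amount int
}

// Account is a hypothetical state machine whose state is a bank balance.
type Account struct {
	Balance int
}

// Apply updates the state for a single event.
func (a *Account) Apply(e Event) error {
	switch e.Name {
	case "deposit":
		a.Balance += e.Amount
	case "withdraw":
		if e.Amount > a.Balance {
			return fmt.Errorf("insufficient funds to withdraw %d", e.Amount)
		}
		a.Balance -= e.Amount
	default:
		return fmt.Errorf("unknown event %q", e.Name)
	}
	return nil
}

// Replay rebuilds state by applying every stored event, in order, to a
// fresh Account. Running it again after fixing a bug in Apply recomputes
// what the state should have been.
func Replay(events []Event) (Account, error) {
	var a Account
	for i, e := range events {
		if err := a.Apply(e); err != nil {
			return a, fmt.Errorf("event %d: %w", i, err)
		}
	}
	return a, nil
}
```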

While it’s not possible to store method calls in DynamoDB, we can store the name of the method to call and the data to pass to the method. The github.com/a-h/stream library enables this by requiring a state machine to implement the stream.State interface.
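Storing "the name of the method to call and the data to pass to it" can be sketched with encoding/json: each record holds a type name and a raw JSON payload, and a switch on the name decides which Go type to unmarshal into. This is illustrative only; the Record type, its field names, and the Encode and Decode functions are assumptions, not the record format used by github.com/a-h/stream.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record is a hypothetical stored event: the event's type name plus its
// JSON-encoded data.
type Record struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// InsertCoin and PullHandle mirror the event names used in the article.
type InsertCoin struct{}
type PullHandle struct {
	UserID string `json:"userId"`
}

// Encode wraps an event in a Record, using name as its type.
func Encode(name string, event any) (Record, error) {
	data, err := json.Marshal(event)
	if err != nil {
		return Record{}, err
	}
	return Record{Type: name, Data: data}, nil
}

// Decode turns a Record back into a concrete event value based on its
// Type, ready to be passed to a type switch like the one in Process.
func Decode(r Record) (any, error) {
	switch r.Type {
	case "InsertCoin":
		var e InsertCoin
		err := json.Unmarshal(r.Data, &e)
		return e, err
	case "PullHandle":
		var e PullHandle
		err := json.Unmarshal(r.Data, &e)
		return e, err
	default:
		return nil, fmt.Errorf("unknown event type %q", r.Type)
	}
}
```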

To implement the stream.State interface, SlotMachine has a Process method which accepts a stream.InboundEvent argument. This method calls the appropriate state machine method based on the inbound event’s type. It also returns a list of stream.OutboundEvent messages to send to other systems, and an error if anything went wrong during processing.

In this case, every successful pull of the handle produces a GameResult event. If the user wins, a Payout event is also sent to inform other systems (e.g. the payout system) that the user should be paid out.

func (s *SlotMachine) Process(event stream.InboundEvent) (outbound []stream.OutboundEvent, err error) {
	switch e := event.(type) {
	case InsertCoin:
		ok := s.InsertCoin()
		if !ok {
			err = ErrCannotInsertCoin
			return
		}
	case PullHandle:
		win, ok := s.PullHandle()
		if !ok {
			// Stop here, so no GameResult is published for a game that didn't happen.
			err = ErrCannotPullHandle
			return
		}
		outbound = append(outbound, GameResult{
			MachineID: s.ID,
			Win:       win,
		})
		if win {
			outbound = append(outbound, Payout{
				UserID: e.UserID,
				Amount: s.Payout,
			})
		}
	}
	return
}

Hooking up the state to the store

So far, we have a state machine that is easily tested in isolation by calling Process with events, and checking that the state has been updated as expected, but the state itself is still just in RAM.
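One wrinkle when testing PullHandle in isolation is that it calls rand.Float64 directly, so a win or a loss isn't repeatable. Here is a sketch of one way around this, assuming a hypothetical Rand field (not in the original struct) that tests can override with a fixed value. Machine is a cut-down, hypothetical variant of SlotMachine.

```go
package main

import "math/rand"

// Machine is a cut-down, hypothetical variant of SlotMachine whose random
// source can be replaced in tests.
type Machine struct {
	Balance      int
	Payout       int
	WinChance    float64
	IsCoinInSlot bool
	// Rand returns a number in [0, 1). NewMachine sets it to rand.Float64.
	Rand func() float64
}

func NewMachine() *Machine {
	return &Machine{Payout: 4, WinChance: 0.18, Rand: rand.Float64}
}

func (m *Machine) InsertCoin() bool {
	if m.IsCoinInSlot {
		return false
	}
	m.IsCoinInSlot = true
	return true
}

// PullHandle follows the same balance rules as SlotMachine.PullHandle,
// but asks m.Rand for its random number.
func (m *Machine) PullHandle() (win bool, ok bool) {
	if !m.IsCoinInSlot {
		return false, false
	}
	m.IsCoinInSlot = false
	win = m.Rand() <= m.WinChance
	if win {
		m.Balance -= m.Payout - 1
	} else {
		m.Balance++
	}
	return win, true
}
```

The same idea carries over to Process: with a fixed random source, a test can send a PullHandle event and assert on both the updated state and the exact outbound events.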

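To store the state, we need a stream.Store. stream.NewStore takes the DynamoDB table name and the name of the state machine type, and returns a store. stream.New (for a new state machine) and stream.Load (for an existing one) use the store to create a processor, whose Process method executes the database work.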

var err error
store, err = stream.NewStore(TableName, "machine")
if err != nil {
	panic("failed to init store: " + err.Error())
}

Then, we can create a new slot machine.

machine := models.NewSlotMachine(id)
p, err := stream.New(store, id, machine)
if err != nil {
	http.Error(w, "failed to create new stream processor", http.StatusInternalServerError)
	return
}
// Process with no inbound events persists the new machine's initial state.
err = p.Process()
if err != nil {
	http.Error(w, "failed to create new machine", http.StatusInternalServerError)
	return
}

Or get an existing one.

func (h Handler) Get(w http.ResponseWriter, r *http.Request) {
	defer h.Log.Sync()

	pathValues, ok := matcher.Extract(r.URL)
	if !ok {
		http.Error(w, "path not found", http.StatusNotFound)
		return
	}
	id, ok := pathValues["id"]
	if !ok {
		http.Error(w, "missing id parameter in path", http.StatusNotFound)
		return
	}

	machine := models.NewSlotMachine(id)
	_, err := h.Store.Get(id, machine)
	if err != nil {
		h.Log.Error("failed to get machine", zap.Error(err))
		http.Error(w, "failed to get machine", http.StatusInternalServerError)
		return
	}

	enc := json.NewEncoder(w)
	err = enc.Encode(machine)
	if err != nil {
		http.Error(w, "failed to encode machine", http.StatusInternalServerError)
		return
	}
}

To interact with the state machine, an HTTP POST handler loads the existing machine’s state. stream.Load takes the store used to access DynamoDB, the ID of the state machine, and a pointer to the state machine to populate from the DynamoDB state record, and returns a processor.

The processor’s Process method then handles the state machine update and DynamoDB persistence.

func (h Handler) Post(w http.ResponseWriter, r *http.Request) {
	defer h.Log.Sync()

	pathValues, ok := matcher.Extract(r.URL)
	if !ok {
		http.Error(w, "path not found", http.StatusNotFound)
		return
	}
	id, ok := pathValues["id"]
	if !ok {
		http.Error(w, "missing id parameter in path", http.StatusNotFound)
		return
	}

	machine := models.NewSlotMachine(id)
	p, err := stream.Load(h.Store, id, machine)
	if err != nil {
		http.Error(w, "failed to load machine", http.StatusInternalServerError)
		return
	}

	// You might need to load the model from the HTTP body, but here we're not expecting one.
	err = p.Process(models.PullHandle{
		UserID: "test_user", // Populate this from an authentication token.
	})
	if err != nil {
		if errors.Is(err, models.ErrCannotPullHandle) {
			http.Error(w, "cannot pull handle, has a coin been inserted?", http.StatusNotAcceptable)
			return
		}
		h.Log.Error("failed to pull handle", zap.Error(err))
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	enc := json.NewEncoder(w)
	err = enc.Encode(machine)
	if err != nil {
		http.Error(w, "failed to encode machine", http.StatusInternalServerError)
		return
	}
}
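Between stream.Load and Process, each request follows a load, process, save cycle. The sketch below illustrates that cycle with an in-memory map standing in for DynamoDB. The State interface, MemoryStore, Counter, and its events are all hypothetical; this is not how github.com/a-h/stream is implemented, and its API differs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// State is a hypothetical stand-in for stream.State: it processes one
// inbound event and returns any outbound events.
type State interface {
	Process(event any) (outbound []any, err error)
}

// MemoryStore is a hypothetical in-memory stand-in for the DynamoDB table.
// It keeps the latest state as JSON and every inbound event processed.
type MemoryStore struct {
	States map[string][]byte
	Events map[string][]any
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{States: map[string][]byte{}, Events: map[string][]any{}}
}

// Process loads the stored state for id into s (if there is one), applies
// each inbound event, and saves the new state and events only if every
// event succeeded. Outbound events are returned for the caller to publish.
func (m *MemoryStore) Process(id string, s State, inbound ...any) ([]any, error) {
	if data, ok := m.States[id]; ok {
		if err := json.Unmarshal(data, s); err != nil {
			return nil, fmt.Errorf("load %s: %w", id, err)
		}
	}
	var outbound []any
	for _, e := range inbound {
		out, err := s.Process(e)
		if err != nil {
			// Nothing has been saved yet, so the stored state is unchanged.
			return nil, err
		}
		outbound = append(outbound, out...)
	}
	data, err := json.Marshal(s)
	if err != nil {
		return nil, err
	}
	m.States[id] = data
	m.Events[id] = append(m.Events[id], inbound...)
	return outbound, nil
}

// Counter is a hypothetical state machine used to exercise MemoryStore.
type Counter struct {
	Count int `json:"count"`
}

// Increment is accepted by Counter; Fail is an event it rejects.
type Increment struct{}
type Fail struct{}

func (c *Counter) Process(event any) ([]any, error) {
	switch event.(type) {
	case Increment:
		c.Count++
		return []any{fmt.Sprintf("count is now %d", c.Count)}, nil
	default:
		return nil, fmt.Errorf("unsupported event %T", event)
	}
}
```

In this sketch, saving only after every inbound event succeeds means a rejected event, such as pulling the handle before inserting a coin, leaves the stored state as it was. Against DynamoDB, the state and event records would also need to be written together, for example with a TransactWriteItems call.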

Usage

You can see the complete example, including all of the infrastructure required to run it (written in CDK), at [4].

The library itself is available at [5] - please drop it a star if you find it useful.