adrianhesketh.com

Single table pattern DynamoDB with Go - Part 3

This is part 3 of a 3 part series:

Introduction

In Part 2 - Record design, I put together types used to store records, set up a local development environment, and then started to write some integration tests.

In this post, I complete the design using some of DynamoDB’s other features: QueryPages, BatchWriteItem, DynamoDB transactions and UpdateItem operations.

GetDetails with QueryPages

At this point, the Get function on the store only returns basic details. It doesn’t include the organisations and invitations belonging to the user, so I’ll add a GetDetails function that does a Query operation to return all of the data.

This code uses the QueryPages method of the DynamoDB client. A Query is limited in how much data it will return in a single HTTP request to DynamoDB (1 MB per page), meaning that multiple calls may be made. In this case, the number of organisations and invites that a user has won’t get very high, so it’s low risk to read all of the pages. However, it would be reasonable to place a limit on the number of pages read from DynamoDB to prevent accidentally wasting database calls. One option is to count the number of pages inside the anonymous page function, and return false instead of true once the limit is reached.
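As a minimal sketch of that page-limiting option, the code below counts pages inside the callback and returns false once a limit is hit. The queryPages function and page type are hypothetical stand-ins for the SDK’s QueryPages and *dynamodb.QueryOutput, used so that the example runs without a database. Only the shape of the callback is meant to carry over.

```go
package main

import "fmt"

// page is a hypothetical stand-in for *dynamodb.QueryOutput.
type page struct {
	Items []string
}

// queryPages is a hypothetical stand-in for the SDK's QueryPages method. It
// calls fn once per page and stops early when fn returns false.
func queryPages(pages []page, fn func(p page, lastPage bool) bool) {
	for i, p := range pages {
		if !fn(p, i == len(pages)-1) {
			return
		}
	}
}

// collectWithLimit reads at most maxPages pages and reports whether it
// stopped before reaching the last page.
func collectWithLimit(pages []page, maxPages int) (items []string, truncated bool) {
	pagesRead := 0
	queryPages(pages, func(p page, lastPage bool) bool {
		items = append(items, p.Items...)
		pagesRead++
		if pagesRead >= maxPages && !lastPage {
			truncated = true
			return false // Stop paging.
		}
		return true
	})
	return items, truncated
}

func main() {
	pages := []page{
		{Items: []string{"a", "b"}},
		{Items: []string{"c"}},
		{Items: []string{"d"}},
	}
	items, truncated := collectWithLimit(pages, 2)
	fmt.Println(items, truncated) // prints "[a b c] true"
}
```

Returning a truncated flag lets the caller decide whether a partial result is acceptable or should be treated as an error.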

The general layout of the function is to:

I’ve used the dynamodb expression library here, for simplicity.

func (store UserStore) GetDetails(id string) (user UserDetails, err error) {
	q := expression.Key("id").Equal(expression.Value(newUserRecordHashKey(id)))
	expr, err := expression.NewBuilder().
		WithKeyCondition(q).
		Build()
	if err != nil {
		err = fmt.Errorf("userStore.GetDetails: failed to build query: %v", err)
		return
	}

	qi := &dynamodb.QueryInput{
		TableName:                 store.TableName,
		KeyConditionExpression:    expr.KeyCondition(),
		ExpressionAttributeValues: expr.Values(),
		FilterExpression:          expr.Filter(),
		ExpressionAttributeNames:  expr.Names(),
		ConsistentRead:            aws.Bool(true),
	}

	var items []map[string]*dynamodb.AttributeValue
	page := func(page *dynamodb.QueryOutput, lastPage bool) bool {
		items = append(items, page.Items...)
		return true
	}
	err = store.Client.QueryPages(qi, page)
	if err != nil {
		err = fmt.Errorf("userStore.GetDetails: failed to query pages: %v", err)
		return
	}

	user, err = newUserDetailsFromRecords(items)
	if err != nil {
		err = fmt.Errorf("userStore.GetDetails: failed to create UserDetails: %w", err)
	}
	return
}

The newUserDetailsFromRecords function takes in a list of DynamoDB records. DynamoDB records are returned as an unwieldy slice of maps ([]map[string]*dynamodb.AttributeValue).

This is why keeping the name of the record type as an attribute in the DynamoDB table is essential. I can now convert each map[string]*dynamodb.AttributeValue into a record type by looking at the value of the typ attribute: once I know the record type, a switch statement can be used to process the data appropriately.

This code constructs the UserDetails entity - building out the one (user) to many (organisations) in the relationship.

func newUserDetailsFromRecords(items []map[string]*dynamodb.AttributeValue) (user UserDetails, err error) {
	for _, item := range items {
		recordType, ok := item["typ"]
		if !ok || recordType.S == nil {
			continue
		}
		switch *recordType.S {
		case userRecordName:
			err = dynamodbattribute.ConvertFromMap(item, &user)
			if err != nil {
				err = fmt.Errorf("newUserDetailsFromRecords: failed to convert userRecord: %w", err)
				return
			}
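			// Guard against a missing email attribute to avoid a nil dereference.
			if email, ok := item["email"]; ok && email.S != nil {
				user.ID = *email.S
			}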
		case userOrgnisationRecordName:
			var uor userOrganisationRecord
			err = dynamodbattribute.ConvertFromMap(item, &uor)
			if err != nil {
				err = fmt.Errorf("newUserDetailsFromRecords: failed to convert userOrganisationRecord: %w", err)
				return
			}
			if uor.AcceptedAt == nil {
				user.Invitations = append(user.Invitations, newInvitationFromRecord(uor))
				continue
			}
			user.Organisations = append(user.Organisations, newOrganisation(uor.OrganisationID, uor.OrganisationName))
		}
	}
	return
}

It’s not possible to test this yet, because there’s no code to invite or add a user to an Organisation. The original spec came up with 3 required functions in this area:

So, I need to add those.

Adding multiple items with a single operation

Although I thought I’d need to create PutUserInvite, I decided to put it on the User store as Invite.

The database structure requires two records to be added. One to add the User to the Organisation and one to add the Organisation to the User. This means that an organisationMemberRecord and a userOrganisationRecord have to be inserted.

I could use a transaction here to make sure that either both put operations succeed, or neither does. I’d need to do this if the application would break when one of the records was missing, but I’ll design it so that it doesn’t matter, which reduces the cost of the solution (transactions cost twice as many read/write units at the time of writing).
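To put rough numbers on that, here is a small sketch of the arithmetic. It assumes the pricing model in place at the time of writing (one write unit per 1 KB of item size for a standard write, double for a transactional write), and the 300 byte record size is made up for illustration.

```go
package main

import "fmt"

// writeUnits estimates the write units consumed by writing one item of the
// given size. It assumes 1 unit per 1 KB (rounded up) for a standard write,
// and double that for a transactional write.
func writeUnits(itemSizeBytes int, transactional bool) int {
	const kb = 1024
	units := (itemSizeBytes + kb - 1) / kb
	if units == 0 {
		units = 1
	}
	if transactional {
		units *= 2
	}
	return units
}

func main() {
	// Two small records, as written when a user is invited to an organisation.
	standard := writeUnits(300, false) + writeUnits(300, false)
	transactional := writeUnits(300, true) + writeUnits(300, true)
	fmt.Println(standard, transactional) // prints "2 4"
}
```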

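The most cost-efficient way to do this is to use the BatchWriteItem method, which sends multiple put or delete requests in a single HTTP request. A batch is not atomic: individual writes can fail while others succeed, and any writes that DynamoDB could not complete are returned in the UnprocessedItems field of the response so that they can be retried.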

func (store UserStore) Invite(u User, org Organisation, toGroup string) error {
	now := store.Now()
	organisationMemberRecord := newOrganisationGroupMemberRecord(org, toGroup, u, now)
	organisationMemberItem, err := dynamodbattribute.ConvertToMap(organisationMemberRecord)
	if err != nil {
		return fmt.Errorf("userStore.Invite: failed to convert organisationMemberRecord: %w", err)
	}
	userOrganisationRecord := newUserOrganisationRecord(u, org, now, nil)
	userOrganisationItem, err := dynamodbattribute.ConvertToMap(userOrganisationRecord)
	if err != nil {
		return fmt.Errorf("userStore.Invite: failed to convert userOrganisationRecord: %w", err)
	}
	_, err = store.Client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]*dynamodb.WriteRequest{
			*store.TableName: {
				&dynamodb.WriteRequest{
					PutRequest: &dynamodb.PutRequest{
						Item: organisationMemberItem,
					},
				},
				&dynamodb.WriteRequest{
					PutRequest: &dynamodb.PutRequest{
						Item: userOrganisationItem,
					},
				},
			},
		},
	})
	return err
}
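
One thing the Invite function above doesn’t handle is a partially successful batch: BatchWriteItem can return without an error while leaving some requests in the UnprocessedItems field of its output. A rough sketch of a retry loop is below. The batchWriter type is a hypothetical stand-in for the real API call, with requests reduced to strings, so treat it as an outline of the control flow rather than production code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// batchWriter is a hypothetical stand-in for a call to BatchWriteItem. It
// takes the requests to send and returns the ones that were not processed.
type batchWriter func(requests []string) (unprocessed []string, err error)

// writeAll resubmits unprocessed requests until none remain or maxAttempts
// is reached, doubling the wait between attempts.
func writeAll(write batchWriter, requests []string, maxAttempts int, wait time.Duration) error {
	pending := requests
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		unprocessed, err := write(pending)
		if err != nil {
			return fmt.Errorf("writeAll: attempt %d failed: %w", attempt, err)
		}
		if len(unprocessed) == 0 {
			return nil
		}
		pending = unprocessed
		if attempt < maxAttempts {
			time.Sleep(wait)
			wait *= 2
		}
	}
	return errors.New("writeAll: requests still unprocessed after retries")
}

func main() {
	calls := 0
	// A fake writer that leaves the last request unprocessed on the first call.
	fake := func(requests []string) ([]string, error) {
		calls++
		if calls == 1 {
			return requests[len(requests)-1:], nil
		}
		return nil, nil
	}
	err := writeAll(fake, []string{"organisationMember", "userOrganisation"}, 3, time.Millisecond)
	fmt.Println(calls, err) // prints "2 <nil>"
}
```

Since the design tolerates one of the two records being missing, whether this extra handling is worth it is a judgment call.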

To accept an invite, I can update the record to set the acceptedAt attribute to the current time using the UpdateItem API call. I’m making use of the newUserOrganisationRecordHashKey and newUserOrganisationRecordRangeKey functions to construct the keys in the right way.

func (store UserStore) AcceptInvite(u User, org Organisation) error {
	update := expression.Set(expression.Name("acceptedAt"), expression.Value(store.Now()))

	expr, err := expression.NewBuilder().
		WithUpdate(update).
		Build()
	if err != nil {
		return fmt.Errorf("userStore.AcceptInvite: failed to build query: %v", err)
	}

	_, err = store.Client.UpdateItem(&dynamodb.UpdateItemInput{
		TableName:                 store.TableName,
		Key:                       idAndRng(newUserOrganisationRecordHashKey(u.ID), newUserOrganisationRecordRangeKey(org.ID)),
		UpdateExpression:          expr.Update(),
		ExpressionAttributeValues: expr.Values(),
		ExpressionAttributeNames:  expr.Names(),
	})
	return err
}

To reject an invite, I’m going to delete the relationship on both sides with another BatchWriteItem API call.

func (store UserStore) RejectInvite(u User, org Organisation) error {
	organisationMemberKey := idAndRng(newOrganisationGroupMemberRecordHashKey(org.ID),
		newOrganisationGroupMemberRecordRangeKey(u.ID))
	userOrganisationRecordKey := idAndRng(newUserOrganisationRecordHashKey(u.ID),
		newUserOrganisationRecordRangeKey(org.ID))
	_, err := store.Client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]*dynamodb.WriteRequest{
			*store.TableName: {
				&dynamodb.WriteRequest{
					DeleteRequest: &dynamodb.DeleteRequest{
						Key: organisationMemberKey,
					},
				},
				&dynamodb.WriteRequest{
					DeleteRequest: &dynamodb.DeleteRequest{
						Key: userOrganisationRecordKey,
					},
				},
			},
		},
	})
	return err
}

End-to-end integration test

OK, so time to test that it has the desired effect. Invite a user to 3 organisations, ignore one, accept another and reject the third. Finally, get the User details and check that the user only has the accepted organisation listed, and has only one outstanding invitation.

func TestUserInviteIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	name := createLocalTable(t)
	defer deleteLocalTable(t, name)
	s, err := NewUserStore(region, name)
	if err != nil {
		t.Fatalf("failed to create store: %v", err)
	}
	s.Client.Endpoint = "http://localhost:8000"
	u := User{
		ID:        "test@example.com",
		FirstName: "Sarah",
		LastName:  "Connor",
		CreatedAt: time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC),
		Phone:     "4476123456789",
	}
	err = s.Put(u)
	if err != nil {
		t.Errorf("failed to create user: %v", err)
	}

	// Invite user to three organisations (A, B and C). Ignore A, Accept B, and Reject C.
	orgA := newOrganisation("orgA", "A")
	orgB := newOrganisation("orgB", "B")
	orgC := newOrganisation("orgC", "C")
	err = s.Invite(u, orgA, "testGroup")
	if err != nil {
		t.Errorf("failed to invite user to group A: %v", err)
	}
	err = s.Invite(u, orgB, "testGroup")
	if err != nil {
		t.Errorf("failed to invite user to group B: %v", err)
	}
	err = s.Invite(u, orgC, "testGroup")
	if err != nil {
		t.Errorf("failed to invite user to group C: %v", err)
	}

	err = s.AcceptInvite(u, orgB)
	if err != nil {
		t.Errorf("failed to accept invite to orgB: %v", err)
	}
	err = s.RejectInvite(u, orgC)
	if err != nil {
		t.Errorf("failed to reject invite to orgC: %v", err)
	}

	// Get the details and ensure that this is reflected.
	userDetails, err := s.GetDetails("test@example.com")
	if err != nil {
		t.Errorf("failed to get user details: %v", err)
	}

	if diff := cmp.Diff(u, userDetails.User); diff != "" {
		t.Errorf("failed to match user:\n%v", diff)
	}
	if len(userDetails.Organisations) != 1 {
		t.Fatalf("expected 1 organisation, got %d", len(userDetails.Organisations))
	}
	if userDetails.Organisations[0].ID != "orgB" {
		t.Errorf("accepted orgB, but it's showing as %q", userDetails.Organisations[0].ID)
	}
	if diff := cmp.Diff(orgB, userDetails.Organisations[0]); diff != "" {
		t.Errorf("organisation fields not correct:\n%v", diff)
	}
	if len(userDetails.Invitations) != 1 {
		t.Fatalf("expected 1 invitation, got %d", len(userDetails.Invitations))
	}
	if userDetails.Invitations[0].Organisation.ID != "orgA" {
		t.Errorf("the invite from orgA has not been accepted or rejected, but got %q", userDetails.Invitations[0].Organisation.ID)
	}
	if diff := cmp.Diff(orgA, userDetails.Invitations[0].Organisation); diff != "" {
		t.Errorf("invitation organisation fields not correct:\n%v", diff)
	}
}

It’s a big test here, but I split it up into 3 separate tests in the git repo (https://github.com/a-h/organisation).

Organisation store

Now, it’s time to implement the Organisation store, following the same pattern laid out for users.

With a little copy / paste, and some find / replace, the basics of a Put and Get operation can be put in place.

Create (insert, ensuring no overwrite)

Moving on to the Create method: while it’s very unlikely that a UUID will already be present in the database, it is possible. I’ll use a transaction combined with a condition expression to make sure that I’m not overwriting something, or assigning someone ownership of another person’s organisation, and to ensure that everything is rolled back if the condition check fails.

func (store OrganisationStore) Create(owner User, name string) error {
	// Create the Organisation.
	id := uuid.New().String()
	now := store.Now()
	org := newOrganisation(id, name)
	or := newOrganisationRecord(org)
	orItem, err := dynamodbattribute.MarshalMap(or)
	if err != nil {
		return err
	}
	notOverwrite := expression.And(expression.AttributeNotExists(expression.Name("id")),
		expression.AttributeNotExists(expression.Name("rng")))
	notOverwriteExpr, err := expression.NewBuilder().WithCondition(notOverwrite).Build()
	if err != nil {
		return err
	}
	putNewOrganisation := &dynamodb.Put{
		TableName:                store.TableName,
		Item:                     orItem,
		ConditionExpression:      notOverwriteExpr.Condition(),
		ExpressionAttributeNames: notOverwriteExpr.Names(),
	}

	// Assign ownership.
	ogmr := newOrganisationGroupMemberRecord(org, []string{GroupOwner}, owner, now)
	ogmrItem, err := dynamodbattribute.MarshalMap(ogmr)
	if err != nil {
		return err
	}
	putOrganisationGroupMember := &dynamodb.Put{
		TableName: store.TableName,
		Item:      ogmrItem,
	}

	// Include the user side of the ownership relationship.
	userOrganisationRecord := newUserOrganisationRecord(owner, org, now, &now)
	userOrganisationItem, err := dynamodbattribute.ConvertToMap(userOrganisationRecord)
	if err != nil {
		return fmt.Errorf("organisationStore.Create: failed to convert userOrganisationRecord: %w", err)
	}
	putUserOrganisation := &dynamodb.Put{
		TableName: store.TableName,
		Item:      userOrganisationItem,
	}

	_, err = store.Client.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
		TransactItems: []*dynamodb.TransactWriteItem{
			&dynamodb.TransactWriteItem{Put: putNewOrganisation},
			&dynamodb.TransactWriteItem{Put: putOrganisationGroupMember},
			&dynamodb.TransactWriteItem{Put: putUserOrganisation},
		},
	})
	return err
}

PutOrganisationGroup and DeleteOrganisationGroup

Adding a user to a group requires adding a value to a set on the organisationMember record. This design has one record per user, with multiple organisation and service groups stored in a single string set. Users can then be removed from the organisation in one DeleteItem operation.

To do this, I created a custom type called groupSet that carefully controls how it is serialized, converting the Go map types into a DynamoDB string set value so that organisation and service groups can be stored in a single set.

// A groupSet maps both Organisation level and Service-level groups into a single DynamoDB set.
type groupSet struct {
	m                  sync.Mutex
	organisationGroups map[string]struct{}
	serviceIDToGroups  map[string]map[string]struct{}
}

A common pattern in Go is to use an empty struct struct{} to note that you don’t care about the value held within the map, but it looks odd, so I’ve put in some helper functions to abstract the underlying data structures so that users of the type don’t have to wonder what it’s all about.
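
For anyone who hasn’t come across the pattern, here’s a minimal standalone sketch of a string set built on map[string]struct{}. The stringSet type and its methods are illustrative names, not part of the groupSet code.

```go
package main

import (
	"fmt"
	"sort"
)

// stringSet is a set of strings built on a map with empty struct values.
// The empty struct takes up no space, so only the keys carry information.
type stringSet map[string]struct{}

// Add inserts values into the set. Adding a value twice has no effect.
func (s stringSet) Add(values ...string) {
	for _, v := range values {
		s[v] = struct{}{}
	}
}

// Contains reports whether v is in the set.
func (s stringSet) Contains(v string) bool {
	_, ok := s[v]
	return ok
}

// Sorted returns the members in a stable order, because map iteration
// order in Go is not defined.
func (s stringSet) Sorted() []string {
	values := make([]string, 0, len(s))
	for v := range s {
		values = append(values, v)
	}
	sort.Strings(values)
	return values
}

func main() {
	groups := stringSet{}
	groups.Add("owner", "admin", "owner")
	fmt.Println(len(groups), groups.Contains("owner"), groups.Contains("guest")) // prints "2 true false"
	fmt.Println(groups.Sorted())                                                 // prints "[admin owner]"
}
```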

func newGroupSet(organisationGroups []string, serviceIDToGroups map[string][]string) *groupSet {
	gs := &groupSet{}
	gs.AddToGroups(organisationGroups...)
	if serviceIDToGroups != nil {
		for serviceID := range serviceIDToGroups {
			gs.AddToServiceGroups(serviceID, serviceIDToGroups[serviceID]...)
		}
	}
	return gs
}

// AddToGroups assigns membership of the Organisation groups.
func (gs *groupSet) AddToGroups(groups ...string) {
	if len(groups) == 0 {
		return
	}
	gs.m.Lock()
	defer gs.m.Unlock()
	if gs.organisationGroups == nil {
		gs.organisationGroups = make(map[string]struct{}, len(groups))
	}
	for i := 0; i < len(groups); i++ {
		gs.organisationGroups[groups[i]] = struct{}{}
	}
}

// AddToServiceGroups assigns membership of the Service groups.
func (gs *groupSet) AddToServiceGroups(serviceID string, groups ...string) {
	if len(groups) == 0 {
		return
	}
	gs.m.Lock()
	defer gs.m.Unlock()
	if gs.serviceIDToGroups == nil {
		gs.serviceIDToGroups = make(map[string]map[string]struct{})
	}
	sgs := gs.serviceIDToGroups[serviceID]
	if sgs == nil {
		sgs = make(map[string]struct{})
	}
	for i := 0; i < len(groups); i++ {
		sgs[groups[i]] = struct{}{}
	}
	gs.serviceIDToGroups[serviceID] = sgs
}

With the add helper functions in place, the Unmarshaler interface (https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/dynamodbattribute/#Unmarshaler) and the Marshaler interface (https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/dynamodbattribute/#Marshaler) are implemented to control the conversion to and from DynamoDB.

The result is that groups are stored in the following format. The wrapperName, values and type elements are all added by DynamoDB; they’re not unique to the groupSet type or the Go programming language. It’s an internal structure of DynamoDB that is exposed to end users.

{
  "groups": {
    "wrapperName": "Set",
    "values": [
      "organisationGroup/owner",
      "serviceGroup/350fa9b2-337e-489d-b095-751321212fc8/service_group1",
      "serviceGroup/350fa9b2-337e-489d-b095-751321212fc8/service_group3"
    ],
    "type": "String"
  }
}

func (gs *groupSet) UnmarshalDynamoDBAttributeValue(av *dynamodb.AttributeValue) error {
	gs.organisationGroups = make(map[string]struct{})
	gs.serviceIDToGroups = make(map[string]map[string]struct{})
	if av.NULL != nil && *av.NULL == true {
		return nil
	}
	for i := 0; i < len(av.SS); i++ {
		g := av.SS[i]
		if g == nil {
			continue
		}
		parts := strings.SplitN(*g, "/", 3)
		if len(parts) < 2 {
			return fmt.Errorf("groupSet: cannot unmarshal string value %q into a group", *g)
		}
		switch parts[0] {
		case "organisationGroup":
			gs.AddToGroups(parts[1])
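		case "serviceGroup":
			// A service group needs both a service ID and a group name.
			if len(parts) < 3 {
				return fmt.Errorf("groupSet: cannot unmarshal string value %q into a service group", *g)
			}
			gs.AddToServiceGroups(parts[1], parts[2])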
		}
	}
	return nil
}

func (gs *groupSet) MarshalDynamoDBAttributeValue(av *dynamodb.AttributeValue) error {
	var ss []string
	if gs.organisationGroups != nil {
		for g := range gs.organisationGroups {
			ss = append(ss, "organisationGroup/"+g)
		}
	}
	if gs.serviceIDToGroups != nil {
		for serviceID, groupNames := range gs.serviceIDToGroups {
			for g := range groupNames {
				ss = append(ss, "serviceGroup/"+serviceID+"/"+g)
			}
		}
	}
	av.SetSS(aws.StringSlice(ss))
	return nil
}
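
The string encoding used by these two methods can be tried out without DynamoDB. The sketch below pulls the format into a pair of standalone functions. The group type and the encodeGroup and decodeGroup names are hypothetical, and the decoder is a little stricter than the unmarshal method above in that it rejects any string that doesn’t match one of the two shapes exactly.

```go
package main

import (
	"fmt"
	"strings"
)

// group is a hypothetical flattened form of one member of the set.
type group struct {
	ServiceID string // Empty for organisation-level groups.
	Name      string
}

// encodeGroup produces the string stored in the DynamoDB string set.
func encodeGroup(g group) string {
	if g.ServiceID == "" {
		return "organisationGroup/" + g.Name
	}
	return "serviceGroup/" + g.ServiceID + "/" + g.Name
}

// decodeGroup parses a stored string back into a group.
func decodeGroup(s string) (group, error) {
	parts := strings.SplitN(s, "/", 3)
	switch {
	case len(parts) == 2 && parts[0] == "organisationGroup":
		return group{Name: parts[1]}, nil
	case len(parts) == 3 && parts[0] == "serviceGroup":
		return group{ServiceID: parts[1], Name: parts[2]}, nil
	}
	return group{}, fmt.Errorf("decodeGroup: cannot parse %q", s)
}

func main() {
	for _, s := range []string{
		"organisationGroup/owner",
		"serviceGroup/350fa9b2-337e-489d-b095-751321212fc8/service_group1",
		"serviceGroup/missing-name",
	} {
		g, err := decodeGroup(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		// A valid value should survive a round trip unchanged.
		fmt.Println(encodeGroup(g) == s)
	}
}
```

The first two values round trip cleanly, while the third is rejected because it has a service ID but no group name.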

Finally, I add some extra methods to get the data out in a pleasant way.

// OrganisationGroups gets the list of Organisation level group names.
func (gs *groupSet) OrganisationGroups() (groups []string) {
	if gs.organisationGroups == nil {
		return
	}
	for g := range gs.organisationGroups {
		g := g
		groups = append(groups, g)
	}
	return
}

// ServiceGroups gets a map of ServiceIDs and group names.
func (gs *groupSet) ServiceGroups() (groups map[string][]string) {
	if gs.serviceIDToGroups == nil {
		return
	}
	groups = make(map[string][]string)
	for serviceID, setOfGroups := range gs.serviceIDToGroups {
		for g := range setOfGroups {
			g := g
			groups[serviceID] = append(groups[serviceID], g)
		}
	}
	return groups
}

With that in place, it’s possible to use the ADD action of an update expression to add users to groups. ADD creates the set if it doesn’t already exist, and ignores values that are already present.

func (store OrganisationStore) AddUserToGroups(organisationID string, user User, groups []string, serviceIDToGroups map[string][]string) error {
	gs := newGroupSet(groups, serviceIDToGroups)
	update := expression.
		Set(expression.Name("typ"), expression.Value(organisationMemberRecordName)).
		Set(expression.Name("v"), expression.Value(0)).
		Set(expression.Name("organisationId"), expression.Value(organisationID)).
		Add(expression.Name("groups"), expression.Value(gs)).
		Set(expression.Name("email"), expression.Value(user.ID)).
		Set(expression.Name("firstName"), expression.Value(user.FirstName)).
		Set(expression.Name("lastName"), expression.Value(user.LastName)).
		Set(expression.Name("phone"), expression.Value(user.Phone)).
		Set(expression.Name("createdAt"), expression.Value(user.CreatedAt))
	expr, err := expression.NewBuilder().
		WithUpdate(update).
		Build()
	if err != nil {
		return err
	}
	_, err = store.Client.UpdateItem(&dynamodb.UpdateItemInput{
		TableName:                 store.TableName,
		Key:                       idAndRng(newOrganisationMemberRecordHashKey(organisationID), newOrganisationMemberRecordRangeKey(user.ID)),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		UpdateExpression:          expr.Update(),
	})
	return err
}

Removing a user from groups is just as easy, using the DELETE action to remove values from the set.

func (store OrganisationStore) RemoveUserFromGroups(organisationID, userID string, groups []string, serviceIDToGroups map[string][]string) error {
	gs := newGroupSet(groups, serviceIDToGroups)
	update := expression.Delete(expression.Name("groups"), expression.Value(gs))
	expr, err := expression.NewBuilder().
		WithUpdate(update).
		Build()
	if err != nil {
		return err
	}
	_, err = store.Client.UpdateItem(&dynamodb.UpdateItemInput{
		TableName:                 store.TableName,
		Key:                       idAndRng(newOrganisationMemberRecordHashKey(organisationID), newOrganisationMemberRecordRangeKey(userID)),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		UpdateExpression:          expr.Update(),
	})
	return err
}

Deleting a user from an Organisation is simple too.

func (store OrganisationStore) RemoveUser(organisationID string, userID string) error {
	key := idAndRng(newOrganisationMemberRecordHashKey(organisationID), newOrganisationMemberRecordRangeKey(userID))
	_, err := store.Client.DeleteItem(&dynamodb.DeleteItemInput{
		TableName: store.TableName,
		Key:       key,
	})
	return err
}

See the repo at https://github.com/a-h/organisation/ for examples of integration tests of these methods, e.g. https://github.com/a-h/organisation/blob/aaf8f515a05e6a3fb2172d44710fbf96160301b6/db/organisation_test.go#L335

The GetDetails method of the Organisation store is the most complex, because it has to combine Organisation, User, Group and Service details.

This is done with the newOrganisationDetailsFromRecords method. It’s very similar to the newUserDetailsFromRecords method, but is a bit more complex due to the structure of the Organisation type.

Summary

Working with DynamoDB might mean that you need to think differently about how you structure your data, and question some of the constraints that you place on how you work with data, but the result can be a more scalable and cost effective solution.

As an index to help you find what you want to do:

Code structure

Database operations