adrianhesketh.com

Setting up AppSync GraphQL subscriptions with TypeScript and CDK

I’ve been working on some systems that would benefit from pushing realtime data to clients via WebSockets. I tried out WebSockets in API Gateway, which was really easy to set up, but the service doesn’t handle subscription management. It’s up to you to track connections by their ID, record what each connection is subscribed to, and then publish each message to all of the subscribed connections.
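To make that bookkeeping concrete, here’s a minimal in-memory sketch of the sort of state you end up maintaining yourself with API Gateway WebSockets. The SubscriptionRegistry class and its method names are made up for illustration, and a real implementation would presumably keep this state in a database rather than in memory.

```typescript
// Hypothetical in-memory registry: which connection IDs are subscribed to which topics.
class SubscriptionRegistry {
  private topicsByConnection = new Map<string, Set<string>>();

  // Record that a connection wants messages for a topic.
  subscribe(connectionId: string, topic: string): void {
    const topics =
      this.topicsByConnection.get(connectionId) ?? new Set<string>();
    topics.add(topic);
    this.topicsByConnection.set(connectionId, topics);
  }

  // Forget a connection entirely, e.g. when the client disconnects.
  disconnect(connectionId: string): void {
    this.topicsByConnection.delete(connectionId);
  }

  // Find every connection that should receive a message on a topic.
  subscribersOf(topic: string): string[] {
    const result: string[] = [];
    this.topicsByConnection.forEach((topics, connectionId) => {
      if (topics.has(topic)) {
        result.push(connectionId);
      }
    });
    return result.sort();
  }
}
```

Publishing then means calling subscribersOf for the topic and sending to each returned connection ID in turn.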

GraphQL Subscriptions are supported in AppSync, but I’d never used them, so I decided to give them a go. I’m not keen on using templating languages like VTL, so I originally avoided AppSync, but now that Lambda is supported for resolvers, I thought I’d give it another shot.

Schema

To start with, I created a GraphQL schema for the API. GraphQL has the nice property that the API is defined via a small schema language. I like this because the types can be generated automatically, and it also allows for introspection and discoverability of APIs without having to use something like Swagger.

All of the queries are within the Query type. There’s only one, the messages query. Following the messages query are some AppSync-specific directives (@aws_api_key and @aws_iam) that define who’s allowed to access that API. In my case, I’ve allowed both IAM access (i.e. granted within the AWS world, e.g. a Lambda function can be given permission to make the call) and API key access, which is used by clients (e.g. web applications) that don’t have IAM roles.

type Query {
  messages: [Message!]! @aws_api_key @aws_iam
}

The messages query returns an array of non-null Message types. The exclamation point marks the values as non-nullable.
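As a rough illustration of what the two exclamation points mean for a client, the TypeScript sketch below contrasts [Message!]! with a fully nullable [Message]. The type and function names are made up for this example, and the Message type here is a hand-written stand-in rather than generated code.

```typescript
// Hand-written stand-in for the Message type; not generated code.
type Message = { topic: string | null; text: string };

// [Message!]! : the list is always present and never contains null entries.
type NonNullMessages = Message[];

// [Message] : the list itself, and any entry in it, may be null.
type NullableMessages = Array<Message | null> | null;

// With [Message!]! a client can read the texts directly.
function textsFromNonNull(messages: NonNullMessages): string[] {
  return messages.map((m) => m.text);
}

// With [Message] a client must guard against a null list and null entries.
function textsFromNullable(messages: NullableMessages): string[] {
  if (messages === null) {
    return [];
  }
  return messages
    .filter((m): m is Message => m !== null)
    .map((m) => m.text);
}
```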

Return types are defined by the type keyword. The fields on each type and their corresponding types are defined in the schema.

type Message @aws_api_key @aws_iam {
  topic: ID
  text: String!
}

Input types are structures that make up the input to a GraphQL function. They’re defined with the input keyword, but other than that look exactly like a type. It’s an oddity of GraphQL that you can’t use an input type as a return type, or vice versa.

input SendMessageInput {
  topic: ID!
  text: String!
}

With the input type defined, I can set it as an input to a function within a “mutation”. A mutation is a function that changes (mutates) data. GraphQL requests are typically all sent as an HTTP POST, so the HTTP verb doesn’t differentiate whether something will modify data or not. You’ll notice that here, only AWS IAM identities that I’ve given permission to can call this mutation. That means that end users can’t send a message. Messages can only be sent from a Lambda function I’m going to create that has IAM permission to access this mutation.

type Mutation {
  sendMessage(message: SendMessageInput!): Message @aws_iam
}

GraphQL supports subscriptions over WebSockets. API Gateway supports WebSockets too, but you have to keep track of the connection ID and do your own routing. This is fine, and is actually a lot simpler conceptually, but the promise of AppSync is that it will do all of the connection management and routing for you.

Note the @aws_subscribe directive. This tells AppSync to send a message to anyone that has subscribed to onSendMessage whenever a sendMessage mutation is executed. It basically copies the output of the mutation over to the subscribers, with optional basic filtering (e.g. by the topic).

type Subscription {
  onSendMessage(topic: ID): Message @aws_subscribe(mutations: ["sendMessage"])
}

The last part of the schema just defines the sections.

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

Type generation

There’s a great NPM package called graphql-codegen [0] that can build types for your programming language based on the GraphQL schema. I’ve used that to generate TypeScript types that correspond to the GraphQL schema.

Dropping a codegen.yml file into the root and running npx graphql-codegen --config codegen.yml generates a file with all of the types in.

overwrite: true
schema: ./graphql/schema.graphql
documents: null
generates:
  graphql/types.ts:
    config:
      withHooks: true
    plugins:
      # Create basic TypeScript types.
      - typescript

So, the Message GraphQL type becomes this TypeScript type. It’s able to handle nullable types and all that sort of thing too.

export type Message = {
  __typename?: "Message";
  topic?: Maybe<Scalars["ID"]>;
  text: Scalars["String"];
};

CDK Setup

I’ve switched from Serverless Framework being my go-to tool over to CDK, mostly around the time that the TypeScript template became a complicated thing and they switched over to serverless.ts instead of serverless.yaml. While I can see the point of migrating to serverless.ts, it made a bunch of learning materials and examples semi-obsolete.

CDK lacks a few CLI niceties, like tailing logs and invoking Lambda functions (I use awslogs / AWS CLI instead), but it makes up for it with the higher level “constructs” that are much less verbose, and more discoverable than CloudFormation. The CDK constructs are useful and have sensible defaults, so I’ve found it to be fairly painless overall.

Defining an AppSync API requires a bit of code like this. I’ve set it up to use API_KEY authentication and authorization by default, with IAM being available too.

IAM authentication and authorization enables IAM permissions to be used to access the GraphQL API instead of the API key, which is ideal for Lambda functions, or EC2 instances to use to access privileged / admin sections of the API.

Note how I’ve used the same schema file to generate types and to define the API structure within AppSync.

const api = new appsync.GraphqlApi(this, "subscriptionApi", {
  name: "subscriptionApi",
  schema: appsync.Schema.fromAsset(
    path.join(__dirname, "../graphql/schema.graphql")
  ),
  authorizationConfig: {
    defaultAuthorization: {
      authorizationType: appsync.AuthorizationType.API_KEY,
    },
    additionalAuthorizationModes: [
      { authorizationType: appsync.AuthorizationType.IAM },
    ],
  },
  xrayEnabled: true,
});
// print out the AppSync GraphQL endpoint to the terminal
new cdk.CfnOutput(this, "GraphQLAPIURL", {
  value: api.graphqlUrl,
});
// print out the AppSync API Key to the terminal
new cdk.CfnOutput(this, "GraphQLAPIKey", {
  value: api.apiKey || "",
});
// print out the stack region
new cdk.CfnOutput(this, "Stack Region", {
  value: this.region,
});

With that in place, I can wire up Lambda functions to handle the queries, mutations and subscriptions.

lambdaNode.NodejsFunction comes from the @aws-cdk/aws-lambda-nodejs package. The standard Lambda function construct deploys your code as-is and doesn’t do any transpiling or bundling of dependencies, whereas the aws-lambda-nodejs package uses esbuild [1] to transpile the TypeScript into JavaScript and bundle everything together.

esbuild is like Webpack, but is about 10x faster on my builds. The package will use esbuild if you have it installed, and if you don’t, it will try to use a Docker image to do it. Quite a nice experience really.

// Queries.
const queryMessages = new lambdaNode.NodejsFunction(this, "queryMessages", {
  runtime: lambda.Runtime.NODEJS_12_X,
  entry: path.join(__dirname, "../handlers/graphql/query/messages.ts"),
  handler: "handler",
  memorySize: 1024,
  tracing: lambda.Tracing.ACTIVE,
});

With a Lambda function ready to go, I can add a “data source” to handle the Query / messages operation.

This little bit of mapping results in the messages query executing my Lambda function. It’s just set up to return mock data for now, but it’s remarkably easy to do.

api.addLambdaDataSource("queryMessagesDS", queryMessages).createResolver({
  typeName: "Query",
  fieldName: "messages",
});

My code to handle it looks like this. Note how I’m able to use the autogenerated TypeScript types as the return value. I like that a lot.

import { AppSyncResolverEvent } from "aws-lambda";
import { Message } from "../../../graphql/types";

// The messages query takes no arguments, so the event's argument type is empty.
export const handler = async (
  event: AppSyncResolverEvent<Record<string, never>>
): Promise<Array<Message>> => {
  console.log(JSON.stringify(event));

  // Return mock data for now.
  return [
    {
      topic: "test_topic_1",
      text: "text1",
    },
    {
      topic: "test_topic_2",
      text: "text2",
    },
  ];
};

Subscription handling

I think this was the hardest thing to set up. I found the AppSync documentation fairly hard to understand, and searching around on the Web blurs the boundaries between Amplify (which I’m not a fan of), and AppSync.

I don’t like how the AWS examples get you to use the console, and focus on VTL. The examples lead you down a path of copy/paste. At the end of it, you have something that works, but you don’t really understand why, or how you’d fix it if it was broken. Using VTL is a deal-breaker for me. I don’t want to ask a team to mess around with VTL, life is too short.

Using TypeScript for front-end (React etc.), backend (Lambda functions) and infrastructure (CDK) reduces the amount of stuff a team needs to learn overall. I don’t want to add tools or techniques unless they really pay for themselves.

I ran into problems with the AWS Console silently hiding errors with subscriptions (you can see a report of that in this thread about OIDC [2]), and I spent a long time trying to get the magic incantations right that would enable subscriptions in the first place [3]. I think I wasted a day banging my head against the problem.

Ultimately, I found that you can only get basic subscription filtering to work with AWS Lambda. When a GraphQL client subscribes, it hits the subscription endpoint to register its subscription, so I wired up a Lambda to the onSendMessage function of my GraphQL schema using CDK.

// Subscriptions.
const onSendMessage = new lambdaNode.NodejsFunction(this, "onSendMessage", {
  runtime: lambda.Runtime.NODEJS_12_X,
  entry: path.join(
    __dirname,
    "../handlers/graphql/subscription/onSendMessage.ts"
  ),
  handler: "handler",
  memorySize: 1024,
  tracing: lambda.Tracing.ACTIVE,
});

api.addLambdaDataSource("onSendMessageDS", onSendMessage).createResolver({
  typeName: "Subscription",
  fieldName: "onSendMessage",
});

Then I set to writing the code. After some trial and error, and wondering why things didn’t work, I found that the onSendMessage Lambda function is only called once per client, when that client initiates its subscription. It has to return something that AppSync uses to filter the data that is sent to subscribers. I’ll explain what I think is going on, because the docs require careful reading and contain a lot of information that’s irrelevant to most people (you’re probably not using MQTT).

This bit is key:

However, if the mutation lacked the author in its selection set, the subscriber would get a null value for the author field (or an error in case the author field is defined as required/not-null in the schema).

That means that if whatever is executing the mutation doesn’t request all of the fields that the subscription supports in its selection set, your subscriber will get “null” values for those fields. Actually, it’s worse than that, because if you define your schema to have a non-null field, then AppSync will throw errors with little information about what’s going on.
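Here’s a small sketch of that behaviour as I understand it. It’s a simplified model written for this post, not AppSync’s actual code: the payloadForSubscriber function, the Payload type and the NON_NULL_FIELDS list are all made up for illustration.

```typescript
// Simplified model of the behaviour described above; this is not AppSync's code.
type Payload = Record<string, string | null>;

// Fields the schema marks as non-null in this example (text: String!).
const NON_NULL_FIELDS = ["text"];

// Build what a subscriber would see, given what the mutation caller selected.
function payloadForSubscriber(
  mutationResult: Payload,
  mutationSelection: string[],
  subscriptionSelection: string[]
): Payload {
  const delivered: Payload = {};
  for (const field of subscriptionSelection) {
    // Only fields in the mutation's selection set are available to subscribers.
    const selected = mutationSelection.indexOf(field) !== -1;
    const value = selected ? (mutationResult[field] ?? null) : null;
    // A missing value for a non-null field is an error rather than a null.
    if (value === null && NON_NULL_FIELDS.indexOf(field) !== -1) {
      throw new Error(
        `non-null field "${field}" was not returned by the mutation`
      );
    }
    delivered[field] = value;
  }
  return delivered;
}
```

In this model, a mutation that selects only text leaves a subscriber asking for topic and text with a null topic, while a mutation that selects only topic causes an error for the same subscriber.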

My understanding of this is that the output fields of the mutation are put through basic filtering, then sent directly to subscribers.

So, in this case, the subscription is onSendMessage(topic: ID): Message. The arguments are used to filter, so the topic argument must be returned in the output object to enable equality filtering on this value. If you don’t have a topic field in the output, you’ll get nothing in your subscription, and be very confused.

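import { AppSyncResolverEvent, AppSyncResolverHandler } from "aws-lambda";
import { Message, SubscriptionOnSendMessageArgs } from "../../../graphql/types";
import log from "../../../log/";

export const handler: AppSyncResolverHandler<
  SubscriptionOnSendMessageArgs,
  Message
> = async (
  event: AppSyncResolverEvent<SubscriptionOnSendMessageArgs>
): Promise<Message> => {
  log.debug("received identity", event.identity);
  // Echo the topic argument back so it can be used for filtering.
  return {
    topic: event.arguments.topic,
    text: "",
  };
};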

To subscribe to all topics, the topic field in the return type must be set to null.

The “text” field that’s also returned by the Lambda function will not affect the filtering, because there’s no matching “text” input argument. It’s safe to set it to anything, and it doesn’t have to be null.
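As a mental model of the filtering rules above, here’s a short sketch. It reflects my understanding rather than AppSync’s implementation, and the shouldDeliver function and its types are invented for the example.

```typescript
// Simplified mental model of argument-based filtering; not AppSync's implementation.
type TopicFilter = { topic: string | null };
type Published = { topic: string | null; text: string };

// Decide whether a published message reaches a subscriber.
// `filter` holds the topic that was set when the client subscribed.
function shouldDeliver(filter: TopicFilter, published: Published): boolean {
  // A null topic means no filter was requested, so everything is delivered.
  if (filter.topic === null) {
    return true;
  }
  // Otherwise only an exact match on the topic field gets through.
  // The text field is never consulted, because there is no text argument.
  return filter.topic === published.topic;
}
```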

The return type of the subscription must match the return type of the mutation. The docs tell you this, but don’t tell you what happens if it doesn’t (problems with null values, errors with mapping). It’s especially annoying that you get an error at runtime, not compile time.

Error: {
  "errors": [
    {
      "message": "Connection failed: {\"errors\":[{\"errorType\":\"MessageProcessingError\",\"message\":\"There was an error processing the message\"}]}"
    }
  ]
}

The line of the docs that details this is:

The return type of a subscription field in your schema must match the return type of the corresponding mutation field. In the previous example, this was shown as both addPost and addedPost returned as a type of Post.

To stop someone from subscribing to something that they shouldn’t have access to, you have a few options. You can design your subscription to use the event.identity details in the filter, you can throw an error within the subscription Lambda, or, if they’re attempting to subscribe to something that they’re not allowed to, you can set the filter field to a string that will never be used.
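A minimal sketch of the “throw an error” option might look like the following. The event types are cut-down local stand-ins for the ones in the aws-lambda package, and the ALLOWED_TOPICS table and authorizeSubscription function are hypothetical. A real handler would look permissions up somewhere sensible and call a function like this before returning.

```typescript
// Cut-down local stand-ins for the event shape; the real types come from "aws-lambda".
type Identity = { username?: string } | null;
type SubscribeEvent = {
  identity: Identity;
  arguments: { topic?: string | null };
};
type Message = { topic: string | null; text: string };

// Hypothetical permission table: which user may subscribe to which topics.
const ALLOWED_TOPICS: Record<string, string[] | undefined> = {
  alice: ["orders", "billing"],
};

// Return the filter object for an allowed subscription, or throw to refuse it.
function authorizeSubscription(event: SubscribeEvent): Message {
  const username = event.identity ? event.identity.username : undefined;
  if (!username) {
    throw new Error("Unauthorized: no caller identity");
  }
  const topic = event.arguments.topic ?? null;
  const allowed = ALLOWED_TOPICS[username] ?? [];
  // A null topic would mean "all topics", so this sketch refuses it as well.
  if (topic === null || allowed.indexOf(topic) === -1) {
    throw new Error(`Unauthorized: ${username} cannot subscribe to ${topic}`);
  }
  return { topic, text: "" };
}
```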

If the mutation is just there to send messages to subscribers, it doesn’t actually need to do anything except return the published data. Of course, the mutation could also store the data in a database:

import { AppSyncResolverEvent } from "aws-lambda";
import { MutationSendMessageArgs, Message } from "../../../graphql/types";

export const handler = async (
  event: AppSyncResolverEvent<MutationSendMessageArgs>
): Promise<Message> => {
  return {
    topic: event.arguments.message.topic,
    text: event.arguments.message.text,
  };
};

The next challenge was actually testing it. The AWS console does work for API key subscriptions, so I was able to use that to get a subscription going, but I needed to get a mutation to trigger the subscription.

Executing a mutation from Lambda

So, I wanted to use IAM permissions to execute the mutation from a Lambda function. The AWS documentation has an example which doesn’t use authentication [4] so wasn’t really much use. It would allow any user to send messages to every subscriber, when I want them to come from the server-side code only.

I found an example on a great blog that I referred to a lot, but it seemed quite complicated [5].

I couldn’t believe that was the simplest way to get it done. The AWS documentation does have an example of triggering a mutation from a Node.js client app [6] that I could convert to TypeScript, so I tried that out. It does have instructions on how to make it work in a Lambda function, but it just didn’t work. The AWSAppSyncClient is actually a wrapper around the Apollo client. I tried all sorts of configurations, but I got error after error.

Reading through the code of these clients, I couldn’t believe the complexity of what I was reading just to send a signed HTTP POST, and I thought about just writing it myself (as I’ve done in Go for the IoT service before), but I finally found a nicely written package [7] that just worked. I’ve fed back to AWS that the docs need updating, because I wasted ages on this.
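For context, the unsigned part of that HTTP POST is small. The sketch below builds the request for a GraphQL operation; the buildGraphQLRequest function and GraphQLHttpRequest type are made up for illustration, and the SigV4 signing step (which adds headers such as Authorization and X-Amz-Date) is deliberately left out, because that’s the part the package handles.

```typescript
// Shape of a GraphQL request before it is signed and sent.
type GraphQLHttpRequest = {
  method: "POST";
  url: string;
  headers: Record<string, string>;
  body: string;
};

// Build the unsigned request. Signing is not shown here.
function buildGraphQLRequest(
  url: string,
  query: string,
  variables: Record<string, unknown>
): GraphQLHttpRequest {
  return {
    method: "POST",
    url,
    headers: { "Content-Type": "application/json" },
    // The operation document and its variables travel together as one JSON object.
    body: JSON.stringify({ query, variables }),
  };
}
```

The query string would be the sendMessage mutation document, and the variables would carry the message input.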

With that in place, I was able to create a simple Lambda that’s designed to be triggered from the console, or from the CLI with a payload. I just need to hook it up to an EventBridge event, SQS queue, Kinesis stream or something, to enable all of the subscribers to receive realtime updates.

import { Context } from "aws-lambda";
import gql from "graphql-tag";
import { ExecutionResult } from "graphql";
import { Message, SendMessageInput } from "../../graphql/types";
import log from "../../log/";
import AppsyncClient from "appsync-client";

export const handler = async (
  event: Message,
  _context: Context
): Promise<void> => {
  if (!process.env.APPSYNC_ENDPOINT_URL || !process.env.APPSYNC_REGION) {
    throw new Error(
      "must set the APPSYNC_ENDPOINT_URL and APPSYNC_REGION environment variables"
    );
  }
  log.debug("sending request", {
    endpoint: process.env.APPSYNC_ENDPOINT_URL,
    region: process.env.APPSYNC_REGION,
  });
  const message: SendMessageInput = {
    topic: event.topic,
    text: event.text,
  };
  await sendMessage(
    process.env.APPSYNC_ENDPOINT_URL,
    process.env.APPSYNC_REGION,
    message
  );
};

const SEND_MESSAGE = gql(`
mutation sendMessage($message: SendMessageInput!) {
  sendMessage(message: $message) {
    topic
    text
  }
}`);

const sendMessage = async (
  endpoint: string,
  region: string,
  message: SendMessageInput
): Promise<ExecutionResult<Message>> => {
  log.debug("creating client", { endpoint, region });
  const client = new AppsyncClient({
    apiUrl: endpoint,
  });
  log.debug("executing request", { endpoint, region });
  return await client.request({
    query: SEND_MESSAGE,
    variables: {
      message,
    },
  });
};

To grant the Lambda function access to the mutation, I needed to use the CDK again. The GraphQL API defined as the api variable has a grant function on it which allows IAM permissions to be granted.

const lambdaMessageSender = new lambdaNode.NodejsFunction(
  this,
  "lambdaMessageSender",
  {
    runtime: lambda.Runtime.NODEJS_12_X,
    entry: path.join(__dirname, "../handlers/none/sendMessage.ts"),
    handler: "handler",
    memorySize: 1024,
    tracing: lambda.Tracing.ACTIVE,
    timeout: cdk.Duration.seconds(20),
    environment: {
      APPSYNC_REGION: this.region,
      APPSYNC_ENDPOINT_URL: api.graphqlUrl,
    },
  }
);
api.grant(
  lambdaMessageSender,
  appsync.IamResource.custom("types/Mutation/fields/sendMessage"),
  "appsync:GraphQL"
);
api.grant(
  lambdaMessageSender,
  appsync.IamResource.custom("types/Mutation/types/Message"),
  "appsync:GraphQL"
);

With all of this in place, I could subscribe through the AppSync console, and send a message via Lambda using the CLI. Here, my IAM permissions allow me to invoke the Lambda, and the Lambda’s IAM permissions allow it to execute the GraphQL sendMessage mutation.

aws lambda invoke --cli-binary-format raw-in-base64-out --function-name SubscriptionStack-lambdaMessageSenderEA11D641-TNPVXWLEWXLQ --payload '{ "topic": "a", "text": "payload" }' /dev/stdout

Next

Next, I’m converting the GraphQL API to use OIDC, so that I can use Google-issued JWTs to authenticate with the service. So far, I’ve run into a few problems: the out-and-out bug in the console I mentioned earlier, a bit of hassle getting hold of a JWT for testing, and a lack of good tools for testing GraphQL subscriptions. I think there’s a bug in AppSync that means subscriptions just don’t work with OIDC, so I’ve reported my investigation at [8], but I might just be missing something basic.

Summary

The full example is available at [9]. Hopefully it will save you some time.

Overall, the experience of getting into AppSync subscriptions has left me a little wary of it. It’s a little opaque and I don’t feel like I have much trust in the service based on some of the difficulties I had troubleshooting it. I love the schema-first approach, and the ease of getting Lambda-based queries and mutations up and running, but the experience for subscriptions was pretty rough. There are a few sharp edges you can waste a lot of time on.

One alternative is to use API Gateway WebSockets, but the problem with API Gateway is that you have to manage subscriptions yourself, keeping track of connection IDs, and sending data to each subscriber yourself. What makes this worse is that, unfathomably, the API endpoint only supports sending to a single connection at a time:

import { ApiGatewayManagementApi } from "aws-sdk";

const apigatewaymanagementapi = new ApiGatewayManagementApi({
  apiVersion: "2018-11-29",
  endpoint: "https://" + event.hostAndPath,
});
await apigatewaymanagementapi
  .postToConnection({
    ConnectionId: event.connectionId,
    Data: event.message,
  })
  .promise();

This means that if I have 1000 subscribers to the same event, I’d need to look that up in a DB, then make 1000 HTTP requests. To do this reliably, I’d have to introduce a step function (to ensure I’m able to page through all the subscribers in that DB query), a queue or parallel execution in my step function (probably a queue, so I can use the SQS batching) to make sure that all of the subscribers get notified, and a Lambda function to process it all. Quite a lot of work. I know some teams are using AWS IoT for this sort of thing, because MQTT has subscription support, but I don’t think I want to go down that route.
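To give a feel for the simplest version of that fan-out, here’s a sketch that sends to a list of connection IDs in fixed-size batches and collects the failures. The fanOut function and the Send type are hypothetical. In practice the send function would wrap postToConnection, and the list of connection IDs would come from paging through the database.

```typescript
// Hypothetical sender; in practice this would wrap postToConnection.
type Send = (connectionId: string, data: string) => Promise<void>;

// Send to every connection in fixed-size batches, collecting failures
// (e.g. connections that have already closed) instead of stopping.
async function fanOut(
  connectionIds: string[],
  data: string,
  send: Send,
  batchSize = 100
): Promise<string[]> {
  const failed: string[] = [];
  for (let i = 0; i < connectionIds.length; i += batchSize) {
    const batch = connectionIds.slice(i, i + batchSize);
    // Requests within a batch run concurrently; batches run one after another.
    await Promise.all(
      batch.map(async (id) => {
        try {
          await send(id, data);
        } catch (err) {
          failed.push(id);
        }
      })
    );
  }
  return failed;
}
```

Even this leaves out retries and Lambda timeouts, which is why I’d expect to need a queue or Step Function around it for anything serious.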

AppSync has quite a learning curve, and I find the documentation and examples a little lacking, so I’m still researching whether I want to commit to it for the project I’m evaluating it for, or if I want to use API Gateway WebSockets and do it myself so that I can have complete control over the authentication, subscription filtering, and be able to monitor the latency closely. It might come down to building a PoC of the WebSocket connection Step Function. It won’t be very different to my Step Function for DynamoDB imports after all.