adrianhesketh.com

Trigger Fargate tasks on S3 upload with Go CDK

If you’ve got a file processing job that might take more than the 15 minutes you have with AWS Lambda, or you want to use more than 10GB RAM, you’ll need to consider an alternative way to run it.

While it’s possible to break S3 files up into chunks and process them in parallel using Step Functions [0] [1], sometimes you don’t need the jobs processed as quickly as possible, or you don’t want to take on the developer complexity of learning Step Functions.

Design

As with all things AWS, there are a lot of options.

It’s possible to start an EC2 instance to run the job, but that requires either building an operating system image containing the program I want to run, or building a user data script that downloads the program to the instance on startup.

Both of these strategies have problems: building EC2 images is quite slow, and configuring the server in a user data script every time it starts makes startup slow. In addition, I’d have to handle pushing log data to CloudWatch myself, by installing and configuring the CloudWatch Logs Agent.

AWS Batch is an AWS service that’s focussed on this type of job, but it seems designed around the concept of having worker nodes available to run tasks. It also takes a fairly complicated setup to get going if you want to define it using infrastructure as code techniques.

I much prefer to run a Docker container in Fargate because Docker makes it easy to test locally, and is quick to make changes to, while Fargate is a way of running Docker containers without worrying about the underlying operating system.

The overall architecture of this solution requires:

  • Creating a Dockerfile containing the task to run.
  • Building the Docker image from the Dockerfile and uploading it to a Docker registry (ECR).
  • Creating a Fargate task definition to instruct it to run the Docker image.
  • Creating an S3 bucket to store files in.
  • Creating a Lambda function that’s triggered when a new file is added to S3, and runs the Fargate Task Definition.
                                             ┌──────────────┐
                                             │  Dockerfile  │
                                             └───────┬──────┘
                                                     │
                                                     ▼
                                          ┌─────────────────────┐
                                          │ Container registry  │
                                          └──────────┬──────────┘
                                                     │
                                                     ▼
┌────────┐     ┌──────┐     ┌──────────┐    ┌────────────────┐
│  File  ├────►│  S3  ├────►│  Lambda  ├───►│  Fargate task  │
└────────┘     └──────┘     └──────────┘    └────────┬───────┘
                  ▲                                  │
                  └──────────────────────────────────┘
                                Read

Project structure

  • /task - Contains the program code, and the Dockerfile for the processing task.
  • /taskrunner - Contains the program code for the Lambda function that will start the processing task.
  • /cdk - Contains all of the code to create the infrastructure.

Task

For the example, I created a “Hello, World” Go program. Outside of this example, the program would typically download the uploaded file from S3 using the AWS SDK for Go, and then process it.

package main

import "fmt"

func main() {
	fmt.Println("OK, I ran.")
}
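Here is a sketch of that next step. The program reads the S3_BUCKET and S3_KEY environment variables, which the task runner Lambda function sets when it starts the task (covered later in this post). It then URL-decodes the key, because S3 event notifications URL-encode object keys: a space arrives as +. The objectLocation function is hypothetical, and the actual download with the AWS SDK for Go is left as a comment.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"os"
)

// objectLocation returns the bucket and object key passed to the task via the
// S3_BUCKET and S3_KEY environment variables (hypothetical helper).
// S3 event notifications URL-encode object keys, so the key is decoded first.
func objectLocation(getenv func(string) string) (bucket, key string, err error) {
	bucket = getenv("S3_BUCKET")
	rawKey := getenv("S3_KEY")
	if bucket == "" || rawKey == "" {
		return "", "", errors.New("S3_BUCKET and S3_KEY must be set")
	}
	key, err = url.QueryUnescape(rawKey)
	if err != nil {
		return "", "", fmt.Errorf("decoding S3_KEY %q: %w", rawKey, err)
	}
	return bucket, key, nil
}

func main() {
	bucket, key, err := objectLocation(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// A real task would download s3://bucket/key here using the AWS SDK for Go.
	fmt.Printf("OK, I ran. Processing s3://%s/%s\n", bucket, key)
}
```

Passing the lookup function in, rather than calling os.Getenv directly, makes the function easy to test without touching the process environment.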

To build the program, I configured a multi-stage Dockerfile.

The file contains a “build container” based on golang:1.17 which runs go build. This outputs an executable called app.

The file also contains a “runtime container” based on ubuntu:latest which uses the app from the build container (COPY --from=0 /src/app ./) and runs it.

Later, we’ll see that CDK can use this Dockerfile to build and upload the container to ECR.

FROM golang:1.17
WORKDIR /src/
COPY . /src/
RUN go build -o app

FROM ubuntu:latest
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /run/
COPY --from=0 /src/app ./
CMD [ "./app" ]

Task runner

The task runner Lambda function is responsible for starting the Fargate task. The API call to start a Fargate task requires a number of parameters, some of which the Lambda function handler code expects to be found in environment variables.

The required parameters include the ARN of the cluster, the name of the container to run, the ARN of the task definition, and the VPC subnets that the task should run within.

type config struct {
	ClusterARN        string
	ContainerName     string
	TaskDefinitionARN string
	Subnets           []string
	S3Bucket          string
	IsValid           bool
}
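The post doesn’t show how the config struct is populated. Below is a minimal sketch that assumes a hypothetical loadConfig function. It reads the environment variables the CDK stack sets on the Lambda function later on: CLUSTER_ARN, CONTAINER_NAME, TASK_DEFINITION_ARN, SUBNETS (a comma-separated list) and S3_BUCKET.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

type config struct {
	ClusterARN        string
	ContainerName     string
	TaskDefinitionARN string
	Subnets           []string
	S3Bucket          string
	IsValid           bool
}

// loadConfig builds a config from environment variables (hypothetical helper).
// SUBNETS is expected to be a comma-separated list of subnet IDs.
func loadConfig(getenv func(string) string) config {
	c := config{
		ClusterARN:        getenv("CLUSTER_ARN"),
		ContainerName:     getenv("CONTAINER_NAME"),
		TaskDefinitionARN: getenv("TASK_DEFINITION_ARN"),
		S3Bucket:          getenv("S3_BUCKET"),
	}
	for _, s := range strings.Split(getenv("SUBNETS"), ",") {
		// Skip empty entries, e.g. when SUBNETS is unset.
		if s = strings.TrimSpace(s); s != "" {
			c.Subnets = append(c.Subnets, s)
		}
	}
	c.IsValid = c.ClusterARN != "" && c.ContainerName != "" &&
		c.TaskDefinitionARN != "" && len(c.Subnets) > 0 && c.S3Bucket != ""
	return c
}

func main() {
	c := loadConfig(os.Getenv)
	fmt.Printf("valid=%v subnets=%d\n", c.IsValid, len(c.Subnets))
}
```

The handler could call something like this once at cold start and refuse to start tasks if IsValid is false.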

The Lambda function handler is configured to expect to receive S3 events, and calls the ECS RunTask API to start the Fargate task for each S3 event it receives.

I’ve configured the task to run in a public subnet so that I don’t end up paying for NAT gateways in my VPC, but your requirements here might be different.

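import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ecs"
	"go.uber.org/zap"
)

// handler starts one Fargate task per S3 record. c (the config) and logger
// (a *zap.Logger) are package-level variables initialised at startup.
func handler(ctx context.Context, s3Event events.S3Event) error {
	logger.Info("Starting...")
	// session.New is deprecated; NewSession surfaces configuration errors.
	svc := ecs.New(session.Must(session.NewSession()))
	for i, record := range s3Event.Records {
		entity := record.S3
		logger.Info("Processing File", zap.Int("index", i), zap.Int("count", len(s3Event.Records)), zap.String("bucketName", entity.Bucket.Name), zap.String("objectKey", entity.Object.Key))
		input := &ecs.RunTaskInput{
			Cluster:        aws.String(c.ClusterARN),
			TaskDefinition: aws.String(c.TaskDefinitionARN),
			NetworkConfiguration: &ecs.NetworkConfiguration{
				AwsvpcConfiguration: &ecs.AwsVpcConfiguration{
					// Required in a public subnet without a NAT gateway, otherwise
					// the task can't reach ECR to download the container image.
					AssignPublicIp: aws.String(ecs.AssignPublicIpEnabled),
					Subnets:        aws.StringSlice(c.Subnets),
				},
			},
			Overrides: &ecs.TaskOverride{
				ContainerOverrides: []*ecs.ContainerOverride{{
					Name: aws.String(c.ContainerName),
					// Tell the container which file to process.
					Environment: []*ecs.KeyValuePair{
						{Name: aws.String("S3_KEY"), Value: aws.String(entity.Object.Key)},
						{Name: aws.String("S3_BUCKET"), Value: aws.String(c.S3Bucket)},
					},
				}},
			},
			LaunchType: aws.String(ecs.LaunchTypeFargate),
		}

		res, err := svc.RunTaskWithContext(ctx, input)
		if err != nil {
			logger.Error("Failed to run task", zap.Error(err))
			return err
		}
		// RunTask can succeed as an API call but still fail to start a task.
		for _, f := range res.Failures {
			logger.Error("Task failed to start", zap.String("arn", aws.StringValue(f.Arn)), zap.String("reason", aws.StringValue(f.Reason)))
		}
		for _, task := range res.Tasks {
			logger.Info("Started task", zap.String("taskArn", aws.StringValue(task.TaskArn)))
		}
	}
	return nil
}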
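Each invocation receives an S3 event notification whose Records field is an array, which is why the handler loops over it. The handler itself uses the full events.S3Event type from aws-lambda-go. The illustrative sketch below decodes only the fields the handler reads, using encoding/json, to show the shape of the payload. The struct, function and sample payload are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// s3Notification mirrors only the parts of an S3 event notification
// that the task runner uses.
type s3Notification struct {
	Records []struct {
		EventName string `json:"eventName"`
		S3        struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key  string `json:"key"`
				Size int64  `json:"size"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// objectKeys returns "bucket/key" for every record in the notification.
func objectKeys(payload []byte) ([]string, error) {
	var n s3Notification
	if err := json.Unmarshal(payload, &n); err != nil {
		return nil, err
	}
	keys := make([]string, 0, len(n.Records))
	for _, r := range n.Records {
		keys = append(keys, r.S3.Bucket.Name+"/"+r.S3.Object.Key)
	}
	return keys, nil
}

func main() {
	sample := []byte(`{"Records":[{"eventName":"ObjectCreated:Put","s3":{"bucket":{"name":"example-bucket"},"object":{"key":"uploads/data.csv","size":1024}}}]}`)
	keys, err := objectKeys(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(keys) // [example-bucket/uploads/data.csv]
}
```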

CDK code

With the task code, and the task starter code in place, the components can be connected with CDK.

Source data bucket

The bucket that will receive files will later be configured to trigger the Lambda function to fire, but for now, it’s just a basic bucket.

sourceBucket := awss3.NewBucket(stack, jsii.String("sourceBucket"), &awss3.BucketProps{
	// Block all public access to the uploaded files.
	BlockPublicAccess: awss3.BlockPublicAccess_BLOCK_ALL(),
})

Networking

Fargate tasks need to run inside a VPC, so one needs to be created.

I’ve chosen to run everything in the public subnets to save on costs. If you run in private subnets, you’ll need NAT gateways to access the Internet, or VPC endpoints (for ECR, S3 and CloudWatch Logs) so that the task can download its container image from ECR and write its logs.

vpc := awsec2.NewVpc(stack, jsii.String("taskVpc"), &awsec2.VpcProps{
	// Use 2 AZs. If you add NAT gateways later, fewer AZs means fewer gateways to pay for.
	MaxAzs: jsii.Number(2),
	// With NAT gateways, the tasks could run in the private subnets,
	// but they're a waste of money for this.
	// The tasks run in the public subnets instead.
	NatGateways: jsii.Number(0),
})

ECS cluster

The ECS cluster itself has a required parameter for the VPC it will run tasks within. There are no servers in the cluster, since it’s going to run on-demand Fargate tasks which shut themselves down after completion.

cluster := awsecs.NewCluster(stack, jsii.String("ecsCluster"), &awsecs.ClusterProps{
	Vpc: vpc,
})

Task definition

Two IAM roles are required to be able to create an ECS task.

Task execution role

The first role required to create an ECS task is a task execution role (ter). ECS uses this role to start the task, so it needs permission to download the container image from ECR and to write to CloudWatch Logs.

There’s a managed task execution role that could be used, but the CDK type doesn’t have the handy GrantPassRole helper on it, so I recreated it using the documentation at [2] instead.

ter := awsiam.NewRole(stack, jsii.String("taskExecutionRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("ecs-tasks.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
ter.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions:   jsii.Strings("ecr:BatchCheckLayerAvailability", "ecr:GetDownloadUrlForLayer", "ecr:BatchGetImage", "logs:CreateLogStream", "logs:PutLogEvents", "ecr:GetAuthorizationToken"),
	Resources: jsii.Strings("*"),
}))

Task role

The second role required to create an ECS task is the task role (tr). The program running inside the container uses this role to access AWS resources.

I’ve given it permission to read from the source bucket with sourceBucket.GrantRead(tr, nil) so that the task can read the file that triggered the event. I’ve also given it CloudWatch Logs permissions. However, the awslogs driver that ships stdout and stderr to CloudWatch Logs uses the task execution role, so these permissions only matter if the program calls the CloudWatch Logs API itself.

tr := awsiam.NewRole(stack, jsii.String("taskRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("ecs-tasks.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
tr.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions:   jsii.Strings("logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"),
	Resources: jsii.Strings("*"),
}))
sourceBucket.GrantRead(tr, nil)

Task definition implementation

With that out of the way, it’s possible to define the task. The first part defines the roles that are used, and how much RAM and CPU the task is allocated. There are limited options for the RAM and CPU values, so you’ll need to refer to the documentation at [3] to choose valid values.

td := awsecs.NewFargateTaskDefinition(stack, jsii.String("taskDefinition"), &awsecs.FargateTaskDefinitionProps{
	MemoryLimitMiB: jsii.Number(512),
	Cpu:            jsii.Number(256),
	ExecutionRole:  ter,
	TaskRole:       tr,
})
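Only certain CPU and memory pairs are accepted, so a quick check can catch a typo before deployment. This sketch encodes the Linux Fargate combinations for 0.25 to 4 vCPU as I understand them from the ECS documentation. Larger sizes exist on newer platform versions, so treat [3] as the source of truth. isValidFargateSize is a hypothetical helper, not part of CDK.

```go
package main

import "fmt"

// validFargateMemory returns the memory values (MiB) accepted for a CPU value
// (CPU units, where 1024 = 1 vCPU). Covers 0.25 to 4 vCPU only.
func validFargateMemory(cpu int) []int {
	switch cpu {
	case 256:
		return []int{512, 1024, 2048}
	case 512:
		return steps(1024, 4096)
	case 1024:
		return steps(2048, 8192)
	case 2048:
		return steps(4096, 16384)
	case 4096:
		return steps(8192, 30720)
	}
	return nil
}

// steps lists memory values from..to inclusive in 1 GiB increments.
func steps(from, to int) []int {
	var out []int
	for m := from; m <= to; m += 1024 {
		out = append(out, m)
	}
	return out
}

// isValidFargateSize reports whether the CPU/memory pair is accepted.
func isValidFargateSize(cpu, memoryMiB int) bool {
	for _, m := range validFargateMemory(cpu) {
		if m == memoryMiB {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isValidFargateSize(256, 512))  // true: the values used above
	fmt.Println(isValidFargateSize(256, 4096)) // false
}
```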

Task Docker container

The task definition needs to be given the Docker container that it should run.

The awsecs.AssetImage_FromAsset function call in CDK builds a Docker image from the Dockerfile in the ../task directory and, during deployment, uploads it to ECR in your AWS account. So simple.

taskContainer := td.AddContainer(jsii.String("taskContainer"), &awsecs.ContainerDefinitionOptions{
	Image: awsecs.AssetImage_FromAsset(jsii.String("../task"), &awsecs.AssetImageProps{}),
	Logging: awsecs.LogDriver_AwsLogs(&awsecs.AwsLogDriverProps{
		StreamPrefix: jsii.String("task"),
	}),
})

Task starter Lambda function

To start the ECS task when the file hits the S3 bucket, a Lambda function needs to be configured to run the Lambda handler code we defined earlier.

Task starter IAM role

The Lambda function needs permission to run the ECS task. There’s an AWS managed policy for Lambda called service-role/AWSLambdaBasicExecutionRole which we can use as a base. It gives the Lambda function permission to write to CloudWatch Logs.

taskStarterRole := awsiam.NewRole(stack, jsii.String("taskStarterRole"), &awsiam.RoleProps{
	AssumedBy: awsiam.NewServicePrincipal(jsii.String("lambda.amazonaws.com"), &awsiam.ServicePrincipalOpts{}),
})
taskStarterRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("service-role/AWSLambdaBasicExecutionRole")))

With the basic permissions in place, we can give the role permission to run the ECS task on the cluster.

taskStarterRole.AddToPolicy(awsiam.NewPolicyStatement(&awsiam.PolicyStatementProps{
	Actions:   jsii.Strings("ecs:RunTask"),
	Resources: jsii.Strings(*cluster.ClusterArn(), *td.TaskDefinitionArn()),
}))

This is the tricky bit. Starting the task passes the specific “task execution role” and “task role” that we defined earlier to ECS, so the Lambda function’s role needs permission to pass them. This is done by granting iam:PassRole on both roles to the Lambda function’s role.

td.ExecutionRole().GrantPassRole(taskStarterRole)
td.TaskRole().GrantPassRole(taskStarterRole)

Task starter Lambda function

Now CDK can be used to build the taskrunner Lambda handler Go code, zip it up and create a Lambda function.

I’ve given the Lambda function 512MB of RAM, but it would happily run with 128MB because the function code is just making a single AWS API call to start the ECS task. It’s the ECS task that will do all the hard work.

The Lambda handler code expects some environment variables containing configuration to be present, so these are populated by the CDK code.

taskStarter := awslambdago.NewGoFunction(stack, jsii.String("taskStarter"), &awslambdago.GoFunctionProps{
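	// AWS has since deprecated the go1.x runtime; newer projects use
	// awslambda.Runtime_PROVIDED_AL2() (or PROVIDED_AL2023) instead.
	Runtime: awslambda.Runtime_GO_1_X(),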
	Entry:   jsii.String("../taskrunner"),
	Bundling: &awslambdago.BundlingOptions{
		GoBuildFlags: &[]*string{jsii.String(`-ldflags "-s -w"`)},
	},
	Environment: &map[string]*string{
		"CLUSTER_ARN":         cluster.ClusterArn(),
		"CONTAINER_NAME":      taskContainer.ContainerName(),
		"TASK_DEFINITION_ARN": td.TaskDefinitionArn(),
		"SUBNETS":             jsii.String(strings.Join(*getSubnetIDs(vpc.PublicSubnets()), ",")),
		"S3_BUCKET":           sourceBucket.BucketName(),
	},
	MemorySize: jsii.Number(512),
	Role:       taskStarterRole,
	Timeout:    awscdk.Duration_Seconds(jsii.Number(60)),
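})

// getSubnetIDs collects the IDs of the given subnets
// (one possible implementation of the helper used above).
func getSubnetIDs(subnets *[]awsec2.ISubnet) *[]string {
	ids := make([]string, len(*subnets))
	for i, subnet := range *subnets {
		ids[i] = *subnet.SubnetId()
	}
	return &ids
}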

Lambda trigger

Finally, the Lambda function can be configured to fire when an object is added to the S3 bucket.

taskStarter.AddEventSource(awslambdaeventsources.NewS3EventSource(sourceBucket, &awslambdaeventsources.S3EventSourceProps{
	Events: &[]awss3.EventType{
		awss3.EventType_OBJECT_CREATED,
	},
}))

Summary

Lambda’s time and RAM limits mean that you might sometimes find it easier to run one-off processing jobs in a serverless container environment than in Lambda.

A good way of doing this is to use Fargate tasks, which can be used to spin up ephemeral containers and execute your code.

The code is available over at: