eliasbrange.dev
Test Event-Driven Architectures with EventBridge and AppSync Events

Testing event-driven applications is difficult. Last year, I wrote a piece on testing that your application(s) produce the events you expect, using Momento Topics. At the time, that setup was a lot simpler than what was possible with AppSync.

AWS introduced AppSync Events a couple of weeks back. When reading the announcement, I immediately thought of my old blog post. I hoped that AppSync Events could be used to build a solution that’s as simple as the previous one, using only AWS services.

In this post, I will show you how to set up the required resources to test your event-producing application(s) end-to-end.

TL;DR: Show me the code!

You can find the sample application and a step-by-step guide on GitHub.

AppSync Events

In the release post, AWS describes AppSync Events as follows:

Today, AWS AppSync announced AWS AppSync Events, a feature that lets developers easily broadcast real-time event data to a few or millions of subscribers using secure and performant serverless WebSocket APIs. With AWS AppSync Events, developers no longer have to worry about building WebSocket infrastructure, managing connection state, and implementing fan-out. Developers simply create their API, and publish events that are broadcast to clients subscribed over a WebSocket connection. AWS AppSync Event APIs are serverless, so you can get started quickly, your APIs automatically scale, and you only pay for what you use.

Compared to earlier alternatives, such as AppSync GraphQL subscriptions and API Gateway WebSocket API, AppSync Events are much simpler to set up. You only have to create an API, define configuration such as default authorization, and create a channel namespace. Then, you can start publishing and subscribing to channels in the namespace.

We will only use the basic features of AppSync Events in this post. We will create a single namespace and protect it using API_KEY authorization.

For the curious, there are more advanced features available, such as per-namespace authorization and custom handlers for onPublish and onSubscribe.

Sample Application

The sample application is very similar to the application in the previous post. It’s a simple Order Service, consisting of an API Gateway with a single POST /event endpoint that triggers a Lambda function. This function generates a new order item and publishes an OrderCreated event to EventBridge.

The simple application architecture.

To recap, the Lambda function looks like this:

import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";
import type { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda";
import { ulid } from "ulid";
const ebClient = new EventBridgeClient();
const EVENT_BUS_NAME = process.env.EVENT_BUS_NAME || "default";
export const handler = async (_event: APIGatewayProxyEventV2): Promise<APIGatewayProxyResultV2> => {
  const order = {
    id: ulid(),
    name: "test order",
  };
  await ebClient.send(
    new PutEventsCommand({
      Entries: [
        {
          EventBusName: EVENT_BUS_NAME,
          Source: "OrderService",
          DetailType: "OrderCreated",
          Detail: JSON.stringify(order),
        },
      ],
    }),
  );
  return {
    statusCode: 201,
    body: JSON.stringify(order),
  };
};

When a POST /event request is sent to the API Gateway, this handler is invoked, publishing an event with the following structure to EventBridge:

{
  "version": "0",
  "id": "11111111-2222-4444-5555-666666666666",
  "detail-type": "OrderCreated",
  "source": "OrderService",
  "account": "123456789012",
  "time": "2023-10-19T08:00:00Z",
  "region": "eu-west-1",
  "resources": [],
  "detail": {
    "id": "01F9ZQZJZJZJZJZJZJZJZJZJZJ",
    "name": "test order"
  }
}

The test setup will also be very similar to before. But let’s hold off on that and start with the necessary test resources.

Test infrastructure

Last time, I used another SAM template to separate the application infrastructure from the test infrastructure. To add some variation, I’m using CDK this time. I decided to create a custom construct called TestResources. I also added a stack property to conditionally deploy the resources in the environments where it makes sense:

interface StackProps extends cdk.StackProps {
  includeTestStack?: boolean;
}
export class EventbridgeTestingWithAppsyncEventsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);
    // Sample application resources
    const eventBus = ...
    const apiFn = ...
    const api = ...
    // Test resources
    if (props?.includeTestStack) {
      const testResources = new TestResources(this, "TestResources", {
        eventBus,
      });
    }
  }
}

So what test resources do we need? Let’s start with the obvious one, an AppSync Events API.

AppSync Events API

The core component of the test infrastructure is the AppSync Events API. For this demo, I will create an API with a namespace called default and use an API key for authorization. At the time of writing, there are still no L2 constructs for AppSync Events, so we’ll have to make do with L1 constructs for now.

interface ConstructProps {
  eventBus: eventbridge.IEventBus;
}
export class TestResources extends Construct {
  public eventsApi: appsync.CfnApi;
  constructor(scope: Construct, id: string, props: ConstructProps) {
    super(scope, id);
    // Event API with API_KEY as the only auth mode for connect, publish, and subscribe
    this.eventsApi = new appsync.CfnApi(this, "TestEventsApi", {
      name: "TestEventsApi",
      eventConfig: {
        authProviders: [{ authType: "API_KEY" }],
        connectionAuthModes: [{ authType: "API_KEY" }],
        defaultPublishAuthModes: [{ authType: "API_KEY" }],
        defaultSubscribeAuthModes: [{ authType: "API_KEY" }],
      },
    });
    const eventsApiKey = new appsync.CfnApiKey(this, "TestEventsApiKey", {
      apiId: this.eventsApi.attrApiId,
      expires: Math.floor(Date.now() / 1000) + 60 * 60 * 24 * 365, // 1 year from now
    });
    new appsync.CfnChannelNamespace(this, "TestEventsNamespace", {
      apiId: this.eventsApi.attrApiId,
      name: "default",
    });
  }
}

The above creates an AppSync Events API with API_KEY authorization. I’m setting the API key’s expiration to a year from now during each deployment for demo purposes. In a real-world scenario, you’d want to manage the API key’s lifecycle more carefully.
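One side effect of calling Date.now() at synth time is that the expiry value differs on every synth, so every deployment updates the key. Here is a minimal sketch of one alternative: derive the expiry from a pinned issue date. The helper name apiKeyExpiry and the pinned date are hypothetical. The 1 to 365 day range and the rounding down to the hour reflect my understanding of the AppSync API key limits, so check them against the current documentation before relying on them.

```typescript
const SECONDS_PER_HOUR = 60 * 60;
const SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR;

/**
 * Hypothetical helper: returns an expiry in epoch seconds, `validDays` after
 * `issuedAt`, rounded down to the start of the hour.
 * Assumption: AppSync accepts expiries between 1 and 365 days from now.
 */
function apiKeyExpiry(issuedAt: Date, validDays: number): number {
  if (!Number.isInteger(validDays) || validDays < 1 || validDays > 365) {
    throw new RangeError("validDays must be an integer between 1 and 365");
  }
  const issuedAtSeconds = Math.floor(issuedAt.getTime() / 1000);
  const expiry = issuedAtSeconds + validDays * SECONDS_PER_DAY;
  return expiry - (expiry % SECONDS_PER_HOUR);
}

// Pinned issue date: the computed value only changes when this line is edited.
const expires = apiKeyExpiry(new Date("2025-01-01T00:30:00Z"), 30);
console.log(expires, new Date(expires * 1000).toISOString());
// prints: 1738281600 2025-01-31T00:00:00.000Z
```

The returned number could be passed as the expires property of the CfnApiKey. The trade-off is that someone has to bump the pinned date before the key runs out.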

Attempt #1: EventBridge API Destination

EventBridge to AppSync via API Destination.

The release post included a section on how to integrate EventBridge and AppSync Events using API destinations. In the example, an event with the following detail is used:

{
  "channel": "/default/introductions",
  "events": ["{\"message\":\"Hello from EventBridge!\"}"]
}

In my case, I want to send arbitrary events (such as OrderCreated) to a specific channel. With Momento, this was as straightforward as setting the topic name as part of the URL in the API destination and sending the event payload in the body.

The request format in AppSync Events is a bit different. AppSync expects the payload to contain both the destination channel and up to five events. Each event must be stringified JSON, like {\"message\":\"Hello from EventBridge!\"} above. Sadly, this request schema makes it impossible to directly integrate EventBridge and AppSync Events without having something in between.
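To make the request shape concrete, here is a small sketch of how a publisher could build valid request bodies: stringify each payload and split the list into batches of at most five. The helper buildPublishBodies and the PublishBody type are hypothetical names for illustration and are not part of any AWS SDK.

```typescript
// Limit described above: at most five events per publish request.
const MAX_EVENTS_PER_REQUEST = 5;

interface PublishBody {
  channel: string;
  events: string[]; // each entry is a JSON document serialized to a string
}

/** Hypothetical helper: stringify every payload and batch them five at a time. */
function buildPublishBodies(channel: string, payloads: Record<string, unknown>[]): PublishBody[] {
  const result: PublishBody[] = [];
  for (let i = 0; i < payloads.length; i += MAX_EVENTS_PER_REQUEST) {
    const batch = payloads.slice(i, i + MAX_EVENTS_PER_REQUEST);
    result.push({ channel, events: batch.map((payload) => JSON.stringify(payload)) });
  }
  return result;
}

// Seven payloads end up in two requests: five events, then two.
const orders = Array.from({ length: 7 }, (_, i) => ({ id: `order-${i}`, name: "test order" }));
const bodies = buildPublishBodies("/default/debug", orders);
console.log(bodies.length); // 2
console.log(JSON.stringify(bodies[1]));
```

Each body would then be sent as the JSON payload of a POST request to the API's /event endpoint.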

For my first attempt, I assumed that I could use an input template like this:

{
  "channel": "/namespace/channel",
  "events": [
    <detail>
  ]
}

However, this produces the following output:

{
  "channel": "/namespace/channel",
  "events": [
    {
      "id": "01JDSQKGFAHQMH866CARE8F9JC",
      "name": "test order"
    }
  ]
}

I need the event to be stringified, but there’s no way to do that with EventBridge input transformation. That AWS chose a request schema for AppSync that cannot be used this way with API destinations is a blunder, to be frank.

The integration only works if the EventBridge publisher is aware of the AppSync namespace and channel and can format the event accordingly. At that point, the publisher might as well publish to the AppSync Events API directly.

We require something between EventBridge and AppSync, but what?

Attempt #2: Lambda Function

EventBridge to AppSync via Lambda.

After wrangling with API destinations for far too long, I took the easy way out. I wired an EventBridge rule to a basic Lambda function:

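import type { EventBridgeEvent } from "aws-lambda";
// Assumption: the API URL and key are passed to the function as environment variables.
const EVENTS_API_URL = process.env.EVENTS_API_URL || "";
const EVENTS_API_KEY = process.env.EVENTS_API_KEY || "";
export const handler = async (event: EventBridgeEvent<string, unknown>): Promise<void> => {
  await fetch(`${EVENTS_API_URL}/event`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "x-api-key": EVENTS_API_KEY,
    },
    body: JSON.stringify({
      channel: "/default/debug",
      // AppSync expects each event as stringified JSON
      events: [JSON.stringify(event)],
    }),
  });
};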

This works, and since the function is only used for testing purposes, cold starts aren’t a problem. But it feels wrong to need a Lambda function for such a simple transformation. Let’s try another way.

Attempt #3: Express Step Function

EventBridge to AppSync via Step Functions.

The only thing we need is to transform the input and send it to the AppSync Events API. This sounds like the perfect job for a Step Function. I will create an express state machine to transform the payload with JSONata and send it to AppSync using an HTTP task.

Let’s extend our TestResources construct. First, create a new EventBridge connection that the HTTP task can use:

const connection = new eventbridge.Connection(this, "TestEventsConnection", {
  authorization: eventbridge.Authorization.apiKey(
    "x-api-key",
    cdk.SecretValue.resourceAttribute(eventsApiKey.attrApiKey),
  ),
});

Then, add the state machine. In this case, there is only one step, so I’ll inline the state machine definition. I’d recommend keeping it in a separate .yml file for more complex state machines.

const stateMachine = new sfn.StateMachine(this, "TestEventsStateMachine", {
  stateMachineType: sfn.StateMachineType.EXPRESS,
  logs: {
    destination: new logs.LogGroup(this, "TestEventsStateMachineLogGroup", {
      logGroupName: "/aws/vendedlogs/states/TestEventsStateMachine",
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    }),
    includeExecutionData: true,
    level: sfn.LogLevel.ALL,
  },
  definitionBody: sfn.DefinitionBody.fromString(
    JSON.stringify({
      QueryLanguage: "JSONata",
      StartAt: "PublishTestEvent",
      States: {
        PublishTestEvent: {
          Type: "Task",
          Resource: "arn:aws:states:::http:invoke",
          Arguments: {
            Method: "POST",
            Authentication: {
              ConnectionArn: "${ConnectionArn}",
            },
            ApiEndpoint: "${ApiEndpoint}",
            RequestBody: {
              channel: "/default/debug",
              events: ["{% $string($states.input) %}"],
            },
          },
          End: true,
        },
      },
    }),
  ),
  definitionSubstitutions: {
    ConnectionArn: connection.connectionArn,
    ApiEndpoint: `https://${this.eventsApi.attrDnsHttp}/event`,
  },
});

A lot is happening here. Let’s walk through it.

I’m setting the workflow type to express. Express workflows are ideal for short-running jobs such as this. I’m creating a log group and configuring logging on the state machine to be able to view executions in the console.

The state machine comprises a single HTTP task that uses the EventBridge connection created earlier to send requests to AppSync. The state input is transformed into the format AppSync expects with a little help from the JSONata $string function.

Next, I add a couple of permissions to the state machine:

stateMachine.role.attachInlinePolicy(
  new iam.Policy(this, "StateMachinePolicy", {
    statements: [
      new iam.PolicyStatement({
        sid: "AllowUseConnection",
        effect: iam.Effect.ALLOW,
        actions: ["events:RetrieveConnectionCredentials"],
        resources: [connection.connectionArn],
      }),
      new iam.PolicyStatement({
        sid: "AllowGetConnectionSecret",
        effect: iam.Effect.ALLOW,
        actions: ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
        resources: [connection.connectionSecretArn],
      }),
      new iam.PolicyStatement({
        sid: "AllowInvokeHTTPEndpoint",
        effect: iam.Effect.ALLOW,
        actions: ["states:InvokeHTTPEndpoint"],
        resources: ["*"],
      }),
    ],
  }),
);

Finally, I glue everything together by creating an EventBridge rule that forwards events from the event bus to the state machine:

new eventbridge.Rule(this, "TestEventsRule", {
  eventBus: props.eventBus,
  eventPattern: {
    source: eventbridge.Match.prefix(""),
  },
  targets: [new eventbridgeTargets.SfnStateMachine(stateMachine)],
});

For demo purposes, I’m forwarding all events. If you want to use this method to run tests against production, you most likely only want test events to flow through the test infrastructure.

If you can distinguish test from production traffic in your event-producing services, you can format your event to allow filtering out only test events.
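As an illustration of that idea, suppose producers mark test traffic with a detail.metadata.isTest flag. That field is a hypothetical convention and not something the sample application does. The pattern object below could then serve as the rule's event pattern. The matchesPattern function is only a simplified stand-in for EventBridge's exact-value matching, included so the pattern's effect can be checked locally; real rules support many more operators.

```typescript
type Pattern = { [key: string]: Pattern | Array<string | number | boolean | null> };

// Hypothetical convention: producers set detail.metadata.isTest = true on test traffic.
const testEventsOnly: Pattern = {
  detail: { metadata: { isTest: [true] } },
};

/** Simplified exact-value matcher: leaf arrays list allowed values, objects nest. */
function matchesPattern(value: unknown, pattern: Pattern): boolean {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return Object.entries(pattern).every(([key, expected]) => {
    const actual = record[key];
    if (Array.isArray(expected)) {
      // A leaf matches when the field equals any of the listed values.
      return expected.some((candidate) => candidate === actual);
    }
    return matchesPattern(actual, expected);
  });
}

const testOrder = {
  source: "OrderService",
  "detail-type": "OrderCreated",
  detail: { id: "1", name: "test order", metadata: { isTest: true } },
};
const realOrder = {
  source: "OrderService",
  "detail-type": "OrderCreated",
  detail: { id: "2", name: "real order" },
};

console.log(matchesPattern(testOrder, testEventsOnly)); // true
console.log(matchesPattern(realOrder, testEventsOnly)); // false
```

With a pattern like this, production events never reach the state machine, so the debug channel only carries events that tests asked for.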

Verify the test infrastructure

Let’s verify that the test infrastructure is wired correctly before moving on. The AppSync console for Event APIs has a neat Pub/Sub Editor that lets you connect and subscribe to the API directly in the browser.

Connect to your API and subscribe to the debug channel in the default namespace. Then, go to the console for your event bus and send an event or two. The events you send should show up in the AppSync console.

Test the sample application end-to-end

With the test infrastructure in place, we can now test our application end-to-end and be confident that it emits the correct event(s). The full architecture looks like this:

End-to-end testing flow.

The test code is very similar to the test code in my previous post. The major difference is that it subscribes to AppSync instead of Momento.

The test case itself is almost identical:

describe("When an order is created", async () => {
  const API_URL = process.env.API_URL || "";
  const subscription = await subscribe("default", "debug");
  afterAll(async () => {
    // unsubscribe is async, so wait for the channel to close
    await subscription.unsubscribe();
  });
  it("It should publish an OrderCreated event to EventBridge", async () => {
    const response = await fetch(`${API_URL}/event`, { method: "POST" });
    expect(response.status).toBe(201);
    const order = await response.json();
    const message = await subscription.waitForMessageMatching({
      source: "OrderService",
      "detail-type": "OrderCreated",
      detail: {
        id: order.id,
        name: order.name,
      },
    });
    expect(message).not.toBeNull();
  }, 5000);
});

First, we subscribe to the debug channel in the default namespace. The test then creates an order through the API and stores the returned order ID. Then, it waits a maximum of five seconds for a message matching the expected format to appear on the channel.

The major difference from the Momento post is how subscribe works:

import "dotenv/config";
import { Amplify } from "aws-amplify";
import { events } from "aws-amplify/data";
import * as _ from "lodash";
import { ReplaySubject, firstValueFrom } from "rxjs";
import { filter } from "rxjs/operators";
import { WebSocket } from "ws";
// @ts-expect-error type
globalThis.WebSocket = WebSocket;
Amplify.configure({
  API: {
    Events: {
      endpoint: process.env.EVENTS_API_URL || "missing",
      defaultAuthMode: "apiKey",
      apiKey: process.env.EVENTS_API_KEY || "missing",
    },
  },
});
export const subscribe = async (namespace: string, channelName: string) => {
  const messages = new ReplaySubject(100);
  const channel = await events.connect(`/${namespace}/${channelName}`);
  channel.subscribe({
    next: (data) => {
      messages.next(data.event);
    },
    error: (err) => console.error("error", err),
  });
  // Give the subscription some time to establish
  await new Promise((resolve) => setTimeout(resolve, 250));
  const unsubscribe = async () => {
    channel.close();
  };
  const waitForMessageMatching = async (expected: object) => {
    const predicate = (message: unknown) => {
      if (typeof message !== "object" || message === null) {
        return false;
      }
      return _.isMatch(message, expected);
    };
    const data = messages.pipe(filter((message) => predicate(message)));
    return firstValueFrom(data);
  };
  return {
    unsubscribe,
    waitForMessageMatching,
  };
};

I’m using the AWS Amplify client to handle the connection for me. The Amplify implementation requires a WebSocket client to be available. In Node.js, this was previously available behind an experimental flag. Starting with Node.js 22, a browser-compatible implementation of WebSocket is enabled by default. For earlier versions, you can use the ws package and set globalThis.WebSocket = WebSocket.
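If the same test helper has to run on both older and newer Node.js versions, the polyfill can be installed only when no implementation is present. The sketch below shows that check in isolation. ensureWebSocket and WebSocketHost are hypothetical names, and the demonstration uses stand-in objects rather than the real globalThis and the real ws class so that it runs without extra dependencies.

```typescript
type WebSocketHost = { WebSocket?: unknown };

/** Installs `fallback` as host.WebSocket only when no implementation exists yet. */
function ensureWebSocket(host: WebSocketHost, fallback: unknown): "native" | "polyfilled" {
  if (typeof host.WebSocket !== "undefined") {
    return "native";
  }
  host.WebSocket = fallback;
  return "polyfilled";
}

// Stand-ins: FakeWebSocket plays the role of the class exported by "ws".
class FakeWebSocket {}
const olderRuntime: WebSocketHost = {};
const newerRuntime: WebSocketHost = { WebSocket: class BuiltInWebSocket {} };

console.log(ensureWebSocket(olderRuntime, FakeWebSocket)); // "polyfilled"
console.log(ensureWebSocket(newerRuntime, FakeWebSocket)); // "native"
```

In the real helper, the call would presumably look like ensureWebSocket(globalThis as WebSocketHost, WebSocket), with WebSocket imported from ws.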

In the subscribe function, I create an RxJS ReplaySubject and connect and subscribe to the AppSync Events API using the Amplify client. All incoming messages are stored in the ReplaySubject.
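For readers who do not use RxJS, the same replay-then-wait behaviour can be sketched without dependencies. This is not the implementation used in the sample application. MessageBuffer is a hypothetical class that keeps every message (unlike the ReplaySubject above, which is capped at 100) and adds an explicit timeout, so a missing event fails with a descriptive error instead of relying on the test runner's timeout.

```typescript
type Predicate<T> = (message: T) => boolean;

/** Minimal replay buffer: remembers past messages and notifies pending waiters. */
class MessageBuffer<T> {
  private readonly messages: T[] = [];
  private readonly waiters = new Set<(message: T) => void>();

  push(message: T): void {
    this.messages.push(message);
    // Copy the set first because waiters remove themselves while being notified.
    for (const notify of [...this.waiters]) {
      notify(message);
    }
  }

  /** Resolves with the first buffered or future message accepted by `predicate`. */
  waitFor(predicate: Predicate<T>, timeoutMs: number): Promise<T> {
    const buffered = this.messages.find(predicate);
    if (buffered !== undefined) {
      return Promise.resolve(buffered);
    }
    return new Promise<T>((resolve, reject) => {
      const onMessage = (message: T) => {
        if (!predicate(message)) return;
        clearTimeout(timer);
        this.waiters.delete(onMessage);
        resolve(message);
      };
      const timer = setTimeout(() => {
        this.waiters.delete(onMessage);
        reject(new Error(`No matching message within ${timeoutMs} ms`));
      }, timeoutMs);
      this.waiters.add(onMessage);
    });
  }
}

const received = new MessageBuffer<{ id: string }>();
received.push({ id: "a" }); // arrives before anyone waits
setTimeout(() => received.push({ id: "b" }), 10); // arrives while a waiter is pending

received.waitFor((m) => m.id === "a", 100).then((m) => console.log("replayed", m.id));
received.waitFor((m) => m.id === "b", 100).then((m) => console.log("live", m.id));
received.waitFor((m) => m.id === "c", 50).catch((e: Error) => console.log(e.message));
```

Replaying buffered messages matters here because the event can arrive on the channel before the test starts waiting for it.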

Individual tests can use waitForMessageMatching to find events matching a specific pattern. In our test, we are looking for an OrderCreated event:

const message = await subscription.waitForMessageMatching({
  source: "OrderService",
  "detail-type": "OrderCreated",
  detail: {
    id: order.id,
    name: order.name,
  },
});
expect(message).not.toBeNull();
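The pattern is a partial match: the received event may carry extra fields such as version, account, or time, and only the listed fields have to agree. To make that concrete, here is a dependency-free approximation for plain JSON objects. isPartialMatch is a hypothetical helper, not lodash's implementation, and it compares arrays index by index, which may differ from how lodash treats arrays.

```typescript
/** Approximate partial deep match for plain JSON values (objects, strings, numbers, booleans, null). */
function isPartialMatch(actual: unknown, expected: unknown): boolean {
  // Primitives and null must be strictly equal.
  if (typeof expected !== "object" || expected === null) {
    return actual === expected;
  }
  if (typeof actual !== "object" || actual === null) {
    return false;
  }
  const actualRecord = actual as Record<string, unknown>;
  // Every expected key must match; keys that only exist on `actual` are ignored.
  return Object.entries(expected as Record<string, unknown>).every(([key, value]) =>
    isPartialMatch(actualRecord[key], value),
  );
}

const receivedEvent = {
  version: "0",
  source: "OrderService",
  "detail-type": "OrderCreated",
  region: "eu-west-1",
  detail: { id: "01JDSQKGFAHQMH866CARE8F9JC", name: "test order" },
};

console.log(
  isPartialMatch(receivedEvent, {
    source: "OrderService",
    detail: { id: "01JDSQKGFAHQMH866CARE8F9JC" },
  }),
); // true
console.log(isPartialMatch(receivedEvent, { detail: { id: "another-id" } })); // false
```

Matching on the order ID returned by the API is what ties the received event to the request made by this particular test run.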

Conclusion

Serverless applications are inherently event-driven and can be challenging to test. Developers have gone to great lengths to try and emulate managed AWS services locally to facilitate testing. I’m a strong advocate for testing in the cloud, as it is the only way to truly test the behavior of your application(s) in a production-like environment.

With the introduction of AppSync Events, AWS has made it easier to set up pub/sub patterns over WebSockets with little to no effort. In this post, you have learned how to combine EventBridge, Step Functions, and AppSync Events to test that your application(s) emit the correct events.

Try it out!

You can find the sample application and a step-by-step guide on GitHub.


About the author

I'm Elias Brange, a Cloud Consultant and AWS Community Builder in the Serverless category. I'm on a mission to drive Serverless adoption and help others on their Serverless AWS journey.
