THINKING ENERGY
AWS Managed Pubsub with Authorization

Written by Sam Thorogood

May 19, 2022

This is a technical blogpost from our CTO, Sam Thorogood.

Amazon provides a managed GraphQL service called AppSync. GraphQL supports queries, mutations and also this nebulous concept called subscriptions—these allow participants to observe other mutations as they happen.

We’ve used these concepts together, to create a generic pubsub service that correctly lets end-users (i.e., in a browser):

  • subscribe to certain global topics
  • only be delivered messages which match their groups (i.e., “Claims”, in JWT parlance)—we use AWS Cognito for our auth service
  • receive an arbitrary JSON payload on that topic

This blog post will detail how we’ve done it and how we use it in production here at Gridcognition. We’ve used AWS’ CDK to describe the API, plus a small end-user library to make it work.

Our use-case is for our infrastructure code to publish messages: it’s the trusted party that decides who else should see what’s going on. We do not actually intend for end-users to ever publish messages to others via GraphQL directly from, e.g., their browser or device.

Before we begin, it’s worth noting that Amazon already documents this (here and here), and this blog post builds on those posts, but their approaches let anyone see everything. Access control is kind of an important part of any large-scale software system, and that’s the value this guide provides! 😅

Building pubsub in CDK for AppSync

So let’s get started. Here’s the design we’re going to build out:

We basically configure AppSync over a concept called a “NONE” data source, and then set up subscription filtering for messages on that source which match a user’s Cognito groups. This filtering is the key component that ensures users only get messages intended for them, but it has nuances.

Subscription filtering is a new concept in AppSync, having only been released at the start of 2022. It has one major limitation that we work around—it only allows ten unique filters per subscription, which means an end-user, if they have more than ten groups, makes more than one subscription to our pubsub service.

This doesn’t really occur in practice for our Gridcognition services—each user (i.e., a user logging in at their domain) usually is part of only one client associated with their source domain, and as such, they’ll likely only have one Cognito group. But we’re building a scalable approach—we won’t be surprised in the future! 📈
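
To make the fan-out concrete, here is a minimal sketch of how a client could split its groups into batches, one batch per subscription. The helper name chunkGroups and the default batch size of 8 are assumptions for illustration (8 simply leaves some head-room under the limit of ten filters); this is not part of any AWS library.

```typescript
// Hypothetical helper: splits a user's groups into batches, one batch per subscription.
// The default size of 8 is an assumption that stays under AppSync's limit of ten filters.
function chunkGroups(groups: string[], size = 8): string[][] {
  const chunks: string[][] = [];
  for (let i = 0; i < groups.length; i += size) {
    // slice() copies, so the caller's array is left untouched
    chunks.push(groups.slice(i, i + size));
  }
  return chunks;
}
```

A user with a single group ends up with a single batch, which is the common case described above.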

Let’s get into building it. To restate the goal of this post: it’s a step-by-step guide to building this service with CDK, so read on. 👇

Get started

We’ll start by updating the constructor of an existing or new CDK stack:

const userPool = UserPool.fromUserPoolId(this, `${id}-pool`, "YOUR-POOL-ID");
const api = new GraphqlApi(this, 'Api', {
  name: `${id}-presence-api`,
  authorizationConfig: {
    defaultAuthorization: {
      userPoolConfig: { userPool },
      authorizationType: AuthorizationType.USER_POOL,
    },
    additionalAuthorizationModes: [{ authorizationType: AuthorizationType.IAM }],
  },
});

This sets up a brand new GraphQL API tied to your AWS Cognito User Pool. 🏊

Next, let’s define some basics about our pubsub message type, as we have to tell AppSync what shape our data will be in:

const definition: { [key: string]: GraphqlType } = {
  topic: GraphqlType.string({ isRequired: true }),
  data: GraphqlType.awsJson({ isRequired: true }),
  groups: GraphqlType.string({ isRequired: true, isRequiredList: true }),
  uuid: GraphqlType.string({ isRequired: true }), // required but immediately replaced
};
const channelType = new ObjectType('Channel', { definition, directives: [Directive.iam()] });
api.addType(channelType);

// This is required to exist even though it does nothing (it's a NoneDataSource).
// This has an invalid return type we can't satisfy.
api.addQuery('ignoredQuery', new Field({
  returnType: channelType.attribute(),
}));

const noneDataSource = api.addNoneDataSource('pubsub');

As you can see, the message is pretty generic—it supports a topic, some JSON data that’s passed along with it, as well as the Cognito groups which are allowed to receive it. We also specify UUID—because of GraphQL’s model, the sender needs to send something here, but we rewrite it in AppSync so we can guarantee it’s always unique. To put it another way, it’s rewritten to prevent accidental re-sending of a message with the same ID by an errant server.

Publish messages

We’ve set up our basic types, so now let’s add a mutation that allows a server to post a message.

api.addMutation('publish', new ResolvableField({
  returnType: channelType.attribute(),
  args: definition,
  dataSource: noneDataSource,
  requestMappingTemplate: MappingTemplate.fromString(`
## Adds a UUID to every published event in the "uuid" field.
#set( $out = { "topic": $context.arguments.topic, "data": $context.arguments.data, "groups": $context.arguments.groups, "uuid": $util.autoId() } )
{
"version": "2018-05-29",
"payload": $util.toJson($out)
}
  `),
  responseMappingTemplate: MappingTemplate.fromString(`$util.toJson($context.result)`),

  // TODO: This allows publishing from any IAM role, which is probably fine for testing.
  directives: [Directive.iam()],
}));

This does what it says on the tin: we add a mutation called “publish” that passes our messages through almost verbatim but, if you look far to the right of the code, ensures that the UUID is automatically generated. 📝

Allow subscriptions to messages

This is the more complicated part where we start to add a lot of business logic. The above code had a concept of “mapping templates”: this is what we can do to a user’s request or response as it comes in and out of AppSync. It’s written in a relatively archaic language called VTL (Apache’s Velocity Template Language), a Java-based templating language with a bunch of oddities that is useful, in our case, for outputting JSON.

As an aside, it’s worth noting that when you use Amplify, another AWS product, it’s basically just generating an absolute boatload of VTL and applying that to AppSync on your behalf. Its data model isn’t more magic than that—you can peer in and discover what it’s doing, or rather, what it can’t do—more on that later. 👀

First, let’s set up a response mapping template for our user’s subscription. Despite the name, this isn’t code which runs every time a message is received by the client as part of the subscription—it’s only called once, on subscription setup. As such, it doesn’t really return anything, but we can use it to set up our filters.

The code is a bit long but somewhat self-documenting. The key is the $userGroups.contains($group) check—we aggressively fail if a user tries to set up a filter on a group that they’re not part of—thereby ensuring they can’t get access to other users’ updates.

const responseMappingTemplate = MappingTemplate.fromString(`
#set( $userGroups = $util.defaultIfNull($context.identity.claims.get("cognito:groups"), []) )
#set( $prefix = $util.defaultIfNull($context.arguments.prefix, "") )
#set( $onlyGroups = $util.defaultIfNull($context.arguments.onlyGroups, []) )

## Creates a filterGroup per group, so we can allow many groups.
## We only include requested groups in order not to blow our limit of filters (10), and clients
## make multiple subscriptions to get all the groups they're in.

#if( $onlyGroups.size() > 8 )
  ## Give us wiggle room in case we add a special filter later (8 < 10)
  $util.error("Can't handle more than 8 groups per subscription")
#end

#set( $filterGroups = [] )
#foreach( $group in $onlyGroups )
  #if( !$userGroups.contains($group) )
    ## The client should know what they have access to
    $util.error("Can't subscribe to group, not in claims")
  #end

  $util.qr($filterGroups.add({
    "filters": [
      {
        "fieldName": "topic",
        "operator": "beginsWith",
        "value": $prefix
      },
      {
        "fieldName": "groups",
        "operator": "contains",
        "value": $group
      }
    ]
  }))
#end

$extensions.setSubscriptionFilter({
  "filterGroup": $filterGroups
})

$util.toJson(null)
`);
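
Because VTL is hard to unit-test, it can help to mirror the template’s logic in plain TypeScript. The sketch below is only a model of the template above for reasoning and testing, not something AppSync runs; the function and type names are assumptions.

```typescript
// Hypothetical TypeScript mirror of the VTL response template, for tests and reasoning only.
type Filter = { fieldName: string; operator: string; value: string };
type FilterGroup = { filters: Filter[] };

const MAX_GROUPS_PER_SUBSCRIPTION = 8; // same head-room as the VTL check (8 < 10)

function buildFilterGroups(
  userGroups: string[], // groups from the caller's "cognito:groups" claim
  onlyGroups: string[], // groups the caller asked to subscribe to
  prefix = '',          // topic prefix; the empty string matches every topic
): FilterGroup[] {
  if (onlyGroups.length > MAX_GROUPS_PER_SUBSCRIPTION) {
    throw new Error("Can't handle more than 8 groups per subscription");
  }
  const allowed = new Set(userGroups);
  return onlyGroups.map((group) => {
    // Fail the whole subscription if any requested group is not in the claims.
    if (!allowed.has(group)) {
      throw new Error("Can't subscribe to group, not in claims");
    }
    // Filters inside one group are ANDed; separate groups are ORed.
    return {
      filters: [
        { fieldName: 'topic', operator: 'beginsWith', value: prefix },
        { fieldName: 'groups', operator: 'contains', value: group },
      ],
    };
  });
}
```

The important property is that a request naming any group outside the caller’s claims is rejected outright rather than silently narrowed.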

Once that’s done, we can use it to set up a subscription that end-users can use in their web browsers/clients, which is pretty straight-forward:

api.addSubscription('subscribe', new ResolvableField({
  returnType: channelType.attribute(),
  args: {
    prefix: GraphqlType.string(),
    onlyGroups: GraphqlType.string({ isList: true, isRequired: true })
  },
  directives: [Directive.subscribe('publish')],
  dataSource: noneDataSource,
  requestMappingTemplate: MappingTemplate.fromString(`
{
"version": "2018-05-29",
"payload": $util.toJson($context.arguments)
}
  `),
  responseMappingTemplate,
}));

This allows users to make a subscription request called subscribe that takes two arguments: prefix, to filter to topics with a specific prefix, and onlyGroups, which is used to deal with the limit of filters we can make per subscription (as I mentioned above).

The last step in writing CDK is to exfiltrate the URL of the AppSync endpoint we just set up. You might use CfnOutput, like this, but maybe you’ll write it to a JSON file or something:

new CfnOutput(this, 'graphqlUrl', { value: api.graphqlUrl });

This will end up looking something like “https://long-id-goes-here.appsync-api.ap-southeast-2.amazonaws.com/graphql”.

It’s worth noting that the URL used to subscribe is actually the same but with “.appsync-api.” replaced with “.appsync-realtime-api.”—but AppSync does this for you in its client automagically. You only need to replace this if you’re writing your own WebSocket client.
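
If you do write your own WebSocket client, the rewrite is a small string transformation. This is a sketch only: the helper name toRealtimeUrl is made up, and the switch to the wss:// scheme is my assumption based on Amazon’s real-time client documentation, so check it against the current docs.

```typescript
// Hypothetical helper: derives the real-time (WebSocket) endpoint from the GraphQL URL.
// Assumes the endpoint follows the standard *.appsync-api.* naming shown above.
function toRealtimeUrl(graphqlUrl: string): string {
  return graphqlUrl
    .replace(/^https:\/\//, 'wss://') // WebSocket connections use the wss scheme
    .replace('.appsync-api.', '.appsync-realtime-api.');
}
```

Custom domains do not follow this naming pattern, so this helper would leave their host unchanged.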

Publishing a message from your backend

The CDK config above is set up to allow anyone with an IAM role to publish to the feed. That isn’t perfect, but is probably enough for small projects where various parts of your backend might want to announce some change to subscribed clients. You can modify it, and that’s left as an exercise for the reader. ⭐

In a context with AWS IAM credentials, say your local machine or a Lambda, you can write code like this:

import { ApolloClient, InMemoryCache, ApolloLink, HttpLink, gql } from '@apollo/client/core';
import { createAuthLink, AUTH_TYPE, AuthOptions } from 'aws-appsync-auth-link';
import * as aws from 'aws-sdk';
import fetch from 'cross-fetch';  // needed before Node 18

// This is the URL previously generated in CDK.
const { GRAPHQL_URL } = process.env;

const config = {
  url: GRAPHQL_URL,
  auth: {
    type: AUTH_TYPE.AWS_IAM,
    credentials: aws.config.credentials!,
  } as AuthOptions,

  // We don't need a region, because it's implied in the AppSync domain.
  region: '',
};

const apolloClient = new ApolloClient({
  link: ApolloLink.from([createAuthLink(config), new HttpLink({ uri: config.url, fetch })]),
  cache: new InMemoryCache(),
});

const mutationGql = gql(/* GraphQL */ `
  mutation Publish($topic: String!, $data: AWSJSON!, $groups: [String!]!) {
    publish(topic: $topic, data: $data, groups: $groups, uuid: "") {
      # We need to request all these values back, otherwise the subscriptions won't see them.
      topic
      data
      groups
      uuid
    }
  }
`);

async function sendTestPayload() {
  const variables = {
    topic: "project:foo",
    data: JSON.stringify({ hello: 'there' }),
    groups: ["Project-Foo", "Client-Bar"],
  };
  const { data } = await apolloClient.mutate({
    mutation: mutationGql,
    variables,
  });
  const { uuid } = data.publish;
  console.info('Broadcast test message with uuid', uuid);
  return uuid;
}

There’s a lot here, but basically it pulls the previous graphqlUrl from an env var—maybe you’ll pass it through some other way—before setting up an Apollo client and mutation that we can call to announce over pubsub. Finally, we have a test helper which posts to groups “Project-Foo” and “Client-Bar”.

The key here is that your server needs to know what groups are allowed to see an update for a certain topic. It’s not magically implied through the topic name itself, which is just an arbitrary string that a user might be interested in. You might get this from a database row or some other source.

For Gridcognition, we have a concept of projects which are available to users directly, or as part of an owning client group. So for an update about a single project, we allow access to users in that project’s group directly (“Project-Foo”), or to the parent group which contains all that client’s projects (in our case, group “Client-Bar”).
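
As a rough sketch of that lookup, the server can derive the groups from whatever it knows about the project before publishing. The ProjectRow shape, the group naming scheme and the topic format here are illustrative assumptions modelled on the example values above, not our actual data model.

```typescript
// Hypothetical shape of a project record; field names are illustrative only.
type ProjectRow = { name: string; clientName: string };

// Groups allowed to see updates about this project: the project's own group,
// plus the parent client group that contains all of that client's projects.
function groupsForProject(project: ProjectRow): string[] {
  return [`Project-${project.name}`, `Client-${project.clientName}`];
}

// Builds the variables object for the Publish mutation shown earlier.
function publishVariables(project: ProjectRow, payload: unknown) {
  return {
    topic: `project:${project.name.toLowerCase()}`,
    data: JSON.stringify(payload), // AWSJSON travels as a JSON-encoded string
    groups: groupsForProject(project),
  };
}
```

The result can be passed as the variables argument of apolloClient.mutate in the publishing code.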

Receiving messages on the front-end

This is the last step that puts our code together. We have to subscribe on the client side to receive these pubsub messages. We can follow the Apollo-client-with-AppSync-recommended-path to do this. First, we need to set up our client:

// nb. If you already talk to AppSync, you probably have a setup like this already.
// However we need to generate a new one for our new GraphQL/AppSync API.
const appsyncClientConfig = {
  url: GRAPHQL_URL,  // from our original CDK
  region: '',
  auth: {
    type: AUTH_TYPE.AMAZON_COGNITO_USER_POOLS,
    jwtToken: getAccessToken,
  } as AuthOptions,
};

const apolloClient = new ApolloClient({
  link: ApolloLink.from([
    // these create... methods are imported from the appsync JS library
    createAuthLink(appsyncClientConfig),
    createSubscriptionHandshakeLink(appsyncClientConfig),
  ]),
  cache: new InMemoryCache(),
});

Note that you need to provide getAccessToken, which is a helper that might look a bit like:

/**
 * Called constantly by GraphQL whenever it needs a token. (Deals with user signin/signout fine.)
 */
const getAccessToken = () => {
  return Auth.currentSession().then((session) => session.getIdToken().getJwtToken());
};

And then once apolloClient is available, we use it to allow subscriptions for arbitrary prefixes based on the user’s current groups:

const pubsubQuery = gql`
subscription MySub($prefix: String, $onlyGroups: [String!]) {
  subscribe(prefix: $prefix, onlyGroups: $onlyGroups) {
    topic
    data
    groups
    uuid
  }
}`;

type SubData = { topic: string, data: string, uuid: string };

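async function subscribeToPrefix(prefix: string, callback: (x: SubData) => void): Promise<void> {
  const seenUuids = new Set<string>();

  // Fetch the current JWT with the getAccessToken helper above, then extract its groups.
  const token = await getAccessToken();
  const allGroups = groupsForToken(token);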
  while (allGroups.length) {
    const onlyGroups = allGroups.splice(0, 8);
    const s = apolloClient.subscribe<{ subscribe: SubData }, { prefix: string, onlyGroups: string[] }>({
      query: pubsubQuery,
      variables: { prefix, onlyGroups },
    });
    s.subscribe((result) => {
      if (!result.data) { return; }

      // Track the UUID in case the message arrives on several fanned-out subscriptions.
      // The result is keyed by the subscription field name, "subscribe".
      const { uuid } = result.data.subscribe;
      if (seenUuids.has(uuid)) { return; }
      seenUuids.add(uuid);

      callback(result.data.subscribe);
    });
  }
}

There’s a few interesting parts of the above code.

  • As mentioned, it fans out for every 8 groups. This is less than AppSync’s limit of ten, just to give you leeway in future (it could be 10 to be risky, or fewer if you wanted to check the behavior).
  • You need to provide a method which extracts the user’s groups from the passed token—this is the JWT that AWS Cognito provides you, perhaps through its Auth helper as we used it above. You could look at https://github.com/auth0/jwt-decode for this purpose.
  • Malicious users can’t specify other groups here, because the requested groups are validated by the VTL code I wrote a few steps back. But it’s now convention for the client to split them into a limited number per request, and include the ones they’re part of.
  • We store all UUIDs seen, in case a broadcasted message is actually sent to groups that span the “split” of a user’s subscriptions. That way user code will only see it once. I’ve not done it here, but an obvious extension is to clear a UUID a few seconds after it’s seen, otherwise that Set will grow infinitely.
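
Two of the points above can be sketched in a few lines. First, a groupsForToken that reads the “cognito:groups” claim straight out of the JWT payload; it does not verify the signature, which is fine here only because AppSync validates the token and the groups on the server side. Second, a de-duplicator that forgets UUIDs after a while so the set cannot grow forever. Both are minimal sketches under those assumptions; createDeduper and its ten-second default are made up for illustration.

```typescript
// Reads the "cognito:groups" claim from a JWT without verifying it.
// Assumes group names are ASCII; atob() yields a binary string, not UTF-8.
function groupsForToken(token: string): string[] {
  const payloadPart = token.split('.')[1];
  if (!payloadPart) { return []; }
  // JWTs use base64url, so convert to plain base64 and restore the padding.
  const base64 = payloadPart.replace(/-/g, '+').replace(/_/g, '/');
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
  const claims = JSON.parse(atob(padded));
  const groups: unknown = claims['cognito:groups'];
  if (!Array.isArray(groups)) { return []; }
  // filter() returns a fresh array, so callers may splice() it freely.
  return groups.filter((g): g is string => typeof g === 'string');
}

// Returns a function that reports whether a UUID is being seen for the first time.
// Entries expire after ttlMs so memory use stays bounded.
function createDeduper(ttlMs = 10000, now: () => number = Date.now) {
  const expiryByUuid = new Map<string, number>();
  return (uuid: string): boolean => {
    const current = now();
    expiryByUuid.forEach((expiry, id) => {
      if (expiry <= current) { expiryByUuid.delete(id); }
    });
    if (expiryByUuid.has(uuid)) { return false; }
    expiryByUuid.set(uuid, current + ttlMs);
    return true;
  };
}
```

In subscribeToPrefix, the seenUuids set could then be replaced by a single isFirstSighting(uuid) check before invoking the callback.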

Concerns about Apollo

Gridcognition does not actually use JS like the above. Instead, we wrote our own library that talks GraphQL to the AppSync server. This is because there’s one huge problem with the way the subscription is set up above, which is an issue with Apollo and derived libraries (such as its React hooks implementation).

The issue boils down to how things like pubsub or presence are used. In our case, we use it to create real-time collections—our server announces when e.g., a comment is made or updated.

So the user will:

  1. Load a page which will show some real-time comments
  2. Simultaneously fetch the data and subscribe to updates

The challenge is around timing. Here’s a diagram which represents what happens:

The subscription and fetch might finish, and return to the client, at different times. If our subscription hasn’t started yet, we don’t know if we missed some pubsub messages saying that there have been new comments.

Now, the solution to this is fairly simple—when subscriptions start, we do another fetch, because that’s now in the “safe” period—we know that we’ll be told about any changes from that point forward.

However… 🤦

AppSync’s real-time WebSocket protocol literally has a message (start_ack) saying: yes, the subscription is active. But Apollo throws this away and does not expose it to our client, so we can’t refetch at this point. The only update we get is when a subscription fires, which might never happen if a collection is pretty stale.

So instead, we have a custom library which implements a simple WebSocket client, which talks subscriptions, that announces when the subscription is ready. We in fact literally follow Amazon’s guide on how to do this, which was an invaluable reference.
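
To give a feel for what such a client has to handle, here is a sketch of the dispatch step for incoming messages. The message type names (connection_ack, ka, start_ack, data, error, complete) are as I understand them from Amazon’s real-time WebSocket guide, so treat them as assumptions to verify; the handler types and function name are invented, and connection setup, authorization and keep-alive timeouts are left out entirely.

```typescript
// Server-to-client messages, modelled on Amazon's real-time WebSocket guide (verify before use).
type ServerMessage =
  | { type: 'connection_ack'; payload?: { connectionTimeoutMs?: number } }
  | { type: 'ka' }
  | { type: 'start_ack'; id: string }
  | { type: 'data'; id: string; payload: { data: unknown } }
  | { type: 'error'; id?: string; payload?: unknown }
  | { type: 'complete'; id: string };

// Hypothetical per-subscription callbacks.
type SubscriptionHandlers = {
  onActive: () => void;             // the subscription is now live
  onData: (data: unknown) => void;  // a published message arrived
  onError?: (payload: unknown) => void;
};

// Routes one raw WebSocket frame to the handlers registered for its subscription id.
function dispatchServerMessage(raw: string, subs: Map<string, SubscriptionHandlers>): void {
  const msg = JSON.parse(raw) as ServerMessage;
  switch (msg.type) {
    case 'start_ack':
      subs.get(msg.id)?.onActive(); // the event Apollo does not surface
      break;
    case 'data':
      subs.get(msg.id)?.onData(msg.payload.data);
      break;
    case 'error':
      if (msg.id !== undefined) { subs.get(msg.id)?.onError?.(msg.payload); }
      break;
    case 'complete':
      subs.delete(msg.id);
      break;
    default:
      break; // connection_ack and ka need no per-subscription handling in this sketch
  }
}
```

It would typically be wired up as socket.onmessage = (event) => dispatchServerMessage(event.data, subs).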

And this library gives us the “active” event. At that point, we either fetch or refetch our underlying data—to be absolutely sure that we’ve not missed an update. Your use case may not need to do this, but be aware of the possibility of a race condition.
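
The fetch-then-refetch pattern itself is small. The sketch below assumes a subscribe function that exposes an onActive callback (which is exactly what our custom client provides and Apollo does not); every name in it is hypothetical.

```typescript
// Hypothetical dependencies, injected so the pattern can be tested without a network.
type LiveCollectionDeps<T> = {
  fetchAll: () => Promise<T[]>;
  subscribe: (handlers: { onActive: () => void; onMessage: () => void }) => void;
  render: (items: T[]) => void;
};

function startLiveCollection<T>(deps: LiveCollectionDeps<T>): void {
  let latestRequest = 0;

  const refresh = async (): Promise<void> => {
    const request = ++latestRequest;
    const items = await deps.fetchAll();
    // Drop responses that were overtaken by a newer fetch.
    if (request === latestRequest) { deps.render(items); }
  };

  // The initial fetch runs in parallel with subscription setup...
  void refresh();
  deps.subscribe({
    // ...and a refetch once the subscription is live covers anything published in between.
    onActive: () => { void refresh(); },
    onMessage: () => { void refresh(); },
  });
}
```

Refetching the whole collection on every message is the simplest option; a real implementation might apply the message payload directly instead.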

So if I were Amazon, I’d add support for this “active” event into their AppSync client libraries—it’s such an easy win.

That’s it

That was a lot of code. I hope it’s been helpful. Whew. Have a donut to celebrate. 🍩

Background on Subscriptions

While this is a post on building a pubsub service, it’s worth noting that we need this because subscriptions are such a poor part of GraphQL: they’re not practical to use directly. Naïvely, you’d build a real-time application on AppSync (and Amazon suggests and documents this) by listening to a subscription for whenever another user makes a mutation.

Seems easy, right? Well, … no.

There’s two major problems. First, a subscription only receives the fields that are requested in response to a mutation. If I mutate a hypothetical Todo item, I could send this request:

mutation {
  modifyTodo(id: "1234", done: true) {
    done
  }
}

…the only thing subscribers will see is the modifyTodo with parameters of… {done: true}.

And it turns out I cannot look up or invalidate an item with just {done: true}, since it carries no ID. You can use this message to request an entire collection again (or, if you’re being extra efficient, only rows that have been modified since your last fetch), but it’s limiting and bizarre that the caller decides this.

Secondly, the issue I mentioned above: it’s not at all clear where a user “jumps on the train” for subscriptions. I basically have to hold on and hope that I’m now properly connected and getting updates. It’s like being on the internet but not knowing whether it’s working until someone else decides to send you an email.

Some of these concerns (and more) are detailed in a great post on why AppSync (and implicitly, Amplify, which is a common user of it) aren’t good for subscriptions. This also includes even more horrible cases around Amplify’s auth model—subscriptions actually just aren’t enabled when things get too hard.

And yes, AWS does document this… somewhere, deep inside their docs. But our team struggled with this for months and months before having to literally trawl through that generated VTL I mentioned before, and discovering that certain functionality just nulls out because it’s unsupported. 🫠

Summary

Unlike most of GraphQL, subscriptions aren’t built well and they’re not particularly useful when exposed to end-users directly. They should not be part of a service’s public API. 🙅

But they are a useful primitive when we wrap them in a library and very specific approach—here, we’re never really exposing them to end-users, partially because they can’t do mutations directly (you’d wrap them through some user action that a lambda or something else performs), and partially because you can wrap the client code up in a helper too.

Thanks for reading! Gridcognition is a small energy startup here in Australia, and if you’d like to find out more about us, then do click around and see what we’re up to. Or, hit me up on Twitter. 🐦
