.NET 5 AWS Lambdas: MQ Triggers
I recently created a reference architecture for a client using AWS Lambdas, ElastiCache, Amazon MQ, SSM, Secrets Manager and RDS. One of the goals of the reference architecture was to demonstrate how the solution could support Kestrel as well as the AWS proxies, and could be exercised via the dotnet command line, in a Docker container and eventually hosted within AWS. Additionally, the reference architecture demonstrates how one may use events to drive management of user caches so that reads against the underlying RDBMS are rare. Today, however, I'd like to focus on sharing the discoveries I made working with Lambdas, specifically those triggered via brokers.
Background
Serverless computing has two very advantageous value propositions. The first is doing away with managing compute in any form, be it containers or EC2 instances. As engineers, we can focus on the business cases we need to solve for; AWS handles the compute platform for Lambdas. The second is spend. As one of my colleagues, Antoine Campbell, pointed out, with cloud computing Solution Architects are called upon more and more to help manage costs. As responsible stewards of our clients' budgets, we need to provide solutions that balance cost with performance, availability and other nonfunctional requirements. AWS Lambdas are billed only for the compute they actually consume: if they aren't being invoked, they incur no charge.
For products that aren't used 24x7, Lambdas are better suited to cost management than containerized solutions. If a product is used around the clock, pick your poison: it's highly likely there will be little, if any, difference in compute costs between containerized services and Lambdas.
Beware though! Cold starts suck. When running the reference architecture from a cold start, the first request can take anywhere from 15 to 30 seconds. To avoid this, you may want to schedule requests that keep your Lambdas warm around the beginning of business hours.
MQ Triggers
MQ triggers are relatively easy to set up. Since Lambdas are serverless, they lack the long-lived compute needed to host listeners the way a Docker container does; whether you're using RabbitMQ.Client with subscribers or MassTransit with the IConsumer<T> interface, the story is largely the same. With Lambdas, we instead define an MQ trigger and rely on Amazon to poll the queue and then invoke the function with whatever it finds.
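If you define the trigger in the SAM template rather than through the console, the event source is just an extra entry under the function's Events. Roughly like the following; the broker ARN, queue name, batch size and the credentials secret below are placeholders, not values from the reference architecture:
"Events": {
  "SampleQueueTrigger": {
    "Type": "MQ",
    "Properties": {
      "Broker": "arn:aws:mq:<region>:<account>:broker:<broker-name>:b-<broker-id>",
      "Queues": [ "SampleMessages" ],
      "SourceAccessConfigurations": [
        {
          "Type": "BASIC_AUTH",
          "URI": "arn:aws:secretsmanager:<region>:<account>:secret:<broker-credentials>"
        }
      ],
      "BatchSize": 100
    }
  }
}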
I was surprised, though, at the general lack of documentation on how to consume messages.
Failing Fast
There is very sparse documentation on how to implement a .NET 5 Lambda that is fired by an MQ trigger. My first foray was to use the out-of-the-box API Gateway proxy. Note that I started the messaging service from the Web API blueprint. This gave me an asynchronous handler that API Gateway would call upon invocation, with the following method signature:
Task<APIGatewayProxyResponse> FunctionHandlerAsync(APIGatewayProxyRequest request, ILambdaContext lambdaContext)
Inspecting the serverless template, I found the Lambda entry point defined as follows:
"AspNetCoreFunction": {
  "Type": "AWS::Serverless::Function",
  "Properties": {
    "PackageType": "Image",
    "ImageConfig": {
      "EntryPoint": [
        "/lambda-entrypoint.sh"
      ],
      "Command": [
        "ReferenceArchitecture.UserServices::ReferenceArchitecture.UserServices.LambdaEntryPoint::FunctionHandlerAsync"
      ]
    }
  }
}
My first inclination was simply to override the function handler. Publishing a message through the broker console, I immediately saw that the Lambda was triggered, but nothing indicated that a message had been received.
First Almost Successful Solution
After many rounds of consulting "The Google", one of my colleagues found an article that informed us that a Lambda could consume a stream. To test that theory, I added a simple Function class to the Lambda and changed the Command entry point to target a method in the Function class. The method signature was defined as:
Task<Stream> Get(System.IO.Stream stream, ILambdaContext context)
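Fleshed out, that handler is little more than a stream reader. A minimal sketch, assuming the class lives alongside the existing entry point (class and namespace names are illustrative):
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon.Lambda.Core;

namespace ReferenceArchitecture.UserServices
{
    public class Function
    {
        // Reads the raw trigger payload and dumps it to CloudWatch via the Lambda logger.
        public async Task<Stream> Get(Stream stream, ILambdaContext context)
        {
            using var reader = new StreamReader(stream);
            var payload = await reader.ReadToEndAsync();

            context.Logger.LogLine(payload);

            // Echo the payload back so the runtime has something to return.
            return new MemoryStream(Encoding.UTF8.GetBytes(payload));
        }
    }
}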
This produced the desired results. Upon reading in the stream as a string and dumping it to CloudWatch, I was presented with the following message:
{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:<my broker>:b-20ad9db0-57a5-4773-b942-5b073e73f852",
  "rmqMessagesByQueue": {
    "SampleMessages::/": [
      {
        "basicProperties": {
          "contentType": "application/vnd.masstransit+json",
          "contentEncoding": null,
          "headers": {
            "MT-Activity-Id": {
              "bytes": [
                48
              ]
            },
            "Content-Type": {
              "bytes": []
            },
            "publishId": {
              "bytes": [
                49
              ]
            }
          },
          "deliveryMode": 2,
          "priority": null,
          "correlationId": null,
          "replyTo": null,
          "expiration": null,
          "messageId": "2aba0000-5fe2-a651-092d-08d96a62963b",
          "timestamp": null,
          "type": null,
          "userId": null,
          "appId": null,
          "clusterId": null,
          "bodySize": 1166
        },
        "redelivered": false,
        "data": "a bunch of base 64 encoded data"
      }
    ]
  }
}
Just to note, the messages passed in are base64 encoded. This makes sense, given that binaries can also be passed via messages. The second interesting thing to note is the name of the array inside the rmqMessagesByQueue element: it's the queue name followed by two colons and a forward slash. Since the polling service batches messages, your Lambda will need to take this into account and handle each message in the batch appropriately. The older documentation suggests storing messages in memory until they are all processed.
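A rough sketch of that batch handling with System.Text.Json follows. The RmqEvent, RmqMessage and RmqEventReader types here are hand-rolled names of my own, not types shipped by AWS, and they only map the fields this post cares about:
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hand-rolled shape for the trigger payload shown above.
public class RmqEvent
{
    [JsonPropertyName("eventSource")]
    public string EventSource { get; set; }

    [JsonPropertyName("eventSourceArn")]
    public string EventSourceArn { get; set; }

    // Keyed by queue name plus "::/" (e.g., "SampleMessages::/"); each value is one batch.
    [JsonPropertyName("rmqMessagesByQueue")]
    public Dictionary<string, List<RmqMessage>> RmqMessagesByQueue { get; set; }
}

public class RmqMessage
{
    [JsonPropertyName("redelivered")]
    public bool Redelivered { get; set; }

    // Base64-encoded message body.
    [JsonPropertyName("data")]
    public string Data { get; set; }
}

public static class RmqEventReader
{
    // Decodes every message in the batch, paired with the queue it came from.
    public static IEnumerable<(string Queue, string Body)> ReadMessages(string rawPayload)
    {
        var rmqEvent = JsonSerializer.Deserialize<RmqEvent>(rawPayload);

        foreach (var (queueKey, messages) in rmqEvent.RmqMessagesByQueue)
        {
            // "SampleMessages::/" -> "SampleMessages"
            var queueName = queueKey.Split(':')[0];

            foreach (var message in messages)
            {
                var body = Encoding.UTF8.GetString(Convert.FromBase64String(message.Data));
                yield return (queueName, body);
            }
        }
    }
}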
The actual message payload is stored within the data element as a base64-encoded string. Decoded, it looks like this:
{
  "messageId": "06a60000-e77c-6ac5-cc1d-08d96a718afd",
  "conversationId": "06a60000-e77c-6ac5-56d6-08d96a718b0d",
  "sourceAddress": "rabbitmqs://<my broker>.amazonaws.com/169_AmazonLambdaRuntimeSupport_bus_y4uyyy88xtick51bbdcswhm5gs?temporary=true",
  "destinationAddress": "rabbitmqs://<my broker>/SampleMessages?bind=true",
  "messageType": [
    "urn:message:ReferenceArchitecture.Models.Messages:SampleMessage"
  ],
  "message": {
    "messageUserId": 1,
    "todoItem": {
      "taskId": 1,
      "taskName": "Sample Task 1",
      "dueDate": "2021-08-28T20:00:36.517",
      "isCompleted": true
    }
  },
  "sentTime": "2021-08-28T22:16:54.5373213Z",
  "headers": {
    "MT-Activity-Id": "00-04b30e4f5d62db43bd4f35c322e644a8-cf48fa2e665ff143-00"
  },
  "host": {
    "machineName": "169",
    "processName": "Amazon.Lambda.RuntimeSupport",
    "processId": 11,
    "assembly": "Amazon.Lambda.RuntimeSupport",
    "assemblyVersion": "1.3.0.0",
    "frameworkVersion": "5.0.7",
    "massTransitVersion": "7.2.2.0",
    "operatingSystemVersion": "Unix 4.14.243.194"
  }
}
In the message, you’ll clearly see that Amazon handled the movement of the message from the queue to the Lambda via the RuntimeSupport bus.
My first attempt with the new Function class actually failed. Its constructor was defined with the necessary dependencies, and in the CloudWatch logs I got an exception complaining about the lack of a parameterless constructor. This was a big "Uh Oh!" After wasting further time setting up a service locator, I continued to get null reference exceptions when requesting services from it. In short, my .NET 5 code was never going through the Init() or Startup() methods that spin up my services.
Needless to say, I had all my services configured in my Startup class. After replacing the API Gateway proxy class with a simple function class, all my wonderfully configured services were no longer available. While I had solved the first problem of consuming messages, I was now left with a relatively useless class that was unable to acquire its dependencies.
The Grand Finale
On a whim, I decided to change the function entry point in my serverless.template file back to the class derived from APIGatewayProxyFunction. I copied the Get method from my Function class into it and mapped that method as the entry point. When I ran the Lambda, the runtime support service correctly called the Get method. In addition, the LambdaEntryPoint class was correctly initialized and I had access to my services. I still had to leverage the service locator to acquire my dependencies, but all in all, I had a viable solution.
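Condensed, the entry point ended up along these lines. The ServiceLocator and IUserCacheService names are illustrative stand-ins for however you choose to expose the container, not framework types:
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon.Lambda.AspNetCoreServer;
using Amazon.Lambda.Core;
using Microsoft.AspNetCore.Hosting;

namespace ReferenceArchitecture.UserServices
{
    public class LambdaEntryPoint : APIGatewayProxyFunction
    {
        // The blueprint's usual ASP.NET Core bootstrap; Startup registers all services.
        protected override void Init(IWebHostBuilder builder)
        {
            builder.UseStartup<Startup>();
        }

        // Target of the MQ trigger; the Command in serverless.template points here.
        public async Task<Stream> Get(Stream stream, ILambdaContext context)
        {
            using var reader = new StreamReader(stream);
            var payload = await reader.ReadToEndAsync();

            // Illustrative: resolve dependencies through the service locator populated by Startup.
            var userCache = ServiceLocator.Get<IUserCacheService>();

            // ... deserialize the payload and hand each message to the service ...

            return new MemoryStream(Encoding.UTF8.GetBytes(payload));
        }
    }
}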
Summary
When consuming MQ messages in a .NET 5 AWS Lambda, use the APIGatewayProxyFunction class and add a method that receives and returns a stream, as in:
public async Task<Stream> Get(Stream stream, ILambdaContext context)
You will need to build appropriate classes to support deserialization of the payload; however, you don't need to write custom converters unless you really want to. Simple POCOs with JSON property attributes are more than sufficient.
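For the payload shown earlier, POCOs along these lines are enough. The envelope here only maps the fields I'd actually use, and the inner types mirror the reference architecture's sample message:
using System;
using System.Text.Json.Serialization;

// Maps just the parts of the MassTransit envelope this Lambda cares about.
public class MessageEnvelope
{
    [JsonPropertyName("messageType")]
    public string[] MessageType { get; set; }

    [JsonPropertyName("message")]
    public SampleMessage Message { get; set; }
}

public class SampleMessage
{
    [JsonPropertyName("messageUserId")]
    public int MessageUserId { get; set; }

    [JsonPropertyName("todoItem")]
    public TodoItem TodoItem { get; set; }
}

public class TodoItem
{
    [JsonPropertyName("taskId")]
    public int TaskId { get; set; }

    [JsonPropertyName("taskName")]
    public string TaskName { get; set; }

    [JsonPropertyName("dueDate")]
    public DateTime DueDate { get; set; }

    [JsonPropertyName("isCompleted")]
    public bool IsCompleted { get; set; }
}
Deserializing is then a one-liner: JsonSerializer.Deserialize<MessageEnvelope>(decodedBody).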
If you have a Lambda with multiple MQ triggers (i.e., it receives messages from many queues), you'll need some way to inspect the payload and determine which message you're actually handling so it can be correctly deserialized. Remember that the queue name appears in the payload as the queue name plus "::/" (e.g., "SampleMessages::/").
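One lightweight way to do that routing, reusing the illustrative RmqEventReader and MessageEnvelope types from the sketches above:
using System.Text.Json;
using Amazon.Lambda.Core;

public static class MessageRouter
{
    // Picks a deserialization target based on the queue each message arrived on.
    public static void Route(string rawPayload, ILambdaContext context)
    {
        foreach (var (queue, body) in RmqEventReader.ReadMessages(rawPayload))
        {
            switch (queue)
            {
                case "SampleMessages":
                    var envelope = JsonSerializer.Deserialize<MessageEnvelope>(body);
                    // ... hand envelope.Message off to the appropriate service ...
                    break;

                default:
                    context.Logger.LogLine($"No handler registered for queue {queue}");
                    break;
            }
        }
    }
}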
To date, I haven't determined whether it's possible to have multiple entry points into a Lambda, on the off chance I'd want to support an API as well as MQ triggers. Maybe next time I'll provide some insights into how one might implement this capability.