Easy serverless ETL with Momento and AWS Step Functions
ETL (Extract, Transform and Load), that is, extracting data from a source, adapting and cleaning it up, and inserting it into a destination, is one of the most common challenges we face when architecting an application.
You have to proceed step by step, so AWS Step Functions orchestrating Lambdas is a natural path, and a temporary, flexible and reliable storage layer to hold the data during the process is a fundamental tool: here comes Momento Cache.
Momento Cache is a fully serverless cache-as-a-service (CaaS) that allows you to “spawn” caches right when you need them, without having to worry about provisioning.
You pay only for the data you transfer to Momento, in a clear pay-as-you-go fashion that is typical of the serverless approach. The first 50GB are covered by the Free Tier; beyond that you pay $0.50/GB.
To manage your caches, Momento provides a Web Console and a full set of SDKs for common languages (in this article I will use the Java SDK), with documentation and code examples.
Project overview
As usual you can find the complete code on my GitHub:
First things first: following the Getting-Started guide, go to the Momento Web Console and register yourself.
After logging in, you need to create a token that will allow the SDK to communicate with Momento:
Pick your Cloud Provider, Region and Expiration, and store your token properly. In this guide I’ll use AWS and my token will be stored securely in the AWS Parameter Store; you can also use AWS Secrets Manager if you prefer.
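For example, a minimal AWS CLI sketch for storing the token (the parameter name mirrors the one we will reference later from template.yaml; the value is a placeholder):

aws ssm put-parameter --name "/momento_auth_token" --type "String" --value "<your-momento-token>"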
Project Structure
In this project we are going to use AWS Step Functions. Leveraging their orchestration, it’s easy to break the problem down into smaller, simpler tasks.
Step Functions, like API Gateway, can be natively integrated with a lot of AWS services without even needing a Lambda to “glue” them together.
The file simplemomentoetl.asl.json stores the definition of the state machine. ASL stands for Amazon States Language, a JSON format for defining the workflow step by step. Let’s see what’s inside:
{
  "Comment": "A state machine that exports data from a remote database table to a local DynamoDB using Momento Cache as temporary storage",
  "StartAt": "Getting data from remote movie table",
  "States": {
    "Getting data from remote movie table": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${FromDbToCache}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "Reading from movie cache and persisting to DynamoDB"
    },
    "Reading from movie cache and persisting to DynamoDB": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${FromCacheToDynamoDb}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    }
  }
}
To kick off a workflow you need to define a “StartAt” state. To transition to the next state you provide a “Next” definition; to pass a payload on to the next state you use “OutputPath”: “$.Payload”, and to receive the JSON request from the previous state the instruction needed is “Payload.$”: “$”.
Also pay attention to “${FromDbToCache}”: this is how you inject parameters from template.yaml into the Step Function workflow definition.
The last state needs “End”: true to instruct the Step Function to wrap up and provide the outcome.
AWS SAM Definition
I’m using the Java 17 runtime environment in Lambda for the first time in my projects, since it has been added only recently.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Serverless ETL with Momento Cache

Globals:
  Function:
    Timeout: 720
    Runtime: java17
    Architectures:
      - arm64
    MemorySize: 256
    Environment:
      Variables:
        JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1
As mentioned before, I use the AWS Parameter Store to manage environment variables. Here is how to resolve them in template.yaml; you can also see the definition of the first Lambda, FromDbToCache:
Resources:
  FromDbToCache:
    Type: AWS::Serverless::Function
    Properties:
      Handler: it.loooop.FromDbToCache::handleRequest
      CodeUri: .
      Environment:
        Variables:
          PG_HOSTNAME: "{{resolve:ssm:/pg_hostname:1}}"
          PG_DB_NAME: "{{resolve:ssm:/pg_db_name:1}}"
          PG_USERNAME: "{{resolve:ssm:/pg_username:1}}"
          PG_PASSWORD: "{{resolve:ssm:/pg_password:1}}"
          PG_PORT: "{{resolve:ssm:/pg_port:1}}"
          MOMENTO_AUTH_TOKEN: "{{resolve:ssm:/momento_auth_token:2}}"
You need to specify both the name of the parameter and its version.
Every time you change or update an AWS Parameter Store entry, the version automatically increases.
It’s a safety feature: since the reference pins a specific version, remember to bump the version number in template.yaml after you update a value, otherwise the stack keeps resolving the old one.
The second Lambda, FromCacheToDynamoDb, contains a useful mix of policy definitions. The standard SAM policy template DynamoDBCrudPolicy does not include the permission to create a DynamoDB table, so I had to add it as a custom policy:
  FromCacheToDynamoDb:
    Type: AWS::Serverless::Function
    Properties:
      Handler: it.loooop.FromCacheToDynamoDb::handleRequest
      CodeUri: .
      Policies:
        - DynamoDBCrudPolicy:
            TableName: '*'
        - Statement:
            - Effect: Allow
              Action:
                - 'dynamodb:CreateTable'
              Resource:
                - '*'
      Environment:
        Variables:
          MOMENTO_AUTH_TOKEN: "{{resolve:ssm:/momento_auth_token:2}}"
The simplemomentoetl.asl.json file alone is not enough to define a Step Function: you also need to declare the state machine in template.yaml:
  StateMachineSimpleMomentoEtl:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: statemachine/simplemomentoetl.asl.json
      DefinitionSubstitutions:
        FromDbToCache: !GetAtt FromDbToCache.Arn
        FromCacheToDynamoDb: !GetAtt FromCacheToDynamoDb.Arn
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref FromDbToCache
        - LambdaInvokePolicy:
            FunctionName: !Ref FromCacheToDynamoDb
Do you remember the ${FromDbToCache} placeholder I mentioned earlier, inside simplemomentoetl.asl.json? Here you can see how it is populated from template.yaml:
FromDbToCache: !GetAtt FromDbToCache.Arn
A Step Function is not automatically granted the right to invoke your Lambdas: you need to provide it with “LambdaInvokePolicy”.
Momento Cache
Let’s see how to use Momento Cache inside a Java project; whether or not we are in an AWS Lambda environment, the approach is the same.
Add the dependency in Maven (or Gradle):
<!-- https://mvnrepository.com/artifact/software.momento.java/sdk -->
<dependency>
    <groupId>software.momento.java</groupId>
    <artifactId>sdk</artifactId>
    <version>1.0.0</version>
</dependency>
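If you use Gradle instead of Maven, the same coordinates should look roughly like this (Groovy DSL):

implementation 'software.momento.java:sdk:1.0.0'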
Create a CacheClient:
package it.loooop.client;

import momento.sdk.CacheClient;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.auth.EnvVarCredentialProvider;
import momento.sdk.config.Configurations;

import java.time.Duration;

public class Momento {

    private final String AUTH_TOKEN_ENV_VAR;
    private final Duration DEFAULT_ITEM_TTL;

    public Momento(String momentoAuthToken, Integer itemTtlInSeconds) {
        this.AUTH_TOKEN_ENV_VAR = momentoAuthToken;
        this.DEFAULT_ITEM_TTL = Duration.ofSeconds(itemTtlInSeconds);
    }

    public CacheClient getCacheClient() {
        // EnvVarCredentialProvider reads the auth token from the environment variable with this name
        final CredentialProvider credentialProvider = new EnvVarCredentialProvider(AUTH_TOKEN_ENV_VAR);
        return CacheClient.builder(credentialProvider, Configurations.Laptop.latest(), DEFAULT_ITEM_TTL)
                .build();
    }
}
Three things to pay attention to:
- You only need the AUTH_TOKEN_ENV_VAR to interact with Momento Cache!
- You can specify the TTL of the cache entries to manage expiration.
- Momento ships “template” configurations that handle timeouts and all the other tuning every client has to deal with. I’m using Configurations.Laptop.latest(), but you can choose the one that best suits your project and infrastructure. Please refer to the complete Momento docs.
Create a Momento Service:
package it.loooop.service;

import it.loooop.client.Momento;
import momento.sdk.CacheClient;
import momento.sdk.exceptions.AlreadyExistsException;
import momento.sdk.exceptions.LimitExceededException;
import momento.sdk.responses.cache.control.CacheCreateResponse;
import momento.sdk.responses.cache.dictionary.DictionaryFetchResponse;
import momento.sdk.responses.cache.dictionary.DictionarySetFieldResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;

public class MomentoService {

    Logger logger = LoggerFactory.getLogger(MomentoService.class);
    Momento momentoClient;
    CacheClient client;
    Integer cooldown = 2;

    public MomentoService(String momentoToken) {
        momentoClient = new Momento(momentoToken, 4800);
        client = momentoClient.getCacheClient();
    }

    public void create(String cacheName) throws RuntimeException {
        final CacheCreateResponse createResponse = client.createCache(cacheName).join();
        if (createResponse instanceof CacheCreateResponse.Error error) {
            if (error.getCause() instanceof AlreadyExistsException) {
                logger.atError().setCause(error.getCause())
                        .log("Cache with name {} already exists", cacheName);
            } else {
                logger.atError().setCause(error.getCause())
                        .log("Unable to create cache with error {} and message {}",
                                error.getErrorCode(), error.getMessage());
            }
            throw new RuntimeException(error.getCause());
        }
    }

    public void saveItem(String cacheName, String dictionaryName, String field, String value) throws RuntimeException {
        // No need to save a null value in cache
        if (value != null) {
            final DictionarySetFieldResponse setFieldResponse =
                    client.dictionarySetField(cacheName, dictionaryName, field, value).join();
            if (setFieldResponse instanceof DictionarySetFieldResponse.Error error) {
                if (error.getCause() instanceof LimitExceededException) {
                    // Rate limited: wait for the cooldown, then retry the same call
                    waitRateLimiterCoolDown(dictionaryName);
                    this.saveItem(cacheName, dictionaryName, field, value);
                } else {
                    logger.atError().setCause(error.getCause())
                            .log("Unable to set dictionary field with error {} and message {}",
                                    error.getErrorCode(), error.getMessage());
                    throw new RuntimeException(error.getCause());
                }
            }
        }
    }

    public LinkedHashMap<String, String> readItem(String cacheName, String dictionaryName) throws RuntimeException {
        final DictionaryFetchResponse fetchResponse =
                client.dictionaryFetch(cacheName, dictionaryName).join();
        if (fetchResponse instanceof DictionaryFetchResponse.Hit hit) {
            return new LinkedHashMap<>(hit.valueMap());
        } else if (fetchResponse instanceof DictionaryFetchResponse.Miss) {
            logger.atError().log("Did not find dictionary with name {}", dictionaryName);
        } else if (fetchResponse instanceof DictionaryFetchResponse.Error error) {
            if (error.getCause() instanceof LimitExceededException) {
                // Rate limited: wait for the cooldown, then retry and return the retried result
                waitRateLimiterCoolDown(dictionaryName);
                return this.readItem(cacheName, dictionaryName);
            } else {
                logger.atError().setCause(error.getCause())
                        .log("Dictionary fetch failed with error {} and message {}",
                                error.getErrorCode(), error.getMessage());
                throw new RuntimeException(error.getCause());
            }
        }
        return new LinkedHashMap<>();
    }

    private void waitRateLimiterCoolDown(String key) {
        try {
            logger.info("Sleeping to let the rate limiter cool down at key: {}", key);
            TimeUnit.SECONDS.sleep(cooldown);
        } catch (Exception e) {
            logger.atError().setCause(e.getCause())
                    .log("Sleep failed {}", e.getMessage());
        }
    }
}
Use the service to “wrap” the APIs Momento exposes and adapt them to your project. As you can see, I had to handle the rate limiter that Momento has:
Keep in mind that these are “soft limits”. You can ask to have them raised, but I’m on the Free Tier and they are also fairly easy to deal with using a retry policy. I applied a not-smart-but-reliable one: wait a few seconds to cool down, then retry the same API call.
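To give an idea of how the Lambdas use this wrapper, here is a minimal, hypothetical sketch (cache, dictionary and field names are illustrative assumptions, not the repo’s actual handler code):

package it.loooop;

import it.loooop.service.MomentoService;

import java.util.Map;

public class MomentoServiceUsageExample {

    public static void main(String[] args) {
        // "MOMENTO_AUTH_TOKEN" is the name of the environment variable that
        // EnvVarCredentialProvider reads the token from (populated by the SAM template)
        MomentoService momento = new MomentoService("MOMENTO_AUTH_TOKEN");

        momento.create("movie");                               // one cache per source table
        momento.saveItem("movie", "movie#1", "title", "Heat"); // one dictionary per row...
        momento.saveItem("movie", "movie#1", "year", "1995");  // ...one field per column

        // later, the second Lambda reads the whole row back as a map
        Map<String, String> row = momento.readItem("movie", "movie#1");
        System.out.println(row);
    }
}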
Deploy
As usual, let’s leverage the AWS SAM build and deploy functionalities:
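The usual pair of commands (the --guided flag is only needed the first time, to generate samconfig.toml):

sam build
sam deploy --guided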
When CloudFormation has finished the deployment, you can trigger the Step Function in various ways. If this ETL covered a real use case, you could hook it to a scheduled EventBridge rule, as sketched below.
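A minimal sketch of such a schedule attached to the state machine in template.yaml (the event name and rate expression are illustrative assumptions):

  StateMachineSimpleMomentoEtl:
    Type: AWS::Serverless::StateMachine
    Properties:
      # ...DefinitionUri, DefinitionSubstitutions and Policies as shown above...
      Events:
        DailyEtlRun:
          Type: Schedule
          Properties:
            Schedule: rate(1 day)

For our purposes, though, we will trigger the step function manually: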
Go to the Step Functions console and select your freshly created state machine:
Now click on Start Execution:
Provide the JSON event to start it and hit Start execution:
You will see the step function running, and when it finishes you will see this “all green” diagram:
There are also other view modes for a Step Function; this is the “Definition” tab, with our definition code paired with the diagram:
If you need to define a complex Step Function you can also leverage the Workflow Studio:
But back to us: let’s see our Momento Cache at work! Go to the Momento Web Console and use the List caches button on the left:
Here is our cache! It is named after the table that I’m ETL-ing.
You can also inspect the cache items directly from the Momento Web Console:
For this purpose I used the Dictionary data type, because it is the one that most closely resembles a row of a relational database, but you can explore all the data types Momento Cache can handle in their documentation.
Let’s see how this cache item was imported into DynamoDB, the destination of our data:
Go to the DynamoDB console and you will see our new table:
Oh! I almost forgot something I learnt the hard way while building this project:
When you create a new DynamoDB table via the AWS SDK, you have to wait for the creation to complete before you are allowed to use it!
public void createTable(String table, String key) {
    DynamoDbWaiter dbWaiter = dynamoDbBaseClient.waiter();

    dynamoDbBaseClient.createTable(CreateTableRequest.builder()
            .tableName(table)
            .keySchema(KeySchemaElement.builder()
                    .keyType(KeyType.HASH)
                    .attributeName(key)
                    .build())
            .attributeDefinitions(AttributeDefinition.builder()
                    .attributeName(key).attributeType(ScalarAttributeType.S).build())
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .build());

    DescribeTableRequest tableRequest = DescribeTableRequest.builder()
            .tableName(table)
            .build();

    // Wait until the Amazon DynamoDB table is created
    WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
}
After this hint, which I hope will spare you some debugging time, let’s query DynamoDB to inspect the same element we were inspecting in the Momento Web Console:
Here it is! Our ETL works like a charm. Did you do any resource provisioning? No? Me neither! 😀
Leveraging Momento Cache you can cover a lot of real-world use cases, whenever you need a flexible (in terms of size, scaling and time to market) temporary cache/storage solution.
Another common use case is to leverage Momento Cache as a key-value store for your lock items in order to orchestrate distributed scheduled batches, but that is a whole different story 🙂
Last but not least, I also want to mention Neon PostgreSQL: I used their Free Tier to host the source database for this project:
I hope this will help you manage your next ETL workflow!
Ciao 😜
Lorenzo