Triggering an AWS Lambda with Kafka

This document explains some aspects of setting up a Kafka AWS Lambda sink connector. It is meant to be read alongside the official documentation.

At AO, we are in the process of splitting a service into multiple services. Every new service means another deployment to manage, with its associated costs. One possible solution is to use AWS Lambda.

Our goals are:

  • determine whether hosting an AWS Lambda is cheaper than hosting a service in Fargate
  • evaluate whether the implementation is reasonably simple
  • see whether the technology is production-ready by company standards

So here I go with the task of creating an AWS Lambda that subscribes to a Kafka topic, so we can get some answers.

The setup

Let’s say we have a Kafka topic and an AWS Lambda function. To trigger the Lambda function, we need something that translates a Kafka message into a payload the function can understand. To do this translation, we can use a Kafka AWS Lambda Sink Connector.

The Kafka Connect AWS Lambda Sink Connector polls the Kafka topic to get a batch of messages. It then translates the Kafka messages into a JSON payload and sends it to the AWS Lambda. The connector can be configured to call the Lambda function either synchronously, expecting a response, or asynchronously, not expecting a response.

Asynchronous configuration:

In an asynchronous configuration, the Kafka connector triggers the AWS Lambda and does not expect any kind of response from it.

Synchronous configuration:

When calling the Lambda synchronously, the Kafka connector can optionally log the response from the Lambda to a different Kafka topic. It can also log errors to an error topic, which is configured through the “aws.lambda.error.*” properties.

Handling the messages from Kafka Connect

Kafka Connect sends a batch of messages to the AWS Lambda within a single event. The Kafka messages are embedded in an array of “Payload” JSON objects. Each “Payload” object contains the message Key, Value and some metadata.

Here is an example:

[
    {
        "Payload": {
            "Timestamp": 1578901765779,
            "Topic": "inputTopicName",
            "Partition": 1,
            "Offset": 5,
            "Key": "messageKey",
            "Value": "messageValue"
        }
    },
    {
        "Payload": {
            "Timestamp": 1578901765780,
            "Topic": "inputTopicName",
            "Partition": 1,
            "Offset": 6,
            "Key": "messageKey",
            "Value": "messageValue"
        }
    }
]
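
If the Lambda is written in .NET, one way to handle this batch is to deserialize the event into a list of typed records. Here is a minimal sketch, assuming the Newtonsoft-based Amazon.Lambda.Serialization.Json serializer and the field names from the example above; the namespace and class names are illustrative only.

using System.Collections.Generic;
using Amazon.Lambda.Core;

// The Newtonsoft-based serializer from the Amazon.Lambda.Serialization.Json package.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace KafkaLambdaSink
{
    // Shape of one entry in the batch, using the field names from the example above.
    public class ConnectRecord
    {
        public RecordPayload Payload { get; set; }
    }

    public class RecordPayload
    {
        public long Timestamp { get; set; }
        public string Topic { get; set; }
        public int Partition { get; set; }
        public long Offset { get; set; }
        public string Key { get; set; }
        public string Value { get; set; }
    }

    public class Function
    {
        // The connector invokes the function with the whole batch in a single event.
        public string FunctionHandler(List<ConnectRecord> batch, ILambdaContext context)
        {
            foreach (var record in batch)
            {
                context.Logger.LogLine(
                    $"{record.Payload.Topic}[{record.Payload.Partition}]@{record.Payload.Offset}: " +
                    $"{record.Payload.Key} = {record.Payload.Value}");
            }

            // In synchronous mode this return value can end up on the response topic.
            return $"processed {batch.Count} records";
        }
    }
}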

You can configure how the connector should interpret the data through three configuration properties:

  • header.converter
  • value.converter
  • key.converter

In the example above, the key.converter determines how the message Key field is interpreted, the value.converter determines how the Value field is interpreted, and the header.converter does the same for the record headers.

When I tried to interpret the Value as JSON, the deserialization failed even though the JSON had been confirmed valid. To circumvent the problem, I am currently interpreting the Value as a string and deserializing it in the .NET application with the Newtonsoft package. The reason we must do that is that the .NET library only handles binary encoding for schemaless messages.
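
To illustrate the workaround, here is a minimal sketch of deserializing the string Value with Newtonsoft; the RequestTag type is hypothetical and stands in for whatever shape your messages actually have.

using Newtonsoft.Json;

// Hypothetical message type, for illustration only.
public class RequestTag
{
    public string Name { get; set; }
    public string Value { get; set; }
}

public static class ValueParser
{
    // With value.converter set to StringConverter, Payload.Value arrives as a plain
    // string containing JSON, so we deserialize it ourselves with Newtonsoft.
    public static RequestTag Parse(string rawValue)
    {
        return JsonConvert.DeserializeObject<RequestTag>(rawValue);
    }
}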

Here is an example of how to create an AWS Lambda connector configured synchronously. The "topics" property is the topic the connector consumes from, and "aws.lambda.response.topic" is the topic the responses are produced to:

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "staging-awslambda-requesttagging",
  "config": {
    "connector.class": "io.confluent.connect.aws.lambda.AwsLambdaSinkConnector",
    "tasks.max": "1",
    "topics": "inputTopicName",
    "aws.lambda.function.name": "mylambdaArn",
    "aws.lambda.invocation.type": "sync",
    "aws.lambda.batch.size": "2",
    "behavior.on.error": "fail",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "bootstrapserveraddress:port",
    "confluent.topic.replication.factor": "1",
    "confluent.topic.ssl.endpoint.identification.algorithm": "https",
    "confluent.topic.sasl.mechanism": "PLAIN",
    "confluent.topic.request.timeout.ms": 20000,
    "confluent.topic.retry.backoff.ms": 500,
    "confluent.topic.sasl.jaas.config": "jaasconfig",
    "confluent.topic.security.protocol": "securityProtocol",
    "aws.lambda.response.topic": "outputTopicName",
    "aws.lambda.response.bootstrap.servers": "bootstrapserveraddress:port",
    "aws.lambda.response.replication.factor": "1",
    "aws.lambda.response.ssl.endpoint.identification.algorithm": "https",
    "aws.lambda.response.sasl.mechanism": "PLAIN",
    "aws.lambda.response.request.timeout.ms": 20000,
    "aws.lambda.response.retry.backoff.ms": 500,
    "aws.lambda.response.sasl.jaas.config": "jaasconfig",
    "aws.lambda.response.security.protocol": "securityProtocol"
  }
}' http://kafkaconnect:8083/connectors/

Conclusion

The Kafka AWS Lambda connector offers multiple configurations: the classic asynchronous fire-and-forget option and the synchronous option.

Given the following limitation on the response topic in synchronous mode, I wonder what the original intent is:

[Extract of the Confluent documentation]

I see the synchronous option as a good way to provide logging through the response topic. If you want to send a message to another Kafka topic, nothing stops you from creating a Kafka producer in your Lambda.
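
For instance, here is a minimal sketch of such a producer using the Confluent.Kafka NuGet package; the environment variable names are assumptions, and in practice you would want to reuse a single producer instance across invocations.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class OutputProducer
{
    // Sends a follow-up message from inside the Lambda to another Kafka topic.
    public static async Task SendAsync(string key, string value)
    {
        var config = new ProducerConfig
        {
            // Assumed environment variables; configure them on the Lambda function.
            BootstrapServers = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS"),
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USERNAME"),
            SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD")
        };

        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(
            Environment.GetEnvironmentVariable("KAFKA_OUTPUT_TOPIC"),
            new Message<string, string> { Key = key, Value = value });
    }
}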

After some tests, the Lambda solution was cheaper in our case and the implementation was easy enough. However, we deemed it not quite ready to meet our production needs, because we needed to implement a workaround for JSON value deserialization. I know this is something Confluent is currently working on, so it is only a matter of time before a fix is available.