Getting Started with Kinesis
Amazon Redshift is a great tool for data warehousing. At Hired, we use it in combination with Looker to provide business insights and internal dashboards for many of our internal teams. However, getting data into Redshift and keeping it up to date is a non-trivial task. There are a few great hosted solutions for this type of task (like Segment), but maybe your business case requires a more custom solution. If you find yourself in this position then you might already be thinking about a streaming solution like Kafka or Kinesis. At Hired, we chose Kinesis for a few reasons:
- Our use case is fairly straightforward, but we want something flexible enough for future use cases
- We have a small data team and managing Kafka would have been a burden
- We are already heavily invested in the AWS ecosystem
With the decision made, it was time to get to work. Setting up a Kinesis stream is easy enough; the hardest part is estimating how much throughput you will need. AWS provides SDKs for many languages, so posting records to the stream is straightforward as well. Even so, it’s important to understand some of the limitations that come with Kinesis before you get started. Let’s take a moment to explore the ones that affected us the most.
In exchange for the convenience of a hosted solution, one must accept some of Kinesis’s limitations. Kinesis streams scale relatively easily to accommodate greater write throughput. Each shard in a stream supports up to 1 MB of writes per second and up to 1,000 records per second, whichever limit is reached first. If you need to write more data to your stream, you can use more shards (adding shards to a live stream involves some subtlety, and I won’t discuss it here).
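To make the throughput estimate concrete, here is a rough sketch of the arithmetic using the two per-shard write limits quoted above. The function name and the example traffic numbers are made up for illustration, and I am treating 1 MB as 10^6 bytes, which is an assumption.

```python
import math

# Per-shard write limits as quoted in the text.
MAX_BYTES_PER_SHARD_PER_SEC = 1_000_000   # 1 MB per second (assumed to mean 10^6 bytes)
MAX_RECORDS_PER_SHARD_PER_SEC = 1_000     # 1,000 records per second


def estimate_write_shards(records_per_sec: float, avg_record_bytes: float) -> int:
    """Return the smallest shard count that satisfies both write limits."""
    shards_by_count = records_per_sec / MAX_RECORDS_PER_SHARD_PER_SEC
    shards_by_bytes = records_per_sec * avg_record_bytes / MAX_BYTES_PER_SHARD_PER_SEC
    # Whichever limit is tighter decides; a stream always has at least one shard.
    return max(1, math.ceil(max(shards_by_count, shards_by_bytes)))


if __name__ == "__main__":
    # Many small records: the record-count limit dominates.
    print(estimate_write_shards(records_per_sec=2500, avg_record_bytes=200))
    # Few large records: the byte limit dominates.
    print(estimate_write_shards(records_per_sec=100, avg_record_bytes=50_000))
```

This assumes records are spread evenly across shards. A skewed partition key can overload one shard long before the stream as a whole is full, so treat the result as a lower bound.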
Reading data from the stream comes with similar limits, with one important difference. Each shard supports up to 2 MB per second of reads, but only 5 read transactions per second. Under most circumstances a consumer application has to poll every shard in order to discover new records, so if you want consumers with the lowest reasonable latency (polling each shard about once per second) then you are limited to 5 applications total. It is of course possible to have more than 5 consumer applications in practice, but if more than 5 processes poll a single shard every second then you should expect to run into ProvisionedThroughputExceededException errors and be equipped to handle them. This limitation was alarming at first, as we were expecting something that we could scale for both reading and writing. That expectation proved naive, but a suitable workaround presented itself.
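One common way to be "equipped to handle" throttling is to retry with exponential backoff and jitter. The sketch below is not our production code. `ThroughputExceeded` is a hypothetical stand-in for the SDK's ProvisionedThroughputExceededException, and `fetch` is any callable that performs a single read against a shard.

```python
import random
import time


class ThroughputExceeded(Exception):
    """Hypothetical stand-in for ProvisionedThroughputExceededException."""


def read_with_backoff(fetch, max_attempts=5, base_delay=0.2, sleep=time.sleep):
    """Call fetch(), retrying with exponential backoff when the shard throttles us."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except ThroughputExceeded:
            if attempt == max_attempts - 1:
                raise  # out of retries, let the caller decide what to do
            # Full jitter: wait a random time up to base_delay * 2^attempt,
            # so throttled consumers do not all retry at the same instant.
            sleep(random.uniform(0, base_delay * 2 ** attempt))
```

The `sleep` parameter exists only so the delay can be swapped out in tests. Backoff keeps a consumer alive under throttling, but it does not create more read capacity: with more than 5 pollers per shard, someone is always waiting.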
Now that we had a stream, the first thing to do was set up a consumer application that ships our data straight to S3. This way, no matter what happens downstream, we can eventually recover. We used Kinesis Firehose to achieve this. Setting up a Firehose delivery stream to dump your records to S3 every few minutes can be done with a few clicks in the AWS console. One thing to note, however: Firehose buffers Kinesis records by simply concatenating the payloads together. If you want to easily parse the records that Firehose writes, you’d better append a newline or similar record delimiter to the end of each record you send to Kinesis. Keep in mind also that the Firehose delivery stream counts against our 5 read transactions per second. Now we have 4 transaction ‘slots’ left, and the challenge becomes figuring out how to make the most of them.
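Here is a small sketch of the delimiter trick, assuming each record is a JSON object. The function names are made up for illustration. `json.dumps` escapes any newline inside a string value, so a literal newline byte can only ever be the delimiter between records.

```python
import json


def encode_record(event: dict) -> bytes:
    """Serialize one event as a single line of JSON, terminated by a newline."""
    return (json.dumps(event, separators=(",", ":")) + "\n").encode("utf-8")


def parse_firehose_object(blob: bytes) -> list:
    """Split a concatenated blob (as Firehose would write it) back into events."""
    lines = blob.decode("utf-8").split("\n")
    return [json.loads(line) for line in lines if line.strip()]


if __name__ == "__main__":
    events = [{"id": 1, "note": "line one\nline two"}, {"id": 2}]
    # Simulate Firehose concatenating the payloads it buffered.
    blob = b"".join(encode_record(e) for e in events)
    print(parse_firehose_object(blob) == events)
```

The bytes returned by `encode_record` are what you would pass as the record data when posting to the stream.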
If we dedicate one of the four remaining read slots to a ‘master consumer’ then we can have this consumer federate the payload it receives from the stream to secondary consumers, which never read from the stream themselves and so do not use up any read slots. You can achieve behavior like this using a Lambda function triggered by the Kinesis stream that asynchronously invokes child Lambda functions. Be careful here, because asynchronous invocation introduces the possibility that some events may be processed out of order. Make sure any consumer application you plan to use does not require strict ordering of records.
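A minimal sketch of such a federator might look like the following. The child function names and the `{"records": [...]}` envelope are hypothetical, not our actual setup. The Lambda client is passed in as an argument so the logic can be exercised without AWS; in a deployed function it would come from `boto3.client("lambda")`, whose `invoke` call with `InvocationType="Event"` is what makes the invocation asynchronous.

```python
import base64
import json

# Hypothetical names for the child consumer functions.
CHILD_FUNCTIONS = ["redshift-loader", "metrics-consumer"]


def decode_kinesis_records(event):
    """Kinesis-triggered Lambda events carry each record's data base64-encoded."""
    return [
        base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        for record in event["Records"]
    ]


def fan_out(event, lambda_client, child_functions=CHILD_FUNCTIONS):
    """Forward the decoded batch to every child function, fire-and-forget."""
    payload = json.dumps({"records": decode_kinesis_records(event)}).encode("utf-8")
    for name in child_functions:
        # "Event" means asynchronous: the call returns as soon as the request is
        # queued, which is also why ordering across batches is not guaranteed.
        lambda_client.invoke(FunctionName=name, InvocationType="Event", Payload=payload)
    return len(child_functions)
```

The Lambda entry point would then be a thin wrapper along the lines of `def handler(event, context): return fan_out(event, boto3.client("lambda"))`.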
Here is a summary of our event pipeline from Kinesis down to Redshift and S3.
This approach felt simple and elegant, but there was a problem I soon ran into…
Before I started on this project I wasn’t aware that Lambda’s payload size limit differs between synchronous and asynchronous invocation (6 MB and a puny 128 KB respectively). Since the trigger for the federator function is our Kinesis stream, AWS will invoke my Lambda function (about) once per shard per second. Each shard can only support 1 MB of writes per second, so the federator itself should have no problem ingesting new records from the stream. The problem is on the way out: the federator invokes its children asynchronously, so anything it forwards has to fit in 128 KB. I hit this limit almost immediately.
The workaround for this is to write large payloads to S3, rather than submitting them directly to the child consumers. The master consumer must submit its payload to child consumers as a JSON string. If this JSON string is larger than 128KB then the master consumer writes the payload to S3 and only submits the name of the key in S3 to the child consumers, who can then read the payload from S3.
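The workaround can be sketched roughly as below. The key prefix, the `s3_bucket`/`s3_key` field names, and the `{"records": [...]}` envelope are all hypothetical. The S3 client is passed in so the logic can be tested without AWS; with boto3 it would be `boto3.client("s3")`, which provides the `put_object` and `get_object` calls used here.

```python
import json
import uuid

ASYNC_PAYLOAD_LIMIT = 128 * 1024  # bytes; the asynchronous limit quoted above


def dehydrate(records, s3_client, bucket, limit=ASYNC_PAYLOAD_LIMIT):
    """Return the payload to send to a child: the data itself, or a pointer to S3."""
    body = json.dumps({"records": records}).encode("utf-8")
    if len(body) <= limit:
        return body  # small enough to send inline
    key = f"federator-payloads/{uuid.uuid4()}.json"  # hypothetical key scheme
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)
    return json.dumps({"s3_bucket": bucket, "s3_key": key}).encode("utf-8")


def rehydrate(payload, s3_client):
    """Child side: follow the S3 pointer if there is one, then return the records."""
    message = json.loads(payload)
    if "s3_key" in message:
        obj = s3_client.get_object(Bucket=message["s3_bucket"], Key=message["s3_key"])
        message = json.loads(obj["Body"].read())
    return message["records"]
```

Children call `rehydrate` on whatever they receive, so they do not need to know in advance whether a given batch was sent inline or via S3.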
This approach adds latency, as you might expect. On average, ‘dehydrating’ the payload like this adds about 300 milliseconds to the total execution time. That is an acceptable amount of additional compute time, but in the worst case dehydration can add on the order of seconds. This is not ideal for a process that you expect to be running about once per second. While this isn’t quite a deal-breaker for us, it certainly makes this whole solution feel like it could be improved.
We deal with duplication and errors by leveraging our existing batch workflow management tool, Luigi. We use Luigi to periodically pull records in from S3, dedupe them, and ‘upsert’ them into Redshift. This gives us peace of mind that within an hour any data that might have been duplicated for some reason is sanitized, and any records that might have been dropped (due to Lambda timing out, for example) are added into Redshift, or wherever they need to go. This is realistically less time than it would take someone to manually review a Dead Letter Queue or track down a duplicate record, so we’re happy with it.
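The dedupe step itself can be very small. The sketch below is plain Python rather than an actual Luigi task, and it assumes every record carries a unique `event_id` field, which is a hypothetical name. When the same id shows up more than once, the last copy wins.

```python
def dedupe(records, key="event_id"):
    """Keep one record per key; later occurrences overwrite earlier ones."""
    latest = {}
    for record in records:
        latest[record[key]] = record
    # Dicts preserve insertion order, so output follows first-seen order of ids.
    return list(latest.values())


if __name__ == "__main__":
    batch = [
        {"event_id": 1, "status": "created"},
        {"event_id": 2, "status": "created"},
        {"event_id": 1, "status": "created"},  # duplicate delivery
    ]
    print(len(dedupe(batch)))
```

Because the batch job re-reads from S3, running it twice over the same records produces the same result, which is what makes the periodic cleanup safe to repeat.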
We’re still at the beginning of our journey with Kinesis, and we continue to make improvements and adjustments to this workflow. The pipeline described here gives us event data in Redshift with a latency of just over 5 minutes and a reasonably high expectation for data quality (very few examples of dropped data per week and no duplication yet). It was fairly straightforward to set up and is cheap to run (at our scale we pay under $150 per month). If you’ve got event data and an AWS account, Kinesis is a fast and easy way to get your time series data into a data warehouse or other application!