Building Kinesis Apps with KCL
Let's see how to build applications that consume records from Kinesis using Amazon's Kinesis Client Library (KCL).
Before starting, we need to understand the main Kinesis concepts:
- Stream: it's an ordered sequence of records. It’s the equivalent of a Kafka topic.
- Shard: the throughput unit of a stream. Every stream has at least one shard, and can have as many as you need depending on your volume. Each shard supports up to 1,000 records/s (1 MB/s) for writes and 5 read transactions/s (2 MB/s) for reads.
- Record: unit of data. A record consists of a sequence number, a partition key and a binary blob.
And the KCL abstractions:
- App: it's an application that reads and processes data from a single stream. The output can be written to various streams, to a database, etc. KCL doesn't enforce anything on the output.
- Worker: contains the logic of the application. Each worker processes data from a single shard of the stream, and there can be only one active worker per shard. That is to say, there is a one-to-one relationship between active workers and shards.
Tying It Together: An Example
Let's say that we send raw page view events to a stream. We have an app that counts them and every five minutes stores the number of page views in a database. We also have another app that reads the raw events and enriches them, resolving the user geolocation from the IP. It then writes those enriched events to another stream so other apps can process them further.
Each app will read from the raw stream independently, at its own speed.
How does that view change if we know that the raw stream has two shards? Each app would need two workers, one per shard. And because we need two shards for the raw stream, the enriched stream will also need two shards to handle the volume of records.
The workers of the enricher app will write to both shard 1 and 2 of the enriched stream, depending on the value of the partition key of the record they write.
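Kinesis decides the destination shard by taking the MD5 hash of the partition key, interpreting it as a 128-bit integer, and assigning the record to the shard whose hash key range contains it. A minimal sketch of that routing, assuming a two-shard stream where each shard owns half the hash space (the ranges here are illustrative):

```ruby
require 'digest'

# Kinesis hashes the partition key with MD5 and treats the digest as a
# 128-bit integer; each shard owns a contiguous range of that hash space.
# With two shards, assume each owns half of [0, 2^128).
SHARD_RANGES = {
  'shard-1' => (0...2**127),
  'shard-2' => (2**127...2**128)
}.freeze

def shard_for(partition_key)
  hash = Digest::MD5.hexdigest(partition_key).to_i(16)
  SHARD_RANGES.find { |_, range| range.cover?(hash) }.first
end
```

Because the mapping is a pure function of the partition key, the same key always lands on the same shard, which is what gives you per-key ordering.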
You have to create your own cluster of machines to run the application processes, perhaps using an Auto Scaling group. In our example, we could have two instances for availability and run both apps on each. KCL will assign shards to workers in each app, regardless of the instance they run on. It will also detect when a worker becomes unresponsive and reassign its shard to an available worker of the same app.
Although not shown in the diagram, workers need access to DynamoDB. There they store which shard they are processing and a checkpoint: the sequence number of the last record they processed. This way, if a worker goes down, another one can continue processing the shard from the last checkpoint.
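That DynamoDB table behaves like a lease table. A toy in-memory model of the idea (the names and schema here are illustrative, not KCL's actual table layout):

```ruby
# Toy model of KCL's coordination table: one row per shard, recording
# which worker holds the lease and the last checkpointed sequence number.
class LeaseTable
  def initialize
    @leases = {} # shard_id => { owner:, checkpoint: }
  end

  def take_lease(shard_id, worker_id)
    @leases[shard_id] = { owner: worker_id, checkpoint: nil }
  end

  def checkpoint(shard_id, sequence_number)
    @leases[shard_id][:checkpoint] = sequence_number
  end

  # When a worker dies, another worker takes over its lease and resumes
  # from the last checkpoint instead of reprocessing the whole shard.
  def fail_over(shard_id, new_worker_id)
    @leases[shard_id][:owner] = new_worker_id
    @leases[shard_id][:checkpoint] # resume point for the new worker
  end
end
```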
Writing the App
KCL apps can be written in a number of languages, including Java, Python, Ruby and Scala. That said, all of them follow the same principles. The main logic of the application resides in a RecordProcessor class, which has three methods:
#initialize(shard_id): it’s called before the RecordProcessor starts to process records. You can use it to initialise connections to databases or to output streams.
#process_records(records, checkpointer): this method contains the logic to process the records.
#shutdown(checkpointer, reason): it’s called just before shutting down the RecordProcessor. You can use it to shut down database connections or to close files.
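A minimal page-view counter following that shape might look like the sketch below. The exact base class and method names vary by language binding, so this is a standalone illustration rather than the literal library API, and the record format and stub names are assumptions:

```ruby
# Standalone sketch of a RecordProcessor that counts page views.
# In a real KCL app this class would extend the library's processor base
# class, and `records` would carry the Kinesis record objects.
class PageViewCounter
  def initialize(shard_id)
    @shard_id = shard_id
    @count = 0 # in a real app: also open database connections here
  end

  def process_records(records, checkpointer)
    @count += records.size
    # Checkpoint so that, after a failure, another worker resumes
    # from here instead of reprocessing the shard from the start.
    checkpointer.checkpoint(records.last[:sequence_number]) unless records.empty?
  end

  def shutdown(checkpointer, reason)
    # :terminate means the shard was closed: checkpoint one last time.
    checkpointer.checkpoint(nil) if reason == :terminate
    @count # in a real app: also close connections and files here
  end
end
```

In a real processor, `process_records` is also where you would write the five-minute counts to the database or push enriched events to the output stream.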
Besides implementing your RecordProcessor, you also have to configure the app: things like how often you want to check for new records, the name of the stream you are reading from, etc.
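For the non-Java bindings, which run on top of KCL's MultiLangDaemon, this configuration typically lives in a properties file. A sketch along the lines of the samples shipped with the library (the values, and the processor file name, are illustrative):

```properties
# Command that launches your RecordProcessor
executableName = page_view_counter.rb
# The stream to read from and a unique name for this app;
# the app name also names the DynamoDB checkpoint table
streamName = raw-page-views
applicationName = PageViewCounterApp
# Where to start reading a shard when no checkpoint exists yet
initialPositionInStream = TRIM_HORIZON
# How long (ms) to wait between polls of a shard for new records
idleTimeBetweenReadsInMillis = 1000
```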
Writing Kinesis apps is quite simple, but it has some important differences compared to other stream processing frameworks like Storm or Samza.
Kinesis apps should be fully featured by themselves, not parts of a larger app (like Storm's bolts). So avoid creating a complex graph of apps: it's quite tedious to create the intermediate streams and configure every app to read and write to them, especially if you want to test them locally or maintain different environments.
The other big difference is that with KCL you have to take care of where and how to deploy the apps yourself. Amazon could make this easier by offering Hadoop-like clusters to which you submit apps and which deploy and run them automatically.