Data Streaming — AWS Kinesis

Sandipan Ghosh
14 min read · Oct 16, 2021

In my current organisation, we implemented Kafka end to end: acquiring the resources, installing Kafka, and developing the producer, consumer and streaming applications. We also maintain the Kafka cluster along with all of those applications. This is a colossal effort, and not every situation has the luxury of time and resources for that approach. When time and effort are in short supply, AWS Kinesis can help achieve the same goal.

Kinesis is an AWS-managed streaming platform that can be provisioned, developed against and operated quickly and efficiently.

In this article, I will put together a use case of real-time streaming using a Kinesis data stream. I will use Python as the programming language.

The concept is straightforward. We have a data producer (Twitter) whose data we would like to capture and stream through a streaming application (a Kinesis data stream). A consumer will then read the data and run it through two machine learning models to extract sentiment and keywords from the tweet text. Finally, we will save the output to an S3 bucket. We can also save the raw Twitter data to the S3 bucket for future use; for that, we will use Kinesis Firehose.

Before we move forward, let's go through a few of Kinesis' main components.

AWS Kinesis Overview

  • Kinesis is an AWS managed alternative to Apache Kafka
  • Great for application logs, metrics, IoT, clickstreams
  • Great for "real-time" big data streaming
  • Great for streaming processing frameworks (Spark, NiFi, etc. )
  • Data is automatically and synchronously replicated across 3 AZs
  • Kinesis Streams: low latency streaming ingest at scale
  • Kinesis Analytics: perform real-time analytics on streams using SQL
  • Kinesis Firehose: load streams into S3, Redshift, Elasticsearch, Splunk, etc.
  • Kinesis Video Streams: securely stream video from connected devices to AWS for analytics, machine learning (ML), playback, and other processing

Kinesis has three parts: a producer, a data stream and a consumer. Kinesis Firehose is a particular type of consumer that reads data from the data stream, optionally processes it with Lambda functions, optionally compresses it, and stores it in a destination such as an S3 bucket.

We can interact with Kinesis in three different ways: the Kinesis SDK, the Kinesis Producer Library (KPL) and the Kinesis Agent.

The Kinesis SDK (both producer and consumer)

  • Boto3 for Python is an example of this. We can use API calls like PutRecord(s) to put data into the stream.
  • There are many vendor-specific third-party libraries built on the SDK, such as Kafka Connect, Spark, Flume, NiFi, etc.

Kinesis Producer SDK — PutRecord(s)

  • Frequently used APIs are PutRecord (one record) and PutRecords (many records)
  • PutRecords uses batching and increases throughput. You can also compress the data to reduce the payload size
  • You get a ProvisionedThroughputExceeded exception if you go over the limits
  • You can use this SDK with Android, iOS, etc.
  • Use case: low throughput, higher latency, simple API, easy to use with AWS Lambda
  • Managed AWS sources can also write to Kinesis Data Streams, for example CloudWatch, IoT and Kinesis Data Analytics
  • Users need to build the logic for data distribution, exception handling and data compression themselves (a boto3 sketch follows this list)
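
As a minimal illustration of the PutRecords call above, a boto3 sketch might look like this (the stream name, region and record shape are assumptions for the example):

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # region is an assumption

events = [{"id": i, "msg": f"event number {i}"} for i in range(10)]
response = kinesis.put_records(
    StreamName="my-demo-stream",  # hypothetical stream name
    Records=[
        {"Data": json.dumps(e).encode("utf-8"), "PartitionKey": str(e["id"])}
        for e in events
    ],
)
# PutRecords is best-effort per record: check FailedRecordCount and retry failed entries
print(response["FailedRecordCount"])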

Kinesis Consumer SDK — GetRecord(s)

  • Classic Kinesis — consumers poll records from a shard
  • Each shard has 2 MB/sec of total aggregate read throughput
  • GetRecords returns up to 10 MB of data (the shard is then throttled for 5 seconds) or up to 10,000 records
  • Maximum of 5 GetRecords API calls per shard per second, which translates to roughly 200 ms latency
  • If five consumers read from the same shard, each consumer can poll only once per second and receives less than 400 KB/sec (see the polling sketch below)
Multiple consumers fetching data from the same shard
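
A minimal boto3 polling sketch for GetRecords that respects the per-shard limits above (the stream name, shard id and region are assumptions):

import time
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

iterator = kinesis.get_shard_iterator(
    StreamName="my-demo-stream",
    ShardId="shardId-000000000000",
    ShardIteratorType="LATEST",  # start reading from the newest records
)["ShardIterator"]

while True:
    result = kinesis.get_records(ShardIterator=iterator, Limit=1000)
    for record in result["Records"]:
        print(record["PartitionKey"], record["Data"][:50])
    iterator = result["NextShardIterator"]
    time.sleep(0.2)  # keep well under the 5 GetRecords calls per shard per second limit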

Kinesis Producer Library (KPL)

  • Kinesis Producer Library — it's a Java/C++ based library; I haven't seen a Python wrapper for it yet.
  • Best for long-running high-performance producers
  • Great for automation and configurable retry mechanism
  • Synchronous or Asynchronous API (use async for better performance)
  • CloudWatch metrics for monitoring
  • Automated batching — increase throughput, decrease cost
  • Collects records and writes to multiple shards in the same PutRecords API call, so you pay less for a larger amount of data
  • Aggregate and compress — increased latency but better efficiency (you also pay less)
  • Can pack multiple records into one Kinesis record (to go over the 1,000 records per second per shard limit)
  • Increase 'RecordMaxBufferedTime' to collect multiple records, then aggregate and compress them to increase the payload size and improve throughput (to get closer to the 1 MB/sec per-shard limit)
  • Users must implement the compression/decompression mechanism themselves
  • KPL records must be de-coded with the KCL or a special helper library
  • This is one of the best approaches for higher efficiency and throughput, with a lot of user control, while paying less

Kinesis Client Library (KCL)

  • Java-first library but exists for other languages too (Golang, Python, Ruby)
  • Read records from Kinesis produced with the KPL (de-aggregation)
  • Share multiple shards with multiple consumers in one "group", shard discovery
  • Checkpointing feature to resume progress
  • Leverages DynamoDB for coordination and checkpointing (one row per shard). Make sure you provision enough WCU/RCU or use On-Demand for DynamoDB; otherwise, DynamoDB may slow down the KCL
  • Record processors will process the data
DynamoDB for checkpoint coordination

Kinesis Agent (Both producer and consumer)

  • Monitors log files and sends them to Kinesis Data Streams
  • Java-based agent, built on top of KPL
  • Install in Linux-based server environments
  • Reads from multiple directories and writes to multiple streams
  • Routing feature based on directory/logfile
  • Pre-process data before sending to streams (single line, CSV to JSON, log to JSON… )
  • The agent handles file rotation, checkpointing, and retry upon failures
  • Emits metrics to CloudWatch for monitoring
  • Best for aggregating high-frequency log data and monitoring it in near real time

Kinesis Firehose

Firehose works in the opposite direction to a Data Stream: it delivers records from a selected source to the destinations configured in the Firehose. We can also specify any transformations or formatting we'd like to apply to the data before it's pushed out. Firehose supports Lambda functions, so we can use a Lambda function for any inline transformation. It also supports compression techniques and multiple destination types.
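
As a sketch of such an inline transformation, a Firehose transformation Lambda in Python follows the record-in/record-out contract shown below (the enrichment itself is just a placeholder):

import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        payload["processed"] = True  # placeholder transformation
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",  # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}

Every incoming record must come back with the same recordId, a result, and the (possibly modified) base64-encoded data.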

Streams vs Firehose

Amazon Kinesis offers two main products to choose from, Kinesis Streams and Kinesis Firehose.

To work with Kinesis Streams, you put data into the stream yourself, using the SDK or the Kinesis Producer Library (KPL). Then you can connect the stream to almost any application or process.

Kinesis Streams is not a fully managed service, which means your team needs to scale it manually by adding more shards as required. In addition, data only stays in the stream for up to seven days; the default is 24 hours. You need to plan carefully in advance, considering data throughput, compression, destination types and cost.

Kinesis Firehose is simpler to implement. Data from Firehose can be delivered to Amazon S3, Amazon Redshift, or even Elasticsearch, and from there you can process it. If the data is stored in Amazon S3 or another AWS data storage system, you can leave it there for much longer than seven days; you decide how long it stays. This approach is suitable for long-term rotation of data (hot/cold data layers).

What is a Shard?

A shard is the base throughput unit of an Amazon Kinesis data stream. Think of it as a pipe through which data flows from source to destination. One shard provides a capacity of 1 MB/sec of data input and 2 MB/sec of data output, and supports up to 1,000 PUT records per second. You specify the number of shards when you create the data stream, so you need to size it up front based on your requirements (keeping in mind that compression and aggregation of input records can increase the effective throughput).
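
As a rough sizing sketch (the traffic numbers below are assumptions, not measurements):

import math

avg_record_kb = 2          # assumed average record size after compression
records_per_second = 1500  # assumed peak ingest rate

ingest_mb_per_sec = avg_record_kb * records_per_second / 1024
shards_for_throughput = math.ceil(ingest_mb_per_sec / 1.0)  # 1 MB/sec input per shard
shards_for_records = math.ceil(records_per_second / 1000)   # 1,000 records/sec per shard
print(max(shards_for_throughput, shards_for_records))       # -> 3 shards for this workload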

Multiple Shards

Once we have multiple shards, the critical point is distributing the data across them for better performance and throughput. The aim is to keep every shard evenly utilised, which we can achieve by using a proper, well-distributed partition key.

What is a Partition Key?

The partition key is used to segregate and route records to the different shards of a data stream and balance shard utilisation. A partition key is specified by your data producer while adding data to an Amazon Kinesis data stream. For example, assume you have a data stream with two shards (shard 1 and shard 2). You can configure your data producer to use two partition keys (key A and key B) so that all records with key A go to shard 1 and all records with key B go to shard 2. Similarly, if we use employee_id as the partition key, data will be well distributed between the shards. On the other hand, if we choose dept_id, the distribution might not be balanced, as some departments have more employees than others. When splitting a big record into multiple chunks, we can use the same value as the partition key for every chunk, so all the chunks end up in the same shard. The consumer can then easily capture those chunks and stitch them back together to recreate the original record.
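
A small sketch of the two key choices (the region is an assumption; 'twitter_ds' is the stream created later in the article):

import uuid
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# A high-cardinality key such as a uuid spreads records evenly across shards
kinesis.put_record(
    StreamName="twitter_ds",
    Data=b'{"employee_id": 101, "dept_id": 7}',
    PartitionKey=str(uuid.uuid4()),
)

# A low-cardinality, skewed key such as dept_id can leave some shards under-used
kinesis.put_record(
    StreamName="twitter_ds",
    Data=b'{"employee_id": 101, "dept_id": 7}',
    PartitionKey="dept_7",
)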

What is the sequence number?

A sequence number is a unique identifier for each record. It is assigned by Amazon Kinesis when a data producer calls the PutRecord or PutRecords operation to add data to a data stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between PutRecord or PutRecords requests, the larger the sequence numbers become. To put a split record back together from the data stream, you can use the same partition key and the gradually increasing sequence numbers.
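
A sketch of that split-record pattern (the chunk size and payload below are made up for the example):

import os
import uuid
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

big_payload = os.urandom(2 * 1024 * 1024)  # stand-in for a record larger than 1 MB
chunk_size = 500 * 1024                    # stay well below the 1 MB record limit
shared_key = str(uuid.uuid4())             # same partition key for every chunk

for i in range(0, len(big_payload), chunk_size):
    response = kinesis.put_record(
        StreamName="twitter_ds",
        Data=big_payload[i:i + chunk_size],
        PartitionKey=shared_key,
    )
    # All chunks land on the same shard; sequence numbers increase with each call
    print(response["ShardId"], response["SequenceNumber"])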

Sample Application and implementations

Here is the diagram of the sample application.

Sample application
  1. My source system is Twitter.
  2. I will use EC2 machines for the producer and the consumer. Both could also run as serverless Lambda functions.
  3. I will write a Python producer using boto3 to read data from the Twitter API and push it to the Kinesis data stream.
  4. I will use a Python boto3 consumer to consume the stream and unwrap the data into JSON.
  5. On the consumer side, I will use Python code to clean and pre-process the data to prepare it for the machine learning models.
  6. I have two models to run: one to get the sentiment of the text and another to extract the keywords. Both could be replaced by AWS Lambda functions.
  7. Finally, I will save the final data to an S3 bucket.
  8. To reduce the size of the payload, I will compress the data in the stream using gzip. You could use Snappy too.

Get Twitter API access

First, you will need Twitter API access to capture the Twitter data using Python.

You can get this access by following https://developer.twitter.com/en/docs/twitter-api/getting-started/getting-access-to-the-twitter-api

Once you acquire the access, collect the access key, access token, consumer key and consumer secret. You will need these to capture data from the Twitter API.

I am using the tweepy Python package to capture the data.

Create a new user

We need a user with AWS API, CLI and SDK access, so we can use the same user to connect to the Kinesis data stream from the producer and consumer EC2 instances. This user also needs Kinesis and S3 full access to read from and write to Kinesis and S3. Here are a few captures of the AWS steps. Once you have completed the steps, do not forget to download and save the AWS access key and secret. Also, don't forget to add the key file to your .gitignore so you don't accidentally publish the keys.

New user with programmatic access
The new user should have Kinesis full access
The new user should have S3 full access
Final step

Launch Producer and Consumer EC2 machine

Let's create two EC2 instances, one for the producer and another for the consumer. I don't need any specific roles/policies for EC2 to access Kinesis and S3, because the code will use the new user's access keys.
The captures below show the steps to create the EC2 instances.

Use Ubuntu image for the EC2
I am using a free-tier EC2 instance, t2.micro

I have specified the number of instances as two on the configuration page and kept all other options at their defaults. Note that I have not specified any IAM role.

Config details
RAM size
Put some tags

In the security group, we are opening all the required ports. Note that I am giving access to the entire world, which is not recommended in production. In production, the EC2 instances should sit behind a custom VPC, subnets, route tables and an internet gateway to secure the network.

open ports. Be careful in production

Finally, please review and launch. Also, save the key-pair to log in to the instances.

After a few minutes, all the instances will be running. Now we are ready to log in to the producer EC2 instance and set up the producer process.

Before logging in, change the permissions of the .pem file you downloaded in the step above to 400.

chmod 400 *****.pem

Producer

Let's log in and start setting up our code.

ssh -i ****.pem ubuntu@public.ip.for.producer

sudo apt-get update && sudo apt-get upgrade

sudo apt install python3-pip

Create a 'requirements.txt' file under the home dir '/home/ubuntu'. We will put all the required Python packages (such as tweepy and boto3) there and use pip3 to install them.

requirements.txt

pip3 install -r ./requirements.txt

Now we are ready for the producer code. Here are the code captures. You can get the full code from the GitHub location mentioned toward the end of the article.

Code part — 1

First, I import a few packages, including 'tweepy' and 'boto3'. Note that I have kept all the keys, IDs and tokens in a file called 'settings.py'. This way I encapsulate all the secrets instead of hardcoding them or passing them as parameters, and I can hide the file and limit access to it so my keys stay secure. Also, don't forget to add this file to .gitignore.
Then I create a Kinesis client called 'client'. I will use this client to create the data stream and push the data into it.
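
The actual code is in the capture and the GitHub repo; a rough sketch of this part might look like the following (the names inside settings.py and the region are assumptions):

import gzip
import json
import time
import uuid

import boto3
import tweepy

import settings  # holds consumer_key, consumer_secret, access_token, access_token_secret

STREAM_NAME = "twitter_ds"
client = boto3.client("kinesis", region_name="us-east-1")  # region is an assumption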

Code part — 2

Next, I define two functions: one to create a stream and one to get the status of the stream.
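
Continuing the sketch above, the two helpers might look roughly like this:

def create_stream(stream_name, shard_count=2):
    # Create the Kinesis data stream; ignore the error if it already exists
    try:
        client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    except client.exceptions.ResourceInUseException:
        print(f"{stream_name} already exists")


def get_stream_status(stream_name):
    # Returns CREATING, ACTIVE, DELETING or UPDATING
    response = client.describe_stream(StreamName=stream_name)
    return response["StreamDescription"]["StreamStatus"]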

Code part — 3

Next, I use tweepy's StdOutListener class to push the Twitter data to the stream. First I capture the data, dump it as JSON, convert it to bytes, and then compress it with gzip. I use a uuid as the partition key, which is a simple trick to distribute the data uniformly across multiple shards. I could have used the tweet id as the partition key; however, I do not want to parse the data here.
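
A sketch of the listener, continuing the code above (the tweepy 3.x streaming API is assumed here):

class StdOutListener(tweepy.StreamListener):
    def on_data(self, data):
        # data is the raw tweet JSON string coming off the Twitter stream
        tweet = json.loads(data)
        payload = gzip.compress(json.dumps(tweet).encode("utf-8"))
        client.put_record(
            StreamName=STREAM_NAME,
            Data=payload,
            PartitionKey=str(uuid.uuid4()),  # random key spreads records across shards
        )
        return True

    def on_error(self, status):
        print(f"stream error: {status}")
        return False  # disconnect on error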

Code part — 4

This is the main part of the code. First, I create the Kinesis data stream and wait until it becomes active. Then I start collecting the data and keep pushing it to the stream. You can use the filter method to filter the Twitter data for any topic; here I am filtering on the new Bond movie's name.
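
A sketch of the main block, continuing the code above (the filter term is only an example):

if __name__ == "__main__":
    create_stream(STREAM_NAME, shard_count=2)
    while get_stream_status(STREAM_NAME) != "ACTIVE":
        print("waiting for the stream to become active...")
        time.sleep(5)

    auth = tweepy.OAuthHandler(settings.consumer_key, settings.consumer_secret)
    auth.set_access_token(settings.access_token, settings.access_token_secret)
    stream = tweepy.Stream(auth, StdOutListener())
    stream.filter(track=["No Time To Die"])  # filter on any topic, e.g. the Bond movie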
Now, let’s start the code.

Running the code for the first time

We can stop the code at any time and re-run it easily.

Rerunning the code
Code dir structure

Once we run the code, a new data stream is created in AWS with two shards.

New Data Stream in AWS

Firehose

Before moving to the consumer, let's create a Firehose delivery stream, which will pick up the raw data from the Kinesis data stream (twitter_ds) and deliver it into an S3 bucket.

Firehose — settings 1
Firehose — settings 2

Once the Firehose has started, you can see that it begins to accumulate data in the S3 bucket within a few minutes. Note that all the data is in gzip format, as our producer compresses the data before sending it to the data stream.

Data into S3

We can download the data and open it. Once downloaded, de-compress it with gzip to get the raw data. I renamed the file to .zip to open it in WinZip (or similar) software.
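
Alternatively, a couple of lines of Python can decompress the downloaded object (the local file name is an assumption):

import gzip

with gzip.open("downloaded-firehose-object", "rt", encoding="utf-8") as f:
    raw = f.read()
print(raw[:500])  # the first few tweets in raw JSON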

Download the data
De-compress and open it in Notepad

Now we can create a consumer python code.

Consumer

We can use boto3 for the consumer as well; it is easy to implement. However, this approach quickly becomes complicated if the stream has multiple shards and records are split into multiple parts, and users need to implement checkpointing manually. Using the Kinesis Client Library (KCL) for Python simplifies the process.
Here is a simple working consumer using boto3. I am using multiple shards and keeping the records limit at 1. This is not good practice, as we will pay more for the numerous API calls; if we increase the limit, we can get more data in one call. We use 'NextShardIterator' to loop through the stream for data; if no data is present, the code prints a message saying no data is available and waits 5 seconds before checking again. I write the final data into a text file, and I am using the ShardIteratorType 'TRIM_HORIZON'. More about this at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_records
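
A condensed sketch of that consumer (it reads a single shard for brevity and the output file name is an assumption; a full version would loop over every shard returned by list_shards):

import gzip
import json
import time

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

shard_id = kinesis.list_shards(StreamName="twitter_ds")["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="twitter_ds",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",  # start from the oldest available record
)["ShardIterator"]

with open("tweets_out.txt", "a") as out:
    while True:
        result = kinesis.get_records(ShardIterator=iterator, Limit=1)  # Limit=1, as in the article
        if not result["Records"]:
            print("no data available, waiting 5 seconds...")
            time.sleep(5)
        for record in result["Records"]:
            tweet = json.loads(gzip.decompress(record["Data"]))
            out.write(json.dumps(tweet) + "\n")
        iterator = result["NextShardIterator"]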

Here is the sample capture of running both producer and consumer code.

Producer code
Consumer code

Kinesis or Kafka

It's very difficult to say which one is better; however, from my point of view, onboarding Kinesis is easy, and it offers a great amount of flexibility from a programming point of view. We can use a Python program to create a stream, a producer and a consumer. However, keep in mind that Kinesis has data limitations. Your design must take these limitations into account very carefully, because changing the stream after it has been created is not straightforward.


Sandipan Ghosh

Big data solution architect & lead data engineer with experience in building data-intensive applications and tackling challenging architectural and scalability problems.