Data Streaming — AWS Kinesis
In my current organisation, we implemented Kafka end to end: acquiring resources, installing Kafka, and developing the producer, consumer and streaming applications. We also maintain the Kafka cluster along with all of those applications. This is a colossal effort, and not every situation has the luxury of time and resources for that approach. When time and effort are tight, AWS Kinesis can help achieve the same goal.
Kinesis is an AWS managed streaming platform that can be acquired, developed and handled efficiently and quickly.
In this article, I will put together a use case of real-time streaming using Kinesis data stream. I will use Python as a programming language.
The concept is straightforward. We have a data producer (Twitter) whose data we would like to capture and stream through a streaming application (a Kinesis data stream). Then, a consumer will capture the data and process it through two machine learning models to extract sentiment and keywords from the tweet text. Finally, we will save the final output to an S3 bucket. We can also save the raw Twitter data to the S3 bucket for future use; for that, we will use Kinesis Firehose.
Before we move forward, let's go through a few of Kinesis' main components.
The AWS Kinesis Overview
- Kinesis is an AWS managed alternative to Apache Kafka
- Great for application logs, metrics, IoT, clickstreams
- Great for "real-time" big data streaming
- Great for stream processing frameworks (Spark, NiFi, etc.)
- Data is automatically replicated synchronously across 3 Availability Zones
- Kinesis Streams: low latency streaming ingest at scale
- Kinesis Analytics: perform real-time analytics on streams using SQL
- Kinesis Firehose: load streams into S3, Redshift, Elasticsearch, Splunk, etc.
- Kinesis Video Streams: securely stream video from connected devices to AWS for analytics, machine learning (ML), playback, and other processing
Kinesis has three parts: a producer, a data stream and a consumer. Kinesis Firehose is a particular type of consumer that reads data from the data stream, optionally processes it with Lambda functions, optionally compresses it, and stores it in a destination like an S3 bucket.
We can interact with Kinesis in three different ways: the Kinesis SDK, the Kinesis Producer Library (KPL) and the Kinesis Agent.
The Kinesis SDK (both producer and consumer)
- Boto3 for Python is an example of this. We can use API calls like putRecord(s) to put the data into the stream.
- Many vendor-specific third-party libraries are built on the SDK, such as Kafka Connect, Spark, Flume, NiFi, etc.
Kinesis Producer SDK — PutRecord(s)
- Frequently used APIs are PutRecord (one) and PutRecords (many records)
- PutRecords uses batching and increases throughput. You can also compress the data to reduce the payload size
- A ProvisionedThroughputExceededException is thrown if we go over the limits
- You can use this SDK with Android, iOS, etc…
- Use case: low throughput, higher latency, simple API, easy to use with AWS Lambda
- You can use managed AWS sources for Kinesis Data Streams, like CloudWatch Logs, AWS IoT, Kinesis Data Analytics, etc.
- Users need to build the logic for data distribution, exception handling and data compression
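As a sketch of the batching point above, here is how a boto3 producer might chunk records to respect the 500-record PutRecords limit and resubmit only the failed entries. This is a minimal sketch; the record shapes and the single retry pass are simplifications:

```python
MAX_BATCH = 500  # PutRecords accepts at most 500 records per call


def chunk_records(records, size=MAX_BATCH):
    """Split a list of record dicts into PutRecords-sized batches."""
    return [records[i:i + size] for i in range(0, len(records), size)]


def put_batches(client, stream_name, records):
    # records: [{"Data": b"...", "PartitionKey": "..."}, ...]
    for batch in chunk_records(records):
        resp = client.put_records(StreamName=stream_name, Records=batch)
        if resp.get("FailedRecordCount"):
            # Failed entries in the response carry an 'ErrorCode';
            # resubmit only those.
            retry = [rec for rec, meta in zip(batch, resp["Records"])
                     if "ErrorCode" in meta]
            client.put_records(StreamName=stream_name, Records=retry)
```

A production producer would loop the retry with backoff; one resubmission pass is shown here to keep the sketch short.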
Kinesis consumer SDK — GetRecord(s)
- Classic Kinesis: consumers poll records from a shard
- Each shard has 2 MB total aggregate throughput
- GetRecords returns up to 10 MB of data (then throttles for 5 seconds) or up to 10,000 records per call
- Maximum of 5 GetRecords API calls per shard per second = 200ms latency
- If five consumers consume data from the same shard, each consumer can poll once a second and receive roughly 400 KB/s
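The fan-out arithmetic in the bullets above is easy to sanity-check:

```python
SHARD_READ_MB_PER_S = 2  # shared-throughput (classic) consumer limit per shard


def per_consumer_kb_per_s(num_consumers: int) -> float:
    """Back-of-envelope share of one shard's read limit per consumer."""
    return SHARD_READ_MB_PER_S * 1000 / num_consumers

# With five consumers on one shard, each gets roughly 400 KB/s.
```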
Kinesis Producer Library (KPL)
- The Kinesis Producer Library is a Java/C++ based library. I haven't seen a wrapper for Python yet.
- Best for long-running high-performance producers
- Great for automation and configurable retry mechanism
- Synchronous or Asynchronous API (use async for better performance)
- CloudWatch metrics for monitoring
- Automated batching — increase throughput, decrease cost
- Collects records and writes to multiple shards in the same PutRecords API call, so you pay less for a larger amount of data
- Aggregation and compression increase latency but improve efficiency (and you pay less)
- Capability to store multiple records in one record (to go beyond the 1,000 records per second limit)
- Increase 'recordMaxBufferedTime' to collect multiple records, then aggregate and compress to increase payload size and improve throughput (maximize the 1 MB/s limit)
- Users must implement all the compression/decompression mechanisms themselves
- KPL Records must be de-coded with KCL or a special helper library
- This is one of the best approaches for high efficiency and throughput, with a lot of user control, while paying less
Kinesis Client Library (KCL)
- Java-first library but exists for other languages too (Golang, Python, Ruby)
- Read records from Kinesis produced with the KPL (de-aggregation)
- Share multiple shards with multiple consumers in one "group", shard discovery
- Checkpointing feature to resume progress
- Leverages DynamoDB for coordination and checkpointing (one row per shard). Make sure you provision enough WCU/RCU or use On-Demand capacity for DynamoDB; otherwise, DynamoDB may slow down KCL
- Record processors will process the data
Kinesis Agent (both producer and consumer)
- Monitors log files and sends them to Kinesis Data Streams
- Java-based agent, built on top of KPL
- Install in Linux-based server environments
- Reads from multiple directories and writes to multiple streams
- Routing feature based on directory/logfile
- Pre-process data before sending to streams (single line, CSV to JSON, log to JSON… )
- The agent handles file rotation, checkpointing, and retry upon failures
- Emits metrics to CloudWatch for monitoring
- Best for aggregating high-frequency log data and monitoring in near real time
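For illustration, a minimal agent configuration (normally `/etc/aws-kinesis/agent.json`) might look like the sketch below. The file pattern is a placeholder and the stream name assumes the `twitter_ds` stream used later in this article:

```json
{
  "cloudwatch.emitMetrics": true,
  "flows": [
    {
      "filePattern": "/var/log/myapp/*.log",
      "kinesisStream": "twitter_ds",
      "dataProcessingOptions": [
        { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
      ]
    }
  ]
}
```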
Kinesis Firehose
Firehose is the opposite of a Data Stream; it distributes records from a selected source to the endpoints configured in the Firehose. We can also specify transformations or formatting to apply to the data before it's pushed out. Firehose supports Lambda functions, so we can use one to perform any inline transformation. It also supports compression techniques and multiple destination types.
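To make the Lambda transformation idea concrete, here is a sketch of a Firehose transformation handler that would decompress our gzipped tweets and keep only a couple of fields. The field names are hypothetical; what is real is the contract: Firehose hands the handler base64-encoded records and expects each one back with a result of "Ok" or "ProcessingFailed".

```python
import base64
import gzip
import json


def lambda_handler(event, context):
    """Firehose transformation sketch: gunzip each record, keep two fields."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        try:
            data = json.loads(gzip.decompress(payload))
            # Hypothetical field names for this sketch
            slim = {"id": data.get("id"), "text": data.get("text")}
            out_bytes = (json.dumps(slim) + "\n").encode("utf-8")
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(out_bytes).decode("utf-8"),
            })
        except (OSError, ValueError):
            # Not gzip or not JSON: hand the record back unmodified as failed
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```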
Streams vs Firehose
Amazon Kinesis offers two main products to choose from, Kinesis Streams and Kinesis Firehose.
To work with Kinesis Streams, you'll use the Kinesis Producer Library (KPL) to put data into the stream. Then, you can connect it to almost any application or process.
Kinesis Streams is not a fully managed service, which means your team needs to scale it as required, manually, by adding more shards. In addition, the data only stays in the stream for seven days at most (the default is 24 hours). You need to plan carefully in advance, considering data throughput, compression, destination types and cost.
Kinesis Firehose is simpler to implement. Data from Firehose can be saved into Amazon S3, Amazon Redshift, or even Elasticsearch, and from there you can process the data. If the data is stored in Amazon S3 or another AWS data storage system, you can leave it there much longer than seven days; you decide how long it stays. This approach is suitable for long-term rotation of data (hot/cold data layers).
What is a Shard?
A shard is the base throughput unit of an Amazon Kinesis data stream. Think of it as a pipe through which data flows from source to destination. One shard provides a capacity of 1 MB/sec data input and 2 MB/sec data output, and supports up to 1,000 PUT records per second. You specify the number of shards when you create a data stream, so you need to size it in advance based on your requirements (keeping in mind that compression and aggregation of input records can increase effective throughput).
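Those two write limits make shard sizing a small calculation. Here is a rough sketch that ignores compression and aggregation:

```python
import math


def shards_needed(write_mb_per_s: float, records_per_s: int) -> int:
    """Minimum shard count from the two per-shard write limits:
    1 MB/s of data in and 1,000 PUT records per second."""
    return max(math.ceil(write_mb_per_s / 1.0),
               math.ceil(records_per_s / 1000))

# e.g. 3.5 MB/s at 2,500 records/s is data-bound and needs 4 shards.
```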
Once an application has multiple shards, the critical concept is distributing the data across them for better performance and throughput. The aim is to keep every shard close to fully utilised, which we can achieve with a well-chosen, unique partition key.
What is Partition Key?
The partition key is used to segregate and route records to the different shards of a data stream and balance shard utilization. The data producer specifies a partition key while adding data to an Amazon Kinesis data stream. For example, assume you have a data stream with two shards (shard 1 and shard 2). You can configure your data producer to use two partition keys (key A and key B) so that all records with key A are added to shard 1, and all records with key B are added to shard 2. Similarly, if we use employee_id as the partition key, data will be well distributed across shards. On the other hand, if we choose dept_id, the distribution might not be balanced, as some departments have more employees than others. When splitting a big record into multiple chunks, we can use the same partition key for every chunk, so all the chunks end up in the same shard. The consumer can then easily capture those chunks and stitch them together to recreate the original data.
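Under the hood, Kinesis maps a partition key to a shard by MD5-hashing it into a 128-bit number and checking which shard's hash-key range it falls into. Assuming the shards split the range evenly (the default when a stream is created), the routing can be mimicked like this:

```python
import hashlib


def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Approximate Kinesis routing: MD5 the key into a 128-bit integer,
    then map it onto evenly sized shard hash-key ranges."""
    h = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return (h * num_shards) >> 128  # scale 0..2^128-1 down to a shard index
```

This shows why a high-cardinality key like employee_id spreads load well, while a skewed key like dept_id can leave some shards hot and others idle.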
What is the sequence number?
A sequence number is a unique identifier for each record. It is assigned by Amazon Kinesis when a data producer calls the PutRecord or PutRecords operation. Sequence numbers for the same partition key generally increase over time; the longer the period between PutRecord or PutRecords requests, the larger the sequence numbers become. To put a split record back together from the data stream, you can use the shared partition key and the gradually increasing sequence numbers.
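The split-and-stitch idea from the two sections above can be sketched in a few lines. The record shape mirrors what GetRecords returns; the sequence numbers in the test are made up:

```python
def split_record(data: bytes, chunk_size: int) -> list:
    """Split one large payload into ordered chunks that share a partition key."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def reassemble(records: list) -> bytes:
    """Consumer side: order chunks by SequenceNumber and stitch them back.
    Kinesis sequence numbers are numeric strings that grow over time."""
    ordered = sorted(records, key=lambda r: int(r["SequenceNumber"]))
    return b"".join(r["Data"] for r in ordered)
```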
Sample Application and implementations
Here is the diagram of the sample application.
- My source system is Twitter.
- I will use EC2 machines for the producer and consumer. Both could also be serverless Lambda functions.
- I will write a Python producer using boto3 to read the data from the Twitter API and push it to the Kinesis data stream.
- I will use a Python boto3 consumer to consume the stream and unwrap the data into JSON.
- On the consumer side, I will use Python code to clean and pre-process the data to prepare it for the machine learning models.
- I have two models to run: one for getting the sentiment of the text and another for extracting the keywords. Both could be replaced by AWS Lambda functions.
- Finally, I will save the final data to an S3 bucket.
- To reduce the size of the payload, I will compress the data in the stream using gzip. You could use Snappy too.
Get Twitter API access
First, you will need Twitter API access to capture the Twitter data using Python.
You can get this access by following https://developer.twitter.com/en/docs/twitter-api/getting-started/getting-access-to-the-twitter-api
Once you acquire the access, collect the access key, access token, consumer key and consumer token. You will need this to capture the data from Twitter API.
I am using the tweepy Python package to capture the data.
Create a new user
We need a user with AWS API, CLI and SDK access, so we can use the same user to connect to the Kinesis data stream from the producer and consumer EC2 instances. This user also needs Kinesis and S3 full access to read from and write to Kinesis and S3. Here are a few captures of the AWS steps. Once you have completed the steps, do not forget to download and save the AWS access key and secret. Also, don't forget to add this key file to your .gitignore, so you don't accidentally publish the keys.
Launch Producer and Consumer EC2 machine
Let’s create two EC2 instances, one for the producer and another for the consumer. I don’t need any specific roles/policies on EC2 to access Kinesis and S3.
The below capture shows the steps to create the EC2 instances.
I have specified the number of instances as two on the configuration page, keeping all other options at their defaults. Note that I have not specified any IAM role.
In the security group, we are opening all the required ports. Note that I am giving access to the entire world, which is not recommended in production. In production, the EC2 instances should sit behind a custom VPC, subnet, route table and internet gateway to secure the network.
Finally, please review and launch. Also, save the key-pair to log in to the instances.
After a few minutes, all the instances will be running. Now we are ready to log in to the producer EC2 to set up the producer process.
Before logging in, change the permission of the .pem file (downloaded in the step above) to 400.
chmod 400 *****.pem
Producer
Let's log in and start setting up our code.
ssh -i ****.pem ubuntu@public.ip.for.producer
sudo apt-get update && sudo apt-get upgrade
sudo apt install python3-pip
Create a ‘requirements.txt’ file under the home directory ‘/home/ubuntu’. We will list all the required Python packages there and use pip3 to install them.
pip3 install -r ./requirements.txt
Now we are ready for the producer code. Here is the code capture. You can get the full code from the GitHub location mentioned toward the end of the article.
First, I am importing a few packages, including ‘tweepy’ and ‘boto3’. Note that I have kept all the keys, ids and tokens in a file called ‘settings.py’. This way, I am encapsulating all the secrets instead of hardcoding them or passing them as parameters. I can hide this file and limit access for other users, so my keys stay secure. Also, don't forget to add this file to .gitignore.
Then I am creating a Kinesis client called client. I will use this client to create the data stream and push the data into this stream.
Next, I am defining two functions to create a stream and get the status of the stream.
Next, I am using tweepy’s StdOutListener class to push the Twitter data to the stream. First, I capture the data, dump it as JSON, convert it to a byte stream, and compress it with gzip. I am using a uuid as the partition key; this is a great trick to uniformly distribute the data across multiple shards. I could have used the tweet id as the partition key; however, I do not want to parse the data here.
This is the main part of the code. First, I create the Kinesis data stream, then wait until it becomes active. Then I start collecting the data and keep pushing it to the stream. You can use the filter methods to filter the Twitter data for any topic; here I am using the new Bond movie's name.
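The put path described above can be sketched as follows. This is a minimal reconstruction, not the exact GitHub code; the stream name matches the one created in this article, and the sample payload is an assumption:

```python
import gzip
import uuid

STREAM_NAME = "twitter_ds"  # the stream name used in this article


def encode_tweet(raw_json: str) -> bytes:
    """Serialise the tweet to bytes and gzip it, as described above."""
    return gzip.compress(raw_json.encode("utf-8"))


def make_record(raw_json: str) -> dict:
    # A random uuid4 partition key spreads records evenly across shards
    return {"Data": encode_tweet(raw_json), "PartitionKey": str(uuid.uuid4())}


if __name__ == "__main__":
    import boto3  # needs AWS credentials configured on the machine
    client = boto3.client("kinesis")
    client.put_record(StreamName=STREAM_NAME, **make_record('{"text": "demo"}'))
```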
Now, let’s start the code.
We can stop the code at any time and re-run it easily.
Once the code runs, a new data stream is created in AWS with 2 shards.
Firehose
Before moving to the consumer, let’s create a firehose delivery stream, which will pick up the raw data from the Kinesis data stream (twitter_ds) and collect it into an S3 bucket.
Once the Firehose starts, you can see that it begins to accumulate data in the S3 bucket within a few minutes. Note that all the data is in gzip format, as our producer compresses the data before sending it to the data stream.
We can download the data and open it. Once downloaded, decompress it with gzip to get the raw data. I rename the file to .zip to open it in WinZip (or similar) software.
Now we can create a consumer python code.
Consumer
We can use boto3 for the consumer as well; it is easy to implement. However, this approach quickly becomes complicated if the stream has multiple shards and records are split into multiple parts. Users also need to implement checkpointing manually. Using the Kinesis Client Library (KCL) for Python simplifies the process.
Here is a simple working consumer using boto3. I am using multiple shards and keeping the records limit at 1. This is not good practice, as we will pay more for the numerous API calls; if we increase the limit, we can get more data in one call. We also use ‘NextShardIterator’ to loop through the stream; if no data is present, the code prints a message saying no data is available and waits 5 seconds before checking again. I am writing the final data into a text file and using ‘TRIM_HORIZON’ as the ShardIteratorType. More about this at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_records
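The polling loop described above boils down to something like this. It is a simplified sketch: it prints instead of writing to a text file, reads a single hard-coded shard, and uses a larger Limit than the article's capture:

```python
import gzip
import json
import time


def decode_record(record: dict) -> dict:
    """Undo the producer's gzip + JSON encoding for one Kinesis record."""
    return json.loads(gzip.decompress(record["Data"]))


def consume(client, stream_name: str, shard_id: str = "shardId-000000000000"):
    it = client.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
    while it:
        resp = client.get_records(ShardIterator=it, Limit=100)
        for rec in resp["Records"]:
            print(decode_record(rec))
        if not resp["Records"]:
            print("no data available")
            time.sleep(5)
        it = resp.get("NextShardIterator")


if __name__ == "__main__":
    import boto3  # needs AWS credentials configured on the machine
    consume(boto3.client("kinesis"), "twitter_ds")
```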
Here is the sample capture of running both producer and consumer code.
Kinesis or Kafka
It’s very difficult to say which one is better; however, from my point of view, onboarding Kinesis is easy and offers a great amount of flexibility from a programming point of view. We can use a Python program to create a stream, producer and consumer. However, keep in mind that Kinesis has hard per-shard data limits. Your design must consider these limits very carefully, because changing the shape of a stream after creation (splitting or merging shards) is much more involved than simply adding capacity.
Github: https://github.com/ghoshm21/spark_book/tree/main/ch06_Clud_computation/Kinesis_dataStream
TBD — Parsing the consumer data and run the ML on the data.
Please contact me for anything — contact@sandipan.tech
Resources: