Kinesis Data Streams with Java

What is Data Streaming?

Cansu Candan
10 min read · Sep 6, 2023

Data streaming, also known as stream processing or event streaming, involves the continuous transmission of data as it’s generated. This approach allows for real-time processing and analysis, providing immediate insights.

Use cases for event streaming range from multiplayer games, real-time fraud detection, and social media feeds to stock trading platforms and GPS tracking.

What is AWS Kinesis Data Streams?

AWS Kinesis is a serverless streaming service that can be used to build highly scalable event-driven applications. It allows you to ingest various types of real-time data, such as video, audio, logs, clickstreams, and IoT telemetry, for purposes like machine learning, analytics, and more. Amazon Kinesis enables you to process and analyze data as it arrives, offering real-time responsiveness instead of waiting for all data to accumulate before processing begins.

Amazon Kinesis Data Streams is a serverless streaming data service that simplifies the capture, processing, and storage of data streams at any scale.

Key Concepts

Kinesis Data Stream

Kinesis Data Streams enables real-time processing of streaming big data. It provides ordering of records, as well as the ability to read and/or replay records in the same order to multiple Amazon Kinesis Applications.

Data Record

A data record is the unit of data stored in a Kinesis data stream. Data records are composed of a sequence number, a partition key, and a data blob, which is an immutable sequence of bytes. Kinesis Data Streams does not inspect, interpret, or change the data in the blob in any way. A data blob can be up to 1 MB.

AWS SDK

The AWS SDK provides two APIs for writing records: the PutRecord API and the PutRecords API.

Producer

Producers put records into Amazon Kinesis Data Streams. For example, a web server sending log data to a stream is a producer.

Consumer

Consumers get records from Amazon Kinesis Data Streams and process them. These consumers are known as Amazon Kinesis Data Streams applications.
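
As an illustration only (separate from the pipeline built later in this article), a minimal consumer sketch with the AWS SDK for Java v2 might poll a single shard like this; the stream name and shard ID below are placeholders:

package org.example;

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class SimpleConsumer {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.builder().build()) {
            // Placeholder stream and shard identifiers -- replace with your own
            String shardIterator = kinesis.getShardIterator(GetShardIteratorRequest.builder()
                            .streamName("kinesis-data-stream")
                            .shardId("shardId-000000000000")
                            .shardIteratorType(ShardIteratorType.TRIM_HORIZON) // start from the oldest record
                            .build())
                    .shardIterator();

            // Fetch one batch of records from that shard
            for (Record record : kinesis.getRecords(GetRecordsRequest.builder()
                            .shardIterator(shardIterator)
                            .limit(25)
                            .build())
                    .records()) {
                System.out.println(record.partitionKey() + " -> " + record.data().asUtf8String());
            }
        }
    }
}

Production consumers typically rely on the Kinesis Client Library (KCL) instead, which handles checkpointing and shard rebalancing for you.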

Shard

A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity. Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). The data capacity of your stream is a function of the number of shards that you specify for the stream. The total capacity of the stream is the sum of the capacities of its shards.

If your data rate changes, you can increase or decrease the number of shards allocated to your stream (resharding).
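
For example, a minimal sketch of resharding with the UpdateShardCount API of the AWS SDK for Java v2 could look like this, assuming a provisioned-mode stream whose name is a placeholder:

package org.example;

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;

public class Reshard {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.builder().build()) {
            // Scale the (placeholder) stream to 4 shards, splitting/merging uniformly
            kinesis.updateShardCount(UpdateShardCountRequest.builder()
                    .streamName("kinesis-data-stream")
                    .targetShardCount(4)
                    .scalingType(ScalingType.UNIFORM_SCALING)
                    .build());
        }
    }
}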

Partition Key

A partition key is used to group data by shard within a stream. Kinesis Data Streams segregates the data records belonging to a stream into multiple shards. It uses the partition key that is associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards. When an application puts data into a stream, it must specify a partition key. A record with a particular partition key goes to the same shard throughout the life of that shard; this is how the order of records is maintained within a shard. AWS guarantees that records are consumed in the same order in which they were inserted.
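
To make the hashing concrete, here is a small standalone sketch (not part of the pipeline below) that reproduces the documented mapping of a partition key to a 128-bit hash value; which shard the record lands on then depends on the hash key range owned by each shard:

package org.example;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeyHash {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        String partitionKey = "asset-100003"; // example key; up to 256 Unicode characters

        // MD5 of the partition key, interpreted as an unsigned 128-bit integer
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        BigInteger hashKey = new BigInteger(1, digest);

        // Kinesis routes the record to the shard whose hash key range contains this value
        System.out.println("Hash key for '" + partitionKey + "': " + hashKey);
    }
}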

Sequence Number

Each data record has a sequence number that is unique per partition-key within its shard. Kinesis Data Streams assigns the sequence number after you write to the stream with client.putRecords or client.putRecord. Sequence numbers for the same partition key generally increase over time. The longer the time period between write requests, the larger the sequence numbers become.
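
As an illustration, the assigned sequence number comes back in the PutRecord response of the Kinesis Streams API (note: this sketch uses the Kinesis client, not the Firehose client used later in this article); the stream name and payload are placeholders:

package org.example;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

public class SequenceNumberDemo {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.builder().build()) {
            PutRecordResponse response = kinesis.putRecord(PutRecordRequest.builder()
                    .streamName("kinesis-data-stream") // placeholder stream name
                    .partitionKey("asset-100003") // records with this key always land on the same shard
                    .data(SdkBytes.fromUtf8String("{\"asset_value\":0.42}"))
                    .build());

            // The shard the record landed on and the sequence number it was assigned
            System.out.println(response.shardId() + " / " + response.sequenceNumber());
        }
    }
}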

In this article, I’ll explain how we can build a simple AWS Kinesis data stream pipeline.

In this scenario, the steps are:

  • Step 1: Create an AWS S3 bucket, an AWS Glue database, and a delivery stream using Kinesis Data Firehose
  • Step 2: Generate random IoT data or get it from an API service
  • Step 3: Create a Kinesis data stream and write the data to S3 using Kinesis Data Firehose and the Glue Data Catalog
  • Step 4: Write Parquet data into an S3 bucket, partitioned monthly

Step 1: Create an AWS S3 bucket, an AWS Glue database, and a delivery stream using Kinesis Data Firehose

  1. Create a Maven project

2. Create an S3 bucket on AWS

AWS S3 -> Buckets -> Create Bucket

3. Create AWS Glue Database

AWS Glue -> Databases -> Create Database

4. Create AWS Glue Table

AWS Glue -> Tables -> Create Table

Set table properties:

Choose or define schema:

Review and create:

5. Create the AWS Kinesis Data Firehose delivery stream:

Step 2: Generate random IoT data or get it from an API service

1. Generate random IoT data

package org.example;

import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

public class GenerateTimeSeriesData {

    private static final String CSV_FILE_PATH = "./iot_data.csv";
    private static final int ROW_COUNT = 10000;
    private static final int ASSET_ID_COUNT = 12;
    private static final int ASSET_ID_BASE = 100000; // first 6-digit asset ID

    public static void generateAndWriteCsvData() {
        try (FileWriter writer = new FileWriter(CSV_FILE_PATH)) {
            // Create the header row
            writer.write("Timestamp,AssetID,Value\n");

            // Generate and write the data rows
            Random random = new Random();
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

            for (int i = 0; i < ROW_COUNT; i++) {
                // Pick one of ASSET_ID_COUNT 6-digit asset IDs (100000..100011)
                int assetId = ASSET_ID_BASE + random.nextInt(ASSET_ID_COUNT);
                // Random timestamp within the past year
                LocalDateTime timestamp = LocalDateTime.now().minusSeconds(random.nextInt(60 * 60 * 24 * 365));
                double value = random.nextDouble();
                writer.write(timestamp.format(formatter) + "," + assetId + "," + value + "\n");
            }
            // The try-with-resources block closes the CSV file automatically
            System.out.println("CSV file created successfully.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
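
To produce the CSV file, a minimal entry point (a hypothetical Main class, not part of the original listing) can simply call the generator:

package org.example;

public class Main {
    public static void main(String[] args) {
        // Writes ./iot_data.csv with ROW_COUNT random readings
        GenerateTimeSeriesData.generateAndWriteCsvData();
    }
}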

2. Read the CSV file

package org.example;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvException;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CsvReader {

    private static final String CSV_FILE_PATH = "./iot_data.csv";

    public static List<PDMItem> readCsvFile() throws IOException, CsvException {
        List<PDMItem> pdmItems = new ArrayList<>();

        try (CSVReader reader = new CSVReader(new FileReader(CSV_FILE_PATH))) {
            List<String[]> csvData = reader.readAll();

            boolean isFirstRow = true;

            // Assuming the CSV structure is: assetDate, assetID, assetValue
            for (String[] row : csvData) {
                if (isFirstRow) {
                    isFirstRow = false;
                    continue; // Skip the header row
                }

                if (row.length >= 3) {
                    String assetDate = row[0];
                    String assetID = row[1];
                    double assetValue;

                    try {
                        assetValue = Double.parseDouble(row[2]);
                    } catch (NumberFormatException e) {
                        // Skip rows whose asset value is not a valid double
                        System.out.println("Invalid asset value: " + row[2]);
                        continue;
                    }

                    pdmItems.add(new PDMItem(assetDate, assetID, assetValue));
                }
            }
        }

        return pdmItems;
    }
}

3. PDMItem Class

package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;

public class PDMItem {

    @JsonProperty("asset_date")
    private String assetdate;

    @JsonProperty("asset_id")
    private String assetid;

    @JsonProperty("asset_value")
    private double assetvalue;

    public PDMItem() {
    }

    public PDMItem(String assetDate, String assetId, double assetValue) {
        this.assetdate = assetDate;
        this.assetid = assetId;
        this.assetvalue = assetValue;
    }

    public String getAssetdate() {
        return assetdate;
    }

    public void setAssetdate(String assetdate) {
        this.assetdate = assetdate;
    }

    public String getAssetid() {
        return assetid;
    }

    public void setAssetid(String assetid) {
        this.assetid = assetid;
    }

    public double getAssetvalue() {
        return assetvalue;
    }

    public void setAssetvalue(double assetvalue) {
        this.assetvalue = assetvalue;
    }

    @Override
    public String toString() {
        return String.format("{Date='%s', asset_id='%s', Value=%f}", assetdate, assetid, assetvalue);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PDMItem other = (PDMItem) o;
        return Double.compare(other.assetvalue, assetvalue) == 0
                && Objects.equal(assetdate, other.assetdate)
                && Objects.equal(assetid, other.assetid);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(assetdate, assetid, assetvalue);
    }
}

4. PDM Class

package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PDM {

    @JsonProperty("asset_items")
    private List<PDMItem> pdmItems = new ArrayList<>();

    public PDM(List<PDMItem> pdmItems) {
        this.pdmItems = pdmItems;
    }

    public List<PDMItem> getPdmItems() {
        return pdmItems;
    }

    public void setPdmItems(List<PDMItem> pdmItems) {
        this.pdmItems = pdmItems;
    }

    @Override
    public String toString() {
        return String.format("Asset{PdmItems=%s}", Arrays.toString(pdmItems.toArray()));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PDM pdm = (PDM) o;
        return Objects.equal(pdmItems, pdm.pdmItems);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(pdmItems);
    }
}

Step 3: Create a Kinesis data stream and write the data to S3 using Kinesis Data Firehose and the Glue Data Catalog

Write Records — Kinesis Streams API with AWS SDK

The AWS SDK provides a few APIs to manage and insert data records, the most notable being PutRecord and PutRecords. The PutRecords operation sends multiple records to your stream per HTTP request, while the singular PutRecord operation sends records one at a time (a separate HTTP request is required for each record). You should prefer PutRecords for most applications because it achieves higher throughput per data producer.
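
For comparison with the Firehose-based producer shown below, a rough sketch of the batched PutRecords call against the Kinesis Streams API could look like this (the stream name and payloads are placeholders):

package org.example;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

public class PutRecordsSketch {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.builder().build()) {
            // Build one entry per payload; the partition key decides the target shard
            List<PutRecordsRequestEntry> entries = Stream.of("{\"v\":1}", "{\"v\":2}", "{\"v\":3}")
                    .map(json -> PutRecordsRequestEntry.builder()
                            .partitionKey("asset-100003")
                            .data(SdkBytes.fromUtf8String(json))
                            .build())
                    .collect(Collectors.toList());

            PutRecordsResponse response = kinesis.putRecords(PutRecordsRequest.builder()
                    .streamName("kinesis-data-stream") // placeholder stream name
                    .records(entries)
                    .build());

            // PutRecords is not all-or-nothing: inspect and retry failed entries if needed
            System.out.println("Failed records: " + response.failedRecordCount());
        }
    }
}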

Here is the sample code for PutRecord. Note that this producer writes to a Kinesis Data Firehose delivery stream via the Firehose client:

package org.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.exceptions.CsvException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordResponse;
import software.amazon.awssdk.services.firehose.model.Record;

import java.io.IOException;
import java.util.List;

public class PutRecordApp {

    static final Logger logger = LogManager.getLogger(PutRecordApp.class);
    static final ObjectMapper objMapper = new ObjectMapper();

    public static void main(String[] args) throws IOException, CsvException {
        logger.info("Starting PutRecord Producer");
        String streamName = ""; // your Firehose delivery stream name

        // Instantiate the Firehose client
        FirehoseClient firehoseClient = FirehoseClient.builder()
                .region(Region.US_EAST_1) // Specify your desired region
                .build();

        // Add a shutdown hook to cleanly close the client when the program terminates
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down program");
            firehoseClient.close();
        }, "producer-shutdown"));

        List<PDMItem> pdmItems = CsvReader.readCsvFile();
        for (PDMItem pdmItem : pdmItems) {
            logger.info(String.format("Generated %s", pdmItem));
            try {
                // Construct a single PutRecord request
                PutRecordRequest putRequest = PutRecordRequest.builder()
                        .deliveryStreamName(streamName)
                        .record(Record.builder()
                                .data(SdkBytes.fromByteArray(objMapper.writeValueAsBytes(pdmItem)))
                                .build())
                        .build();

                PutRecordResponse response = firehoseClient.putRecord(putRequest);
                logger.info("Successfully sent data to Kinesis Data Firehose. Record ID: " + response.recordId());
            } catch (SdkClientException e) {
                logger.error("Failed to send data to Kinesis Data Firehose.", e);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }

            // Artificial delay for demonstration purposes / visual tracking of the logging
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Step 4: Write Parquet data into an S3 bucket, partitioned monthly

  1. Create the Kinesis data stream
aws kinesis create-stream --stream-name kinesis-data-stream --shard-count 2

2. Build the Java project with Maven

mvn clean package

3. Run the project's jar-with-dependencies

java -jar target/aws-kinesis-putrecord-jar-with-dependencies.jar kinesis-data-stream

4. Parquet files are written to the AWS S3 bucket

Conclusion

Real-time data streaming has become indispensable for analyzing and processing data as it arrives instead of waiting hours, days, or weeks to get answers.

With AWS real-time data streaming services, we built a simple Kinesis pipeline. We generated dummy IoT data and produced each row as a record. We used a Kinesis Data Firehose delivery stream, a resource that transports data between a specified source and a specified destination, together with an AWS Glue database that defines the schema, so that the records land in S3 as Parquet.

Feel free to contact me if there is a problem or share with me if you have a better solution. 😉
