Thursday, 22 October 2020

Getting started with Amazon MSK

Amazon MSK is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

Create MSK Cluster

Via Console

Please check out here

Via Amazon CDK

The source code is available here

Using CDK, we will create a VPC for the MSK cluster with one public subnet in each Availability Zone (up to three). You can modify lib/msk-stack.ts to meet your requirements.

    const vpc = new Vpc(this, 'AWSKafkaVPC', {
      cidr: '10.0.0.0/16',
      maxAzs: 3,
      subnetConfiguration: [
        { cidrMask: 24, name: 'kafka', subnetType: SubnetType.PUBLIC }
      ]
    });
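To get a feel for what this subnet configuration carves out of the VPC range, here is a small Python sketch using the standard ipaddress module. It assumes the /24 subnets are allocated sequentially from the start of 10.0.0.0/16, which is an assumption about CDK's allocation order rather than something stated in the stack. Note also that maxAzs is an upper bound, so a region with fewer Availability Zones yields fewer subnets.

```python
import ipaddress
from itertools import islice

# Values copied from the Vpc definition above
vpc_cidr = ipaddress.ip_network("10.0.0.0/16")
max_azs = 3
cidr_mask = 24

# Take the first /24 block for each Availability Zone (assumed sequential allocation)
subnets = list(islice(vpc_cidr.subnets(new_prefix=cidr_mask), max_azs))

for index, subnet in enumerate(subnets):
    print(f"kafka subnet {index}: {subnet} ({subnet.num_addresses} addresses)")
```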

CDK will then provision an MSK cluster, which is also defined in lib/msk-stack.ts.

    const cluster = new CfnCluster(this, 'mskCluster', {
      clusterName: 'AWSKafkaCluster',
      kafkaVersion: KafkaVersion.VERSION_2_3_1,
      encryptionInfo: {
        encryptionInTransit: {
          inCluster: true,
          clientBroker: 'TLS'
        }
      },
      // Must be a multiple of the number of client subnets (three below)
      numberOfBrokerNodes: 3,
      brokerNodeGroupInfo: {
        clientSubnets: [
          vpc.publicSubnets[0].subnetId,
          vpc.publicSubnets[1].subnetId,
          vpc.publicSubnets[2].subnetId
        ],

        brokerAzDistribution: 'DEFAULT',
        instanceType: KafkaInstanceType.T3_SMALL,
        storageInfo: {
          ebsStorageInfo: {
            volumeSize: 10
          }
        }
      }
    });

Copy .env.sample to .env and update the environment variables.

CDK_DEFAULT_REGION=XXXXXXXXXXXXXXXXXXXX
CDK_DEFAULT_ACCOUNT=XXXXXXXXXXXXXXXXXXX
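Before deploying, it can help to sanity-check the values you put in .env. The following is a minimal sketch, not part of the project: parse_env and check_cdk_env are hypothetical helpers, the region pattern is only a rough heuristic, and the sample account ID is a placeholder.

```python
import re

def parse_env(text):
    """Parse simple KEY=VALUE lines, skipping blanks and comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values

def check_cdk_env(values):
    """Return a list of problems found in the CDK environment values."""
    problems = []
    # AWS account IDs are 12 digits
    if not re.fullmatch(r"\d{12}", values.get("CDK_DEFAULT_ACCOUNT", "")):
        problems.append("CDK_DEFAULT_ACCOUNT should be a 12-digit AWS account ID")
    # Rough heuristic for region names such as us-east-1 or ap-southeast-2
    if not re.fullmatch(r"[a-z]{2}(-[a-z]+)+-\d", values.get("CDK_DEFAULT_REGION", "")):
        problems.append("CDK_DEFAULT_REGION should look like a region name, e.g. us-east-1")
    return problems

sample = "CDK_DEFAULT_REGION=us-east-1\nCDK_DEFAULT_ACCOUNT=123456789012\n"
print(check_cdk_env(parse_env(sample)))
```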

Run npm run build to compile the TypeScript sources to JavaScript.

Run cdk deploy to deploy this stack to your default AWS account and region.
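Once the stack is deployed, you will need the cluster's ZooKeeper connect string and broker addresses for the later steps. You can copy them from "View Client Information" in the Amazon MSK console, or look them up programmatically. The following is a sketch using boto3, assuming boto3 is installed and your credentials are configured. find_cluster, split_brokers and lookup_connection_info are hypothetical helper names, not part of the project.

```python
def find_cluster(cluster_infos, cluster_name):
    """Return the entry whose ClusterName matches exactly, or None."""
    for info in cluster_infos:
        if info.get("ClusterName") == cluster_name:
            return info
    return None

def split_brokers(broker_string):
    """Turn a comma-separated bootstrap string into a list of host:port entries."""
    return [broker.strip() for broker in broker_string.split(",") if broker.strip()]

def lookup_connection_info(cluster_name, region_name):
    """Fetch the ZooKeeper and bootstrap broker strings for an MSK cluster."""
    import boto3  # imported here so the helpers above work without boto3

    client = boto3.client("kafka", region_name=region_name)
    # ClusterNameFilter matches by prefix, so pick the exact name afterwards
    listing = client.list_clusters(ClusterNameFilter=cluster_name)
    info = find_cluster(listing["ClusterInfoList"], cluster_name)
    if info is None:
        raise LookupError(f"No MSK cluster named {cluster_name!r} found")
    brokers = client.get_bootstrap_brokers(ClusterArn=info["ClusterArn"])
    return {
        "zookeeper": info["ZookeeperConnectString"],
        # Only the listeners enabled on the cluster are present in the response
        "brokers_plaintext": split_brokers(brokers.get("BootstrapBrokerString", "")),
        "brokers_tls": split_brokers(brokers.get("BootstrapBrokerStringTls", "")),
    }
```

For the cluster defined above, you would call lookup_connection_info('AWSKafkaCluster', '<your_region>').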

Create a bastion machine

You need a machine from which you can create a topic, produce data to it and consume data from it. Let's create a t2.xlarge instance from the Amazon Linux 2 AMI (HVM), SSD Volume Type, with a public IP enabled.

Create Kafka Topic on the bastion machine

#!/bin/sh

zookeeperConnectString="<your_zookeeper_connect_string>" # retrieved from "View Client Information" in Amazon MSK Console
kafka_topic="<your_kafka_topic>"
replication_factor=1
partitions=1

# Change directory to Kafka bin (assumes Kafka 2.6.0 has already been downloaded and extracted to the home directory)
cd ~/kafka_2.13-2.6.0/bin/ || exit 1
# Execute kafka-topics.sh
./kafka-topics.sh --create --zookeeper "$zookeeperConnectString" --replication-factor "$replication_factor" --partitions "$partitions" --topic "$kafka_topic"
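If you would rather not install the Kafka command line tools, the topic can also be created from Python. This is a sketch using the admin client from kafka-python (the same library as the producer and consumer below); it talks to the brokers instead of ZooKeeper. check_topic_settings and create_topic are hypothetical helper names, and security_protocol='SSL' is an assumption based on the TLS-only client setting in the cluster definition.

```python
def check_topic_settings(partitions, replication_factor, broker_count):
    """Reject settings that the cluster cannot satisfy."""
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    if not 1 <= replication_factor <= broker_count:
        # Kafka cannot place more replicas than there are brokers
        raise ValueError("replication_factor must be between 1 and the broker count")

def create_topic(bootstrap_servers, topic, partitions=1, replication_factor=1,
                 security_protocol="SSL"):
    """Create a topic through the brokers using kafka-python's admin client."""
    from kafka.admin import KafkaAdminClient, NewTopic  # requires kafka-python

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers,
                             security_protocol=security_protocol)
    try:
        admin.create_topics([
            NewTopic(name=topic, num_partitions=partitions,
                     replication_factor=replication_factor)
        ])
    finally:
        admin.close()

# Same values as the shell script above
check_topic_settings(partitions=1, replication_factor=1, broker_count=3)
```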

Produce Data

Here's a sample producer, written with the kafka-python library:

from time import sleep
from json import dumps
from kafka import KafkaProducer

# Define Amazon MSK Brokers (9094 is the TLS port; the cluster above only accepts TLS client connections)
brokers=['<your_msk_broker_1>:9094', '<your_msk_broker_2>:9094']
# Define Kafka topic to be produced to
kafka_topic='<your_kafka_topic>'
# A Kafka client that publishes records to the Kafka cluster over TLS
producer = KafkaProducer(bootstrap_servers=brokers, security_protocol='SSL', value_serializer=lambda x: dumps(x).encode('utf-8'))
# To produce 1000 numbers from 0 to 999, one per second
for num in range(1000):
    data = {'number' : num}
    producer.send(kafka_topic, value=data)
    sleep(1)
# Block until all buffered records have been sent
producer.flush()
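Note that send() is asynchronous: it returns a future immediately instead of waiting for the broker. If you want confirmation that each record was written, you can wait on that future. Here is a minimal sketch; send_and_confirm is a hypothetical helper that accepts any kafka-python style producer.

```python
def send_and_confirm(producer, topic, payloads, timeout_seconds=10):
    """Send each payload and wait for the broker to acknowledge it.

    Returns a list of (partition, offset) pairs, one per payload.
    """
    confirmations = []
    for payload in payloads:
        future = producer.send(topic, value=payload)
        # get() blocks until the record is acknowledged, or raises on failure/timeout
        metadata = future.get(timeout=timeout_seconds)
        confirmations.append((metadata.partition, metadata.offset))
    return confirmations
```

With the producer above, send_and_confirm(producer, kafka_topic, [{'number': 0}]) would return the partition and offset assigned to that record. Waiting on every record is slower than the fire-and-forget loop, so this is a trade-off between throughput and certainty.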

Consume Data

Here's a sample consumer, written with the kafka-python library:

from kafka import KafkaConsumer
from json import loads

# Define Amazon MSK Brokers (9094 is the TLS port; the cluster above only accepts TLS client connections)
brokers=['<your_msk_broker_1>:9094', '<your_msk_broker_2>:9094']
# Define Kafka topic to be consumed from
kafka_topic='<your_kafka_topic>'
# A Kafka client that consumes records from a Kafka cluster over TLS
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=brokers,
    security_protocol='SSL',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# Print the deserialized value of each message as it arrives
for message in consumer:
    print('{}'.format(message.value))
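The producer's value_serializer and the consumer's value_deserializer have to mirror each other, since Kafka itself only transports bytes. The sketch below runs the same two lambdas from the samples above as named functions, without a cluster, to show the round trip. The names serialize and deserialize are only for illustration.

```python
from json import dumps, loads

def serialize(value):
    """Same logic as the producer's value_serializer."""
    return dumps(value).encode('utf-8')

def deserialize(raw):
    """Same logic as the consumer's value_deserializer."""
    return loads(raw.decode('utf-8'))

record = {'number': 7}
wire_bytes = serialize(record)      # what is actually stored in the topic
round_tripped = deserialize(wire_bytes)

print(wire_bytes)       # b'{"number": 7}'
print(round_tripped)    # {'number': 7}
```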
