Amazon MSK is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.
Create MSK Cluster
Via Console
Please check out here
Via Amazon CDK
The source code is available here
Using CDK, we will create a VPC for your MSK cluster with one public subnet in each Availability Zone (up to three). You can modify lib/msk-stack.ts
to meet your requirements.
const vpc = new Vpc(this, 'AWSKafkaVPC', {
  cidr: '10.0.0.0/16',
  maxAzs: 3,
  subnetConfiguration: [
    { cidrMask: 24, name: 'kafka', subnetType: SubnetType.PUBLIC }
  ]
});
CDK will then provision an MSK cluster, which is also defined in lib/msk-stack.ts.
const cluster = new CfnCluster(this, 'mskCluster', {
  clusterName: 'AWSKafkaCluster',
  kafkaVersion: KafkaVersion.VERSION_2_3_1,
  encryptionInfo: {
    encryptionInTransit: {
      inCluster: true,
      clientBroker: 'TLS'
    }
  },
  // Must be a multiple of the number of client subnets (three here)
  numberOfBrokerNodes: 3,
  brokerNodeGroupInfo: {
    clientSubnets: [
      vpc.publicSubnets[0].subnetId,
      vpc.publicSubnets[1].subnetId,
      vpc.publicSubnets[2].subnetId
    ],
    brokerAzDistribution: 'DEFAULT',
    instanceType: KafkaInstanceType.T3_SMALL,
    storageInfo: {
      ebsStorageInfo: {
        volumeSize: 10
      }
    }
  }
});
Copy .env.sample to .env and update the environment variables.
CDK_DEFAULT_REGION=XXXXXXXXXXXXXXXXXXXX
CDK_DEFAULT_ACCOUNT=XXXXXXXXXXXXXXXXXXX
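CDK reads these variables from the shell environment, but if you want to load the .env file from a helper script of your own, a minimal stdlib-only parser is enough. This is a sketch; parse_dotenv is a hypothetical helper, and the region/account values below are examples, not the placeholders above.

```python
def parse_dotenv(text):
    """Parse KEY=VALUE lines from a .env-style file, skipping blanks and comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, _, value = line.partition('=')
        env[key.strip()] = value.strip()
    return env

# Example .env content with illustrative values
sample = """CDK_DEFAULT_REGION=us-east-1
CDK_DEFAULT_ACCOUNT=123456789012"""
print(parse_dotenv(sample)['CDK_DEFAULT_REGION'])  # us-east-1
```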
Run npm run build
to compile TypeScript to JavaScript.
Run cdk deploy
to deploy this stack to your default AWS account/region.
Create a bastion machine
You need a machine from which to create a Kafka topic and to produce and consume data. Let's create a t2.xlarge instance of the Amazon Linux 2 AMI (HVM), SSD Volume Type, with a public IP enabled.
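If you prefer scripting the bastion launch over using the console, it can be sketched with boto3. The AMI ID, subnet ID, and key-pair name are placeholders you must supply, and bastion_launch_params/launch_bastion are hypothetical helpers; the request-building function is kept separate so it can be inspected without calling AWS.

```python
def bastion_launch_params(ami_id, subnet_id, key_name):
    """Build the EC2 run_instances request for a t2.xlarge bastion with a public IP."""
    return {
        'ImageId': ami_id,              # Amazon Linux 2 AMI (HVM), SSD Volume Type
        'InstanceType': 't2.xlarge',
        'KeyName': key_name,
        'MinCount': 1,
        'MaxCount': 1,
        'NetworkInterfaces': [{
            'DeviceIndex': 0,
            'SubnetId': subnet_id,      # one of the public subnets created by the CDK stack
            'AssociatePublicIpAddress': True,
        }],
    }

def launch_bastion(ami_id, subnet_id, key_name):
    """Launch the bastion; requires AWS credentials to be configured."""
    import boto3  # imported here so the builder above stays dependency-free
    ec2 = boto3.client('ec2')
    return ec2.run_instances(**bastion_launch_params(ami_id, subnet_id, key_name))
```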
Create Kafka Topic on the bastion machine
#!/bin/sh
zookeeperConnectString="<your_zookeeper_connect_string>" # retrieved from "View Client Information" in Amazon MSK Console
kafka_topic="<your_kafka_topic>"
replication_factor=1
partitions=1
# Change directory to Kafka bin
cd ~/kafka_2.13-2.6.0/bin/
# Execute kafka-topics.sh
./kafka-topics.sh --create --zookeeper "$zookeeperConnectString" --replication-factor "$replication_factor" --partitions "$partitions" --topic "$kafka_topic"
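The same topic creation can be driven from Python by shelling out to kafka-topics.sh. A minimal sketch, assuming the Kafka distribution path used in the script above; create_topic_cmd is a hypothetical helper, and the command is built as an argument list so no shell quoting is needed.

```python
import os

# Path to kafka-topics.sh, matching the shell script above
KAFKA_TOPICS = os.path.expanduser('~/kafka_2.13-2.6.0/bin/kafka-topics.sh')

def create_topic_cmd(zookeeper, topic, replication_factor=1, partitions=1):
    """Build the kafka-topics.sh argument list for creating a topic."""
    return [
        KAFKA_TOPICS,
        '--create',
        '--zookeeper', zookeeper,
        '--replication-factor', str(replication_factor),
        '--partitions', str(partitions),
        '--topic', topic,
    ]

# On the bastion, run it with:
# import subprocess
# subprocess.run(create_topic_cmd('<your_zookeeper_connect_string>', '<your_kafka_topic>'), check=True)
```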
Produce Data
Here's a sample producer
from time import sleep
from json import dumps
from kafka import KafkaProducer

# Define Amazon MSK brokers. The cluster above sets clientBroker: 'TLS',
# so clients must use the TLS listener on port 9094.
brokers = ['<your_msk_broker_1>:9094', '<your_msk_broker_2>:9094']
# Define Kafka topic to be produced to
kafka_topic = '<your_kafka_topic>'

# A Kafka client that publishes JSON-encoded records to the Kafka cluster
producer = KafkaProducer(
    bootstrap_servers=brokers,
    security_protocol='SSL',
    value_serializer=lambda x: dumps(x).encode('utf-8'))

# Produce 1000 numbers from 0 to 999, one per second
for num in range(1000):
    data = {'number': num}
    producer.send(kafka_topic, value=data)
    sleep(1)

# Block until all buffered records have been sent
producer.flush()
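It is worth seeing what the value_serializer actually puts on the wire: each record is a dict serialized to UTF-8 JSON bytes, and the consumer's value_deserializer below is its exact inverse. A quick round-trip sketch:

```python
from json import dumps, loads

# The producer-side serializer and consumer-side deserializer used in this tutorial
serialize = lambda x: dumps(x).encode('utf-8')
deserialize = lambda x: loads(x.decode('utf-8'))

record = {'number': 42}
wire_bytes = serialize(record)               # b'{"number": 42}'
assert deserialize(wire_bytes) == record     # round-trips back to the original dict
print(wire_bytes)
```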
Consume Data
Here's a sample consumer
from kafka import KafkaConsumer
from json import loads

# Define Amazon MSK brokers (TLS listener, port 9094)
brokers = ['<your_msk_broker_1>:9094', '<your_msk_broker_2>:9094']
# Define Kafka topic to be consumed from
kafka_topic = '<your_kafka_topic>'

# A Kafka client that consumes JSON-encoded records from the Kafka cluster
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=brokers,
    security_protocol='SSL',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# Print each deserialized record as it arrives
for message in consumer:
    print(message.value)