Thursday, 22 October 2020
# Getting started with Amazon MSK
Amazon MSK is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.
## Create MSK Cluster
### Via Console
To create the cluster via the AWS Management Console, follow the steps in the [official getting started guide](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html).
### Via Amazon CDK
The source code is available [here](https://github.com/wingkwong/aws-playground/tree/master/msk/cdk).
Using CDK, we first create a VPC for the MSK cluster with a public subnet in each Availability Zone. You can modify ``lib/msk-stack.ts`` to meet your requirements.
```
// Create a VPC with one public subnet per Availability Zone
const vpc = new Vpc(this, 'AWSKafkaVPC', {
  cidr: '10.0.0.0/16',
  maxAzs: 3,
  subnetConfiguration: [
    { cidrMask: 24, name: 'kafka', subnetType: SubnetType.PUBLIC }
  ]
});
```
CDK will then provision an MSK cluster, which is also defined in ``lib/msk-stack.ts``.
```
const cluster = new CfnCluster(this, 'mskCluster', {
  clusterName: 'AWSKafkaCluster',
  kafkaVersion: KafkaVersion.VERSION_2_3_1,
  encryptionInfo: {
    encryptionInTransit: {
      inCluster: true,
      clientBroker: 'TLS'
    }
  },
  // Must be a multiple of the number of client subnets
  numberOfBrokerNodes: 3,
  brokerNodeGroupInfo: {
    clientSubnets: [
      vpc.publicSubnets[0].subnetId,
      vpc.publicSubnets[1].subnetId,
      vpc.publicSubnets[2].subnetId
    ],
    brokerAzDistribution: 'DEFAULT',
    instanceType: KafkaInstanceType.T3_SMALL,
    storageInfo: {
      ebsStorageInfo: {
        // EBS volume size per broker, in GiB
        volumeSize: 10
      }
    }
  }
});
```
Copy ``.env.sample`` to ``.env`` and update the environment variables.
```
CDK_DEFAULT_REGION=XXXXXXXXXXXXXXXXXXXX
CDK_DEFAULT_ACCOUNT=XXXXXXXXXXXXXXXXXXX
```
Run ``npm run build`` to compile the TypeScript to JavaScript.
Run ``cdk deploy`` to deploy this stack to your default AWS account and region.
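Put together, the build and deploy steps look like the following sketch, assuming the dependencies declared in the repository's ``package.json``:
```
# Install dependencies, compile the TypeScript, and deploy the stack
npm install
npm run build
cdk deploy
```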
## Create a bastion machine
You need a machine from which to create the Kafka topic and run the producer and consumer. Let's create a t2.xlarge instance of the Amazon Linux 2 AMI (HVM), SSD Volume Type, with a public IP enabled. Make sure the instance can reach the MSK brokers, for example by launching it in the same VPC and allowing traffic from its security group in the cluster's security group.
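The topic-creation script below expects the Kafka CLI tools under ``~/kafka_2.13-2.6.0``. Here is a minimal sketch of installing them on the bastion, assuming Amazon Linux 2 and Kafka 2.6.0:
```
# Install Java (required by the Kafka CLI tools)
sudo yum install -y java-1.8.0
# Download and extract the Kafka distribution into the home directory
cd ~
wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz
```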
## Create Kafka Topic on the bastion machine
```
#!/bin/sh
zookeeperConnectString="" # retrieved from "View Client Information" in Amazon MSK Console
kafka_topic=""
replication_factor=1
partitions=1
# Change directory to Kafka bin
cd ~/kafka_2.13-2.6.0/bin/
# Execute kafka-topics.sh
./kafka-topics.sh --create --zookeeper $zookeeperConnectString --replication-factor $replication_factor --partitions $partitions --topic $kafka_topic
```
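To confirm the topic was created, you can list the topics registered in ZooKeeper, using the same ``zookeeperConnectString`` as above:
```
# List all topics known to the cluster
./kafka-topics.sh --list --zookeeper $zookeeperConnectString
```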
## Produce Data
Here's a sample producer:
```
from time import sleep
from json import dumps
from kafka import KafkaProducer

# Define Amazon MSK brokers (from "View Client Information" in the Amazon MSK Console).
# With TLS-only encryption in transit, use the TLS bootstrap servers (port 9094)
# and pass security_protocol='SSL' to KafkaProducer.
brokers = [':9092', ':9092']
# Define Kafka topic to be produced to
kafka_topic = ''
# A Kafka client that publishes records to the Kafka cluster
producer = KafkaProducer(bootstrap_servers=brokers,
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
# Produce 1000 numbers from 0 to 999, one per second
for num in range(1000):
    data = {'number': num}
    producer.send(kafka_topic, value=data)
    sleep(1)
```
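The producer relies on the [kafka-python](https://github.com/dpkp/kafka-python) library. A minimal sketch of installing it and running the script, assuming it is saved as ``producer.py`` (a hypothetical filename):
```
# Install the kafka-python client library
pip3 install kafka-python
# Run the producer (producer.py is an assumed filename)
python3 producer.py
```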
## Consume Data
Here's a sample consumer:
```
from json import loads
from kafka import KafkaConsumer

# Define Amazon MSK brokers (from "View Client Information" in the Amazon MSK Console).
# As with the producer, use the TLS bootstrap servers (port 9094) and
# security_protocol='SSL' if the cluster only accepts TLS traffic.
brokers = [':9092', ':9092']
# Define Kafka topic to be consumed from
kafka_topic = ''
# A Kafka client that consumes records from the Kafka cluster
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=brokers,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# Print the value of each record as it arrives
for message in consumer:
    print('{}'.format(message.value))
```
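Running the consumer (again assuming a hypothetical filename, ``consumer.py``) should print the records published by the producer:
```
python3 consumer.py
# Expected output (one line per record):
# {'number': 0}
# {'number': 1}
# ...
```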