Thursday, 22 October 2020

Getting started with Amazon MSK

Amazon MSK is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

## Create MSK Cluster

### Via Console

Please check out the official getting-started guide [here](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html).

### Via AWS CDK

The source code is available [here](https://github.com/wingkwong/aws-playground/tree/master/msk/cdk).

Using CDK, we will create a VPC for your MSK Cluster with one public subnet per Availability Zone. You can modify ``lib/msk-stack.ts`` to meet your requirements.

```ts
const vpc = new Vpc(this, 'AWSKafkaVPC', {
  cidr: '10.0.0.0/16',
  maxAzs: 3,
  subnetConfiguration: [
    {
      cidrMask: 24,
      name: 'kafka',
      subnetType: SubnetType.PUBLIC
    }
  ]
});
```

CDK will then provision an MSK Cluster, which is also defined in ``lib/msk-stack.ts``. Note that ``numberOfBrokerNodes`` must be a multiple of the number of client subnets, so with three subnets we use 3 brokers.

```ts
const cluster = new CfnCluster(this, 'mskCluster', {
  clusterName: 'AWSKafkaCluster',
  kafkaVersion: KafkaVersion.VERSION_2_3_1,
  encryptionInfo: {
    encryptionInTransit: {
      inCluster: true,
      clientBroker: 'TLS'
    }
  },
  // must be a multiple of the number of client subnets (3 here)
  numberOfBrokerNodes: 3,
  brokerNodeGroupInfo: {
    clientSubnets: [
      vpc.publicSubnets[0].subnetId,
      vpc.publicSubnets[1].subnetId,
      vpc.publicSubnets[2].subnetId
    ],
    brokerAzDistribution: 'DEFAULT',
    instanceType: KafkaInstanceType.T3_SMALL,
    storageInfo: {
      ebsStorageInfo: {
        volumeSize: 10
      }
    }
  }
});
```

Copy ``.env.sample`` to ``.env`` and update the environment variables.

```
CDK_DEFAULT_REGION=XXXXXXXXXXXXXXXXXXXX
CDK_DEFAULT_ACCOUNT=XXXXXXXXXXXXXXXXXXX
```

Run ``npm run build`` to compile TypeScript to JavaScript.

Run ``cdk deploy`` to deploy this stack to your default AWS account/region.

## Create a bastion machine

You need a machine from which to create a Kafka topic and to produce and consume data. Let's create a t2.xlarge instance of the Amazon Linux 2 AMI (HVM), SSD Volume Type, with a public IP enabled.
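The topic-creation script in the next section assumes the Kafka 2.6.0 distribution has been extracted into the bastion's home directory. A minimal setup sketch for the bastion — the JRE package name, Kafka version, and mirror URL are assumptions; pick a client version compatible with your cluster's ``kafkaVersion``:

```shell
#!/bin/sh
# Sketch: prepare the bastion (Amazon Linux 2) for the Kafka CLI tools.
sudo yum install -y java-1.8.0-openjdk   # Kafka CLI tools need a JRE
cd ~
wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz            # extracts to ~/kafka_2.13-2.6.0
```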
## Create Kafka Topic

Run the following on the bastion machine:

```sh
#!/bin/sh

# Retrieved from "View Client Information" in the Amazon MSK Console
zookeeperConnectString=""
kafka_topic=""
replication_factor=1
partitions=1

# Change directory to Kafka bin
cd ~/kafka_2.13-2.6.0/bin/

# Execute kafka-topics.sh
./kafka-topics.sh --create \
  --zookeeper $zookeeperConnectString \
  --replication-factor $replication_factor \
  --partitions $partitions \
  --topic $kafka_topic
```

## Produce Data

Here's a sample producer using the ``kafka-python`` library. Since the cluster was created with ``clientBroker: 'TLS'``, clients connect over the TLS port 9094 (with ``security_protocol='SSL'``) rather than the plaintext port 9092; get the exact bootstrap broker strings from "View Client Information" in the console.

```python
from time import sleep
from json import dumps
from kafka import KafkaProducer

# Define Amazon MSK brokers
brokers = [':9092', ':9092']

# Define the Kafka topic to be produced to
kafka_topic = ''

# A Kafka client that publishes records to the Kafka cluster
producer = KafkaProducer(
    bootstrap_servers=brokers,
    value_serializer=lambda x: dumps(x).encode('utf-8'))

# Produce 1000 numbers from 0 to 999
for num in range(1000):
    data = {'number': num}
    producer.send(kafka_topic, value=data)
    sleep(1)
```

## Consume Data

Here's a sample consumer.

```python
from kafka import KafkaConsumer
from json import loads

# Define Amazon MSK brokers
brokers = [':9092', ':9092']

# Define the Kafka topic to be consumed from
kafka_topic = ''

# A Kafka client that consumes records from a Kafka cluster
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=brokers,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    print(message.value)
```
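The producer's ``value_serializer`` and the consumer's ``value_deserializer`` must be inverses of each other. A quick standalone check of the JSON round-trip — no broker required — looks like this:

```python
from json import dumps, loads

# The same lambdas as passed to KafkaProducer / KafkaConsumer above
serialize = lambda x: dumps(x).encode('utf-8')
deserialize = lambda x: loads(x.decode('utf-8'))

record = {'number': 42}
wire_bytes = serialize(record)
print(wire_bytes)                        # b'{"number": 42}'
assert deserialize(wire_bytes) == record  # round-trip recovers the dict
```

This is what travels as the Kafka message value: the producer sends the UTF-8 JSON bytes, and the consumer decodes them back into a dict.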
