Friday, 4 September 2020

Streaming Data to Amazon S3 Using Kafka Connect S3

There is no direct way to stream data from Apache Kafka to Amazon S3; you need an S3 connector. The following demonstration is performed in the ap-east-1 region with Apache Kafka version 2.2.1.

Prerequisites

You should have:

  • An Amazon MSK cluster
  • A client machine
  • An Apache Kafka topic (a creation example follows this list)
  • A producer
  • An S3 bucket

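If you have not created the topic yet, you can do so from the client machine with the kafka-topics.sh tool that ships with Kafka. This is a sketch; the broker address, replication factor, and partition count are illustrative and should match your own cluster.

~/kafka_2.13-2.6.0/bin/kafka-topics.sh --create --bootstrap-server <YOUR_MSK_BROKER_1>:9092 --replication-factor 2 --partitions 1 --topic <YOUR_KAFKA_TOPIC>
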
Installation

Since we are not using Confluent Cloud, we need to download and install the connector manually.

Go to https://www.confluent.io/hub/confluentinc/kafka-connect-s3 to download Kafka Connect S3.

Extract the ZIP file and copy its contents to the desired location. For example, you can create a directory named /home/ec2-user/kafka-plugins and copy the connector plugin contents there, as shown below.

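A minimal sketch, assuming version 5.5.1 of the connector (the version used later in this post) and that the ZIP was downloaded to the home directory:

mkdir -p /home/ec2-user/kafka-plugins
unzip ~/confluentinc-kafka-connect-s3-5.5.1.zip -d /home/ec2-user/kafka-plugins/
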
Configuration

Set plugin.path in your Connect worker properties file (connector.properties in this demonstration)

plugin.path=/home/ec2-user/kafka-plugins/

Update bootstrap.servers

bootstrap.servers=X-X.XXXXXXXXXXX-XX.XXXXXX.XX.kafka.ap-east-1.amazonaws.com:9092,X-X.XXXXXXXXXXX-XX.XXXXXX.XX.kafka.ap-east-1.amazonaws.com:9092

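If you don't have the broker endpoints handy, you can retrieve them with the AWS CLI (assuming you know your MSK cluster ARN):

aws kafka get-bootstrap-brokers --cluster-arn <YOUR_CLUSTER_ARN>
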
Update topics

topics=<YOUR_KAFKA_TOPIC>

Define the desired key and value converters. By default these values are empty; if you don't specify them, you will get an error like JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

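For context, with schemas.enable=true the JsonConverter expects each message to be a JSON envelope carrying both the schema and the payload. A hypothetical record matching the producer used later in this post would look like this:

{
  "schema": {
    "type": "struct",
    "fields": [{"field": "number", "type": "int64", "optional": false}],
    "optional": false
  },
  "payload": {"number": 0}
}

Since our producer sends plain JSON without this envelope, the converter settings above (with schemas.enable=false) are required.
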
If you want to convert the data to Parquet format, you must use the AvroConverter with ParquetFormat; attempting to use any other converter will result in a runtime exception.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

Then define the sink connector properties. A sample properties file is available under etc/ in the ZIP.

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=<YOUR_KAFKA_TOPIC>

s3.region=ap-east-1
s3.bucket.name=<YOUR_BUCKET>
s3.part.size=5242880
flush.size=1

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

With flush.size > 1 and value.converter set to JsonConverter, you may get an org.apache.avro.AvroRuntimeException: already open error.

To contact S3 successfully, you need to provide AWS credentials for the S3 connector to authenticate with. Export them in the same shell session that will run Kafka Connect:

export AWS_ACCESS_KEY_ID=foo
export AWS_SECRET_ACCESS_KEY=bar

Producing the data

Here's a sample producer, producer.py, using the kafka-python library:

from time import sleep
from json import dumps
from kafka import KafkaProducer

# Amazon MSK broker endpoints
brokers = ['<YOUR_MSK_BROKER_1>:9092', '<YOUR_MSK_BROKER_2>:9092']
# Kafka topic to produce to
kafka_topic = '<YOUR_KAFKA_TOPIC>'
# A Kafka client that publishes records to the Kafka cluster,
# serializing each value as a UTF-8 encoded JSON string
producer = KafkaProducer(
    bootstrap_servers=brokers,
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
# Produce 1000 numbers from 0 to 999, one per second
for num in range(1000):
    data = {'number': num}
    producer.send(kafka_topic, value=data)
    sleep(1)
# Ensure any buffered records are delivered before exiting
producer.flush()

Start producing data to the topic

python3 ./producer.py

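Optionally, you can confirm that messages are arriving by reading the topic back with the console consumer bundled with Kafka (a sketch; adjust the broker address to match your cluster):

~/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server <YOUR_MSK_BROKER_1>:9092 --topic <YOUR_KAFKA_TOPIC> --from-beginning
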
Streaming data to S3

Run Kafka Connect in standalone mode, passing the worker properties file first and the connector properties file second

~/kafka_2.13-2.6.0/bin/connect-standalone.sh ~/kafka-plugins/confluentinc-kafka-connect-s3-5.5.1/etc/connector.properties ~/kafka-plugins/confluentinc-kafka-connect-s3-5.5.1/etc/s3-sink.properties

Result

You should see objects with keys like these:

topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000000.avro
topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000001.avro
topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000002.avro
...

Download the first one to verify. You can either use Avro Viewer to view it and export it as JSON

[
  {
    "boolean": null,
    "bytes": null,
    "double": null,
    "float": null,
    "int": null,
    "long": null,
    "string": null,
    "array": null,
    "map": [
      {
        "key": {
          "boolean": null,
          "bytes": null,
          "double": null,
          "float": null,
          "int": null,
          "long": null,
          "string": "number",
          "array": null,
          "map": null
        },
        "value": {
          "boolean": null,
          "bytes": null,
          "double": null,
          "float": null,
          "int": null,
          "long": 0,
          "string": null,
          "array": null,
          "map": null
        }
      }
    ]
  }
]

or use avro-tools-1.7.7.jar to convert it back to JSON

java -jar avro-tools-1.7.7.jar tojson <YOUR_KAFKA_TOPIC>+0+0000000000.avro
{"number":0}

If you want multiple records inside one .avro file, increase the value of flush.size.

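For example, with the same producer and flush.size=100, each object would contain 100 records and the starting offset in each key would advance by 100 (hypothetical listing):

topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000000.avro
topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000100.avro
topics/<YOUR_KAFKA_TOPIC>/partition=0/<YOUR_KAFKA_TOPIC>+0+0000000200.avro
...
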
Common issues

NoClassDefFoundError is thrown when using io.confluent.connect.s3.format.parquet.ParquetFormat

java.lang.NoClassDefFoundError: com/google/common/base/Preconditions

Solution

Download the missing JAR, guava-17.0.jar, and add it to the connector's lib/ directory, as shown below.

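A sketch of one way to do this, fetching the JAR from Maven Central (the destination path assumes the 5.5.1 layout used earlier):

wget https://repo1.maven.org/maven2/com/google/guava/guava/17.0/guava-17.0.jar
cp guava-17.0.jar /home/ec2-user/kafka-plugins/confluentinc-kafka-connect-s3-5.5.1/lib/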