Deepgreen DB Kafka Plugin

Xdrive Kafka Plugin

The Kafka Plugin reads and writes data between Deepgreen DB and Kafka. The JSON data format is used as the communication protocol.

Mountpoint

[[xdrive.mount]]
name = "kafka"
argv = ["xdr_kafka/xdr_kafka", "kafkahost1:9092,kafkahost2:9092", "consumergroup", "10"]

# arg1 - comma-separated list of Kafka broker host:port addresses
# arg2 - Kafka consumer group used for reads across multiple segments
# arg3 - timeout in seconds.  The plugin times out if no more messages are received within this interval.

Kafka Quick Start

Follow the Kafka quick start guide to get a Kafka broker running.

Create a topic named customer with 3 partitions in Kafka.

(The total number of partitions MUST be greater than or equal to the total number of Deepgreen data segments.)

% bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic customer
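
To confirm the topic and its partition count, you can describe it (assuming the same single-node setup as above):

% bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic customer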

Read Operation

The Kafka xdrive plugin uses the poll method to pull data from Kafka.

First, create the read-only external table:

DROP EXTERNAL TABLE IF EXISTS customer_kafka_read;
CREATE EXTERNAL TABLE customer_kafka_read ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) /*CHAR(15)*/ ,
                             C_ACCTBAL     DOUBLE PRECISION/*DECIMAL(15,2)*/   ,
                             C_MKTSEGMENT  VARCHAR(10) /*CHAR(10)*/ ,
                             C_COMMENT     VARCHAR(117) )
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafka/customer')
FORMAT 'SPQ';
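
Once messages are available on the customer topic, an ordinary query against the external table pulls them into Deepgreen. The query below is a minimal sketch using the table defined above:

SELECT C_CUSTKEY, C_NAME, C_ACCTBAL
FROM customer_kafka_read
LIMIT 10;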

Write Operation

For writes, create a writable external table like so:

DROP EXTERNAL TABLE IF EXISTS customer_kafka_write;
CREATE WRITABLE EXTERNAL TABLE customer_kafka_write ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) /*CHAR(15)*/,
                             C_ACCTBAL     DOUBLE PRECISION/*DECIMAL(15,2)*/  ,
                             C_MKTSEGMENT  VARCHAR(10) /*CHAR(10)*/,
                             C_COMMENT     VARCHAR(117))
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafka/customer')
FORMAT 'SPQ';
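
Rows inserted into the writable table are published to the customer topic as JSON messages. The statement below is a minimal sketch with illustrative values:

INSERT INTO customer_kafka_write
VALUES (1, 'Customer#000000001', '123 Main Street', 15,
        '25-989-741-2988', 711.56, 'BUILDING', 'sample row');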

JSON Communication Protocol

A JSON array protocol is used between Kafka and Deepgreen; each row is encoded as an array of column values:

["col1", "col2", "col3", ..., "colN"]

You may use the provided executable plugin/csv2kafka/csv2kafka to upload a CSV file to Kafka with the following command:

csv2kafka kafkahost1,kafkahost2,...,kafkahostN topic csvfile

Use Case

After everything is set up, run the query below to test the pipeline. If no data arrives from Kafka, 0 rows are returned; the Kafka plugin waits for data from Kafka until the timeout expires.
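
SELECT count(*) FROM customer_kafka_read;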

While the Kafka plugin is waiting for data, you can upload all CSV files under the /tmp/data directory to the customer topic with the command below:

% csv2kafka localhost:9092 customer "/tmp/data/*.csv"

The data now flows into the Deepgreen database via Kafka.

Consumer not receiving any messages?

By default, the initial offset is set to the earliest offset. If you want to start over, reset the offsets with the bin/kafka-consumer-groups.sh command:

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group dggrp --topic test --reset-offsets --to-earliest --execute

To preview the result of the offset reset without executing it:

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group dggrp --topic test --reset-offsets --to-earliest

To describe the status of the consumer group:

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group dggrp