Deepgreen DB Kafka Plugin

Xdrive Kafka Plugin

The Kafka Plugin reads and writes data between Deepgreen DB and Kafka. Either CSV or JSON can be used as the communication protocol.

The Kafka Plugin consists of two xdrive plugins, xdr_kafka and xdr_kafkaoffset. These plugins are used together with PL/SQL functions to support reads and writes between Kafka and Deepgreen from multiple data segments in parallel.

We use librdkafka for the Kafka client connection, so refer to the librdkafka documentation for client SSL configuration details.

Mountpoint of xdr_kafka

The xdr_kafka plugin reads from and writes to Kafka. It supports the new Kafka streaming APIs.

[[xdrive.mount]]
name = "kafka"
argv = ["xdr_kafka/xdr_kafka", "kafkahost1:9092,kafkahost2:9092"]

# arg1 - comma-separated list of Kafka broker host:port addresses

For SSL support, you can use -X key=value pairs to supply the SSL configuration in the command-line arguments.

[[xdrive.mount]]
name = "kafka"
argv = ["xdr_kafka/xdr_kafka", 
     "-X", "security.protocol=SSL",
     "-X", "ssl.key.location=/var/config/kafka/ca-key",
     "-X", "ssl.key.password=kafka123",
     "-X", "ssl.certificate.location=/var/config/kafka/ca-cert",
     "-X", "ssl.ca.location=/var/config/kafka/ca-cert",
     "kafkahost1:9092,kafkahost2:9092"]

# arg1 - comma-separated list of Kafka broker host:port addresses

Mountpoint of xdr_kafkaoffset

The xdr_kafkaoffset plugin is used to get the committed offsets of a consumer group and topic. It is used for offset management in Deepgreen.

[[xdrive.mount]]
name = "kafkaoffset"
argv = ["xdr_kafkaoffset/xdr_kafkaoffset", "kafkahost1:9092,kafkahost2:9092"]

# arg1 - comma-separated list of Kafka broker host:port addresses

For SSL support, you can use -X key=value pairs to supply the SSL configuration in the command-line arguments. Refer to the librdkafka configuration documentation for the full list of properties.

[[xdrive.mount]]
name = "kafkaoffset"
argv = ["xdr_kafkaoffset/xdr_kafkaoffset",
     "-X", "security.protocol=SSL",
     "-X", "ssl.key.location=/var/config/kafka/ca-key",
     "-X", "ssl.key.password=kafka123",
     "-X", "ssl.certificate.location=/var/config/kafka/ca-cert",
     "-X", "ssl.ca.location=/var/config/kafka/ca-cert",
     "kafkahost1:9092,kafkahost2:9092"]

# arg1 - comma-separated list of Kafka broker host:port addresses

Kafka Quick Start

Follow the Apache Kafka quick start guide to download and start Kafka.

Create a topic named customer with 3 partitions in Kafka,

% bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic customer

Read Operation

The Kafka xdrive plugin uses the poll method to fetch data from Kafka.

First, create the read-only external table,

DROP EXTERNAL TABLE IF EXISTS customer_kafka_read;
CREATE EXTERNAL TABLE customer_kafka_read ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) /*CHAR(15)*/ ,
                             C_ACCTBAL     DOUBLE PRECISION/*DECIMAL(15,2)*/   ,
                             C_MKTSEGMENT  VARCHAR(10) /*CHAR(10)*/ ,
                             C_COMMENT     VARCHAR(117) )
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafka/customer')
FORMAT 'SPQ';

Write Operation

For writes, create a writable external table like so:

DROP EXTERNAL TABLE IF EXISTS customer_kafka_write;
CREATE WRITABLE EXTERNAL TABLE customer_kafka_write ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) /*CHAR(15)*/,
                             C_ACCTBAL     DOUBLE PRECISION/*DECIMAL(15,2)*/  ,
                             C_MKTSEGMENT  VARCHAR(10) /*CHAR(10)*/,
                             C_COMMENT     VARCHAR(117))
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafka/customer')
FORMAT 'SPQ';

You may also limit the number of xdrive workers for writes by setting the location URL as follows.

xdrive://host:port/mountpoint?nwriter=5/...
e.g. xdrive://127.0.0.1:50051/kafka?nwriter=5/customer
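
For example, a sketch of the writable table above recreated with at most 5 xdrive writers (the 127.0.0.1:50051 xdrive address is illustrative):

DROP EXTERNAL TABLE IF EXISTS customer_kafka_write;
CREATE WRITABLE EXTERNAL TABLE customer_kafka_write ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) ,
                             C_ACCTBAL     DOUBLE PRECISION ,
                             C_MKTSEGMENT  VARCHAR(10) ,
                             C_COMMENT     VARCHAR(117))
LOCATION ('xdrive://127.0.0.1:50051/kafka?nwriter=5/customer')
FORMAT 'SPQ';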

Kafka Offset Table

Create the Kafka offset table with the columns and data types in exactly the order shown below. Only one offset table should be created per database; it is shared by all Kafka external tables.

DROP EXTERNAL TABLE IF EXISTS kafka_offset;
CREATE EXTERNAL TABLE kafka_offset (consumer_group text,
                      topic text,
                      partition_number integer,
                      committed_offset bigint,
                      latest_offset bigint,
                      ts timestamp)
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafkaoffset/')
FORMAT 'SPQ';

Usage of xdr_kafka plugin

The xdr_kafka plugin is used for reads and writes between Kafka and Deepgreen. The plugin returns results once it has received the requested number of records (limit) or the timeout expires.

You may run the SELECT statement with parameters inside dg_utils.xdrive_query.

To read data from Kafka,

SQL> select * from customer_kafka_read where dg_utils.xdrive_query($$group=dggrp&start=500,500,500&limit=100$$);
  • group is the name of the consumer group. (Mandatory)
  • start is the partition offset to start with, using a comma as the separator between per-partition offsets. Possible values are a natural number, earliest (-2), or latest (-1). If start is not provided, the default offset is latest (-1). (Optional)
  • timeout is the timeout value in seconds. The default value is -1, which means the plugin exits as soon as no more messages are received. (Optional)
  • limit is the maximum number of records to receive. The default value is 1000 records. (Optional)
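
For example, to read all three partitions of the customer topic from the earliest offset, waiting up to 10 seconds and stopping after 500 records (a sketch assuming the quick-start topic and the external table created above):

SQL> select * from customer_kafka_read where dg_utils.xdrive_query($$group=dggrp&start=-2,-2,-2&timeout=10&limit=500$$);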

To write data to Kafka, run an INSERT statement.

SQL> insert into customer_kafka_write values (1, 'eric', '123 Main Street', 7, '25-989-741-2988', 1.0, 'BUILDING', 'new customer');
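
In practice you will usually publish many rows at once so that the segments write to Kafka in parallel, for example with an INSERT ... SELECT (the source table customer here is hypothetical):

SQL> insert into customer_kafka_write select * from customer;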

Usage of xdr_kafkaoffset plugin

The xdr_kafkaoffset plugin is used for getting the committed offset from Kafka.

To get the committed offset and latest offset from Kafka, run a SELECT statement with parameters inside dg_utils.xdrive_query.

SQL> select * from kafka_offset where dg_utils.xdrive_query($$group=dggrp&topic=test$$);

 consumer_group | topic | partition_number | committed_offset | latest_offset |         ts
----------------+-------+------------------+------------------+---------------+---------------------
 dggrp          | test  |                0 |            -1001 |           319 | 2018-10-10 02:24:32
 dggrp          | test  |                1 |            -1001 |           335 | 2018-10-10 02:24:32
 dggrp          | test  |                2 |            -1001 |           346 | 2018-10-10 02:24:32
(3 rows)

A committed_offset of -1001 is librdkafka's "no offset" sentinel value, indicating that no offset has been committed yet for that partition.

  • group is the name of consumer group (Mandatory)
  • topic is the name of the topic (Mandatory)

Communication Protocol

Both CSV and JSON formats are supported as the message protocol in Kafka. You specify the format in the FORMAT clause of the CREATE EXTERNAL TABLE DDL: FORMAT 'SPQ' selects JSON and FORMAT 'CSV' selects CSV.
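
For example, a sketch of the earlier customer external table switched to the CSV protocol (the table name customer_kafka_read_csv is hypothetical; only the name and FORMAT clause differ):

DROP EXTERNAL TABLE IF EXISTS customer_kafka_read_csv;
CREATE EXTERNAL TABLE customer_kafka_read_csv ( C_CUSTKEY     INTEGER ,
                             C_NAME        VARCHAR(25) ,
                             C_ADDRESS     VARCHAR(40) ,
                             C_NATIONKEY   INTEGER ,
                             C_PHONE       VARCHAR(15) ,
                             C_ACCTBAL     DOUBLE PRECISION ,
                             C_MKTSEGMENT  VARCHAR(10) ,
                             C_COMMENT     VARCHAR(117) )
LOCATION ('xdrive://XDRIVE-HOST-PORT/kafka/customer')
FORMAT 'CSV';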

For the JSON format, a JSON array protocol is used,

["col1", "col2", "col3", ..., "colN"]

You may use the provided executable plugin/csv2kafka/csv2kafka to upload a CSV file to Kafka by running the command

csv2kafka -X key1=value1 -X key2=value2 kafkahost1,kafkahost2,...,kafkahostN topic csvfile

To support SSL,

% csv2kafka -X security.protocol=SSL \
     -X ssl.key.location=/home/dg/config/kafka/ca-key \
     -X ssl.key.password=kafka123 \
     -X ssl.certificate.location=/home/dg/config/kafka/ca-cert \
     -X ssl.ca.location=/home/dg/config/kafka/ca-cert localhost:9092 topic data.csv

Consumer not receiving any messages?

By default, the initial offset is set to the earliest offset. If you want to start over again, you have to reset the offsets with the bin/kafka-consumer-groups.sh command.

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group dggrp --topic test --reset-offsets  --to-earliest --execute

To review the result of the offset reset without executing it, run the same command without --execute,

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group dggrp --topic test --reset-offsets  --to-earliest

To describe the status of the consumer group,

% bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group dggrp

Kafka SSL Configuration

We use librdkafka for the Kafka client connection, so refer to the librdkafka documentation for SSL configuration details. For setting up SSL server authentication, please refer to the Apache Kafka security documentation.

Example of Kafka broker configuration server.properties

listeners=SSL://:9092
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.truststore.location=/home/dg/config/kafka/kafka.server.truststore.jks
ssl.truststore.password=kafka123
ssl.keystore.location=/home/dg/config/kafka/kafka.server.keystore.jks
ssl.keystore.password=kafka123
ssl.key.password=kafka123

Example of Kafka consumer and producer configuration consumer.properties and producer.properties

security.protocol=SSL
ssl.truststore.location=/home/dg/config/kafka/kafka.client.truststore.jks
ssl.truststore.password=kafka123

ssl.keystore.location=/home/dg/config/kafka/kafka.server.keystore.jks
ssl.keystore.password=kafka123
ssl.key.password=kafka123