Parallel data transfer between Deepgreen and Kafka via Xdrive

by Eric Lam

Deepgreen has just released dgkafka, its latest Kafka extension, which reads and writes data between Deepgreen and Kafka in parallel. It reads from the Kafka partitions simultaneously via Xdrive plugins and transfers the data to a Deepgreen DB table.

This post shows how to set up the configuration and read data from Kafka.


1. We are going to use the customer table schema from TPC-H. The table has the following columns:

Column Type
c_custkey integer
c_name varchar(25)
c_address varchar(40)
c_nationkey integer
c_phone varchar(15)
c_acctbal double precision
c_mktsegment varchar(10)
c_comment varchar(117)

2. Assume Kafka is running in standalone mode on localhost, port 9092. Create a topic named customer with 3 partitions using the command below:

% bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic customer

3. Add two Xdrive endpoints to the Xdrive configuration file xdrive.toml. The value of dgkafka.xdrive_offset_endpoint in dgkafka.toml (created in step 5) must match the name of the endpoint that runs xdr_kafkaoffset/xdr_kafkaoffset, which is kafkaoffset in this case.

Copy and paste the following into xdrive.toml:

[xdrive]
dir = "/tmp/xdrive"
port = 7171
host = [ "127.0.0.1" ]


[[xdrive.mount]]
name = "kafka"
argv = ["xdr_kafka/xdr_kafka", "127.0.0.1:9092"]


[[xdrive.mount]]
name = "kafkaoffset"
argv = ["xdr_kafkaoffset/xdr_kafkaoffset", "127.0.0.1:9092"]

4. Deploy and start the Xdrive server:

% xdrctl deploy xdrive.toml
% xdrctl start xdrive.toml

5. Next, create the dgkafka configuration file dgkafka.toml, which is written in TOML format:

[dgkafka]
database = "template1"
user = "ericlam"
password = ""
host = "127.0.0.1"
port = 5432
sslmode = "disable"
xdrive_host = "127.0.0.1"
xdrive_port = 7171
xdrive_offset_endpoint = "kafkaoffset"
xdrive_kafka_endpoint = "kafka"

[kafka]
  [kafka.input]
  format = "csv"
  delimiter = "|"
  consumer_group = "dggrp"
  topic = "customer"
  partition_num = 3
  nwriter = 2
  ext_read_table = "customer_kafka_read"
  ext_write_table = "customer_kafka_write"
  ext_offset_table = "kafka_offset"
  
    [[kafka.input.columns]]
    name = "c_custkey"
    type = "integer"

    [[kafka.input.columns]]
    name = "c_name"
    type = "varchar(25)"

    [[kafka.input.columns]]
    name = "c_address"
    type = "varchar(40)"

    [[kafka.input.columns]]
    name = "c_nationkey"
    type = "integer"

    [[kafka.input.columns]]
    name = "c_phone"
    type = "varchar(15)"

    [[kafka.input.columns]]
    name = "c_acctbal"
    type = "double precision"

    [[kafka.input.columns]]
    name = "c_mktsegment"
    type = "varchar(10)"

    [[kafka.input.columns]]
    name = "c_comment"
    type = "varchar(117)"

  [kafka.output]
  offset_table = "kafka_offset_summary"
  output_table = "customer"

  [kafka.commit]
  max_row = 10000
  minimal_interval = -1
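Since the endpoint names in dgkafka.toml have to line up with the mount names in xdrive.toml, a quick consistency check can catch a typo before anything is deployed. The sketch below is not part of dgkafka; check_endpoints is a hypothetical helper, and the dgkafka.toml text is trimmed to the relevant keys. It assumes that xdrive_kafka_endpoint should point at the xdr_kafka mount and that xdrive_port should equal the Xdrive port, by analogy with the offset endpoint rule stated in step 3.

```python
try:
    import tomllib  # standard library in Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # third-party backport with the same API

XDRIVE_TOML = """
[xdrive]
dir = "/tmp/xdrive"
port = 7171
host = [ "127.0.0.1" ]

[[xdrive.mount]]
name = "kafka"
argv = ["xdr_kafka/xdr_kafka", "127.0.0.1:9092"]

[[xdrive.mount]]
name = "kafkaoffset"
argv = ["xdr_kafkaoffset/xdr_kafkaoffset", "127.0.0.1:9092"]
"""

# Only the keys needed for the check; the full file is shown above.
DGKAFKA_TOML = """
[dgkafka]
xdrive_host = "127.0.0.1"
xdrive_port = 7171
xdrive_offset_endpoint = "kafkaoffset"
xdrive_kafka_endpoint = "kafka"
"""

# Assumption: each dgkafka key should name the mount running this plugin.
EXPECTED_PLUGIN = {
    "xdrive_kafka_endpoint": "xdr_kafka/xdr_kafka",
    "xdrive_offset_endpoint": "xdr_kafkaoffset/xdr_kafkaoffset",
}


def check_endpoints(xdrive_text, dgkafka_text):
    """Return a list of problems; an empty list means the two files agree."""
    xdrive = tomllib.loads(xdrive_text)["xdrive"]
    dgkafka = tomllib.loads(dgkafka_text)["dgkafka"]
    # Map each mount name to its plugin binary (first element of argv).
    mounts = {m["name"]: m["argv"][0] for m in xdrive.get("mount", [])}
    problems = []
    for key, plugin in EXPECTED_PLUGIN.items():
        name = dgkafka.get(key)
        if name not in mounts:
            problems.append(f"{key} = {name!r} is not a mount in xdrive.toml")
        elif mounts[name] != plugin:
            problems.append(f"{key} = {name!r} runs {mounts[name]}, expected {plugin}")
    if dgkafka.get("xdrive_port") != xdrive.get("port"):
        problems.append("xdrive_port does not match the port in xdrive.toml")
    return problems


if __name__ == "__main__":
    print(check_endpoints(XDRIVE_TOML, DGKAFKA_TOML) or "endpoints match")
```

In practice you would read the two files from disk instead of using the embedded strings.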


6. Run the dgkafka setup command to create the database tables.

% dgkafka setup dgkafka.toml

After setup completes, the tables defined in the configuration file will have been created: kafka.input.ext_read_table, kafka.input.ext_write_table, the offset tables kafka.input.ext_offset_table and kafka.output.offset_table, and kafka.output.output_table.

Create the output table manually:

drop table if exists customer;
create table customer (c_custkey integer, 
			c_name varchar(25), 
			c_address varchar(40),
			c_nationkey integer,
			c_phone varchar(15),
			c_acctbal double precision,
			c_mktsegment varchar(10),
			c_comment varchar(117));
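The output table has to have the same columns as the [[kafka.input.columns]] entries in dgkafka.toml, so the DDL can be generated from the configuration rather than typed twice. This is only a sketch: create_table_sql is a hypothetical helper, not a dgkafka command, and it assumes the column names and types in the config are valid SQL as written (no quoting is applied).

```python
try:
    import tomllib  # standard library in Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # third-party backport with the same API

# Relevant parts of dgkafka.toml from step 5.
CONFIG = """
[kafka]
  [kafka.input]
  topic = "customer"

    [[kafka.input.columns]]
    name = "c_custkey"
    type = "integer"
    [[kafka.input.columns]]
    name = "c_name"
    type = "varchar(25)"
    [[kafka.input.columns]]
    name = "c_address"
    type = "varchar(40)"
    [[kafka.input.columns]]
    name = "c_nationkey"
    type = "integer"
    [[kafka.input.columns]]
    name = "c_phone"
    type = "varchar(15)"
    [[kafka.input.columns]]
    name = "c_acctbal"
    type = "double precision"
    [[kafka.input.columns]]
    name = "c_mktsegment"
    type = "varchar(10)"
    [[kafka.input.columns]]
    name = "c_comment"
    type = "varchar(117)"

  [kafka.output]
  output_table = "customer"
"""


def create_table_sql(config_text):
    """Build DROP/CREATE statements for kafka.output.output_table."""
    kafka = tomllib.loads(config_text)["kafka"]
    table = kafka["output"]["output_table"]
    columns = kafka["input"]["columns"]
    # One "name type" line per configured column, in config order.
    body = ",\n".join(f"    {c['name']} {c['type']}" for c in columns)
    return f"DROP TABLE IF EXISTS {table};\nCREATE TABLE {table} (\n{body}\n);"


if __name__ == "__main__":
    print(create_table_sql(CONFIG))
```

The printed statements can be pasted into psql, which keeps the table definition and the Kafka input columns from drifting apart.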

7. You are now ready to run dgkafka. First, check the status of the topic customer by running dgkafka check:

% dgkafka check dgkafka.toml
Database = template1, Topic = customer, Group = dggrp

[Last recorded offset]
Group	Topic	      Partition	 Committed	    Latest	               Start	                 End
dggrp	customer	    0	    144211	    144211	2019-05-17 03:45:02.646998 +0000 +0000	2019-05-17 03:45:05 +0000 +0000
dggrp	customer	    1	    143409	    143409	2019-05-17 03:45:02.646998 +0000 +0000	2019-05-17 03:45:05 +0000 +0000
dggrp	customer	    2	    144713	    144713	2019-05-17 03:45:02.646998 +0000 +0000	2019-05-17 03:45:05 +0000 +0000
dggrp	customer	    3	    143699	    143699	2019-05-17 03:45:02.646998 +0000 +0000	2019-05-17 03:45:05 +0000 +0000

[Offset from Kafka]
Group	Topic	      Partition	 Committed	    Latest	           Timestamp
dggrp	customer	    0	    144211	    144211	2019-05-17 04:28:35 +0000 +0000
dggrp	customer	    1	    143409	    143409	2019-05-17 04:28:35 +0000 +0000
dggrp	customer	    2	    144713	    144713	2019-05-17 04:28:35 +0000 +0000
dggrp	customer	    3	    143699	    143699	2019-05-17 04:28:35 +0000 +0000
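If you want to monitor this output from a script, the per-partition lag can be computed from the [Offset from Kafka] section. The sketch below assumes the layout shown above (whitespace-separated Group, Topic, Partition, Committed, Latest, then a timestamp) and reads lag as Latest minus Committed, which is my interpretation of the column names rather than something dgkafka documents here. kafka_lag is a hypothetical helper.

```python
SAMPLE = """\
[Offset from Kafka]
Group   Topic     Partition  Committed  Latest  Timestamp
dggrp   customer  0          144211     144211  2019-05-17 04:28:35 +0000 +0000
dggrp   customer  1          143409     143409  2019-05-17 04:28:35 +0000 +0000
dggrp   customer  2          144713     144713  2019-05-17 04:28:35 +0000 +0000
dggrp   customer  3          143699     143699  2019-05-17 04:28:35 +0000 +0000
"""


def kafka_lag(check_output):
    """Map partition number to (Latest - Committed) for the Kafka section."""
    lag = {}
    in_section = False
    for line in check_output.splitlines():
        if line.startswith("["):
            # Section headers look like "[Last recorded offset]".
            in_section = line.strip() == "[Offset from Kafka]"
            continue
        fields = line.split()
        # Data rows carry integers in columns 3 to 5; the header row does not.
        if in_section and len(fields) >= 5 and fields[2].isdigit():
            partition, committed, latest = (int(f) for f in fields[2:5])
            lag[partition] = latest - committed
    return lag


if __name__ == "__main__":
    for partition, behind in sorted(kafka_lag(SAMPLE).items()):
        print(f"partition {partition}: {behind} messages behind")
```

A lag of zero on every partition, as in the output above, means the consumer group has committed everything currently in the topic.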

8. If everything looks fine, you are ready to load data from Kafka into Deepgreen. Open a new terminal and run dgkafka load:

% dgkafka load dgkafka.toml

The command waits for messages from Kafka indefinitely. Press CTRL-C to stop the process.

9. Submit the data to Kafka.

Below is a sample of the CSV file that will be submitted to Kafka:

1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e
2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref
3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov
4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou
5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor
6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
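Before submitting a file, it can help to check that each row fits the column definitions from step 1, since a row with the wrong number of fields or an overlong value is easier to fix at the source. The following is a minimal sketch, not part of dgkafka or csv2kafka: validate_row is a hypothetical helper, and it splits on the delimiter naively, so it assumes the data contains no quoted or escaped "|" characters.

```python
import re

# Column definitions from step 1 (name, SQL type).
SCHEMA = [
    ("c_custkey", "integer"),
    ("c_name", "varchar(25)"),
    ("c_address", "varchar(40)"),
    ("c_nationkey", "integer"),
    ("c_phone", "varchar(15)"),
    ("c_acctbal", "double precision"),
    ("c_mktsegment", "varchar(10)"),
    ("c_comment", "varchar(117)"),
]


def validate_row(line, delimiter="|"):
    """Return a list of problems found in one delimited row."""
    fields = line.rstrip("\n").split(delimiter)
    if len(fields) != len(SCHEMA):
        return [f"expected {len(SCHEMA)} fields, got {len(fields)}"]
    errors = []
    for (name, sqltype), value in zip(SCHEMA, fields):
        varchar = re.fullmatch(r"varchar\((\d+)\)", sqltype)
        if varchar:
            limit = int(varchar.group(1))
            if len(value) > limit:
                errors.append(f"{name}: {len(value)} chars exceeds {limit}")
        elif sqltype == "integer":
            if not re.fullmatch(r"[+-]?\d+", value):
                errors.append(f"{name}: {value!r} is not an integer")
        elif sqltype == "double precision":
            try:
                float(value)
            except ValueError:
                errors.append(f"{name}: {value!r} is not a number")
    return errors


if __name__ == "__main__":
    row = ("1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|"
           "BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e")
    print(validate_row(row) or "row is valid")
```

Running validate_row over every line of the file and printing only the rows that return problems gives a quick pre-flight check.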

Open another terminal and run csv2kafka (located at $XDRIVE_HOME/plugin/csv2kafka/csv2kafka) to submit the data to Kafka:

% csv2kafka -d "|" -w "|" 127.0.0.1:9092 customer data.csv

10. All of the data is loaded into the output table customer:

template1=# select count(*) from customer;
 count  
--------
 150000
(1 row)

template1=# select * from customer limit 10;
 c_custkey |       c_name       |               c_address                | c_nationkey |     c_phone     | c_acctbal | c_mktsegment |                                                    c_comment
-----------+--------------------+----------------------------------------+-------------+-----------------+-----------+--------------+-----------------------------------------------------------------------------------------------------------------
        22 | Customer#000000022 | QI6p41,FNs5k7RZoCCVPUTkUdYpB           |           3 | 13-806-545-9701 |    591.98 | MACHINERY    | s nod furiously above the furiously ironic ideas.
        39 | Customer#000000039 | nnbRg,Pvy33dfkorYE FdeZ60              |           2 | 12-387-467-6509 |   6264.31 | AUTOMOBILE   | tions. slyly silent excuses slee
        41 | Customer#000000041 | IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF        |          10 | 20-917-711-4011 |    270.95 | HOUSEHOLD    | ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo
        53 | Customer#000000053 | HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib     |          15 | 25-168-852-5363 |   4113.64 | HOUSEHOLD    | ar accounts are. even foxes are blithely. fluffily pending deposits boost
        54 | Customer#000000054 | ,k4vf 5vECGWFy,hosTE,                  |           4 | 14-776-370-4745 |     868.9 | AUTOMOBILE   | sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi
        55 | Customer#000000055 | zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom |          10 | 20-180-440-8525 |   4572.11 | MACHINERY    | ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab
        88 | Customer#000000088 | wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg        |          16 | 26-516-273-2566 |   8031.44 | AUTOMOBILE   | s are quickly above the quickly ironic instructions; even requests about the carefully final deposi
        90 | Customer#000000090 | QxCzH7VxxYUWwfL7                       |          16 | 26-603-491-1238 |   7354.23 | BUILDING     | sly across the furiously even
        96 | Customer#000000096 | vWLOrmXhRR                             |           8 | 18-422-845-1202 |   6323.92 | AUTOMOBILE   | press requests believe furiously. carefully final instructions snooze carefully.
       110 | Customer#000000110 | mymPfgphaYXNYtk                        |          10 | 20-893-536-2069 |   7462.99 | AUTOMOBILE   | nto beans cajole around the even, final deposits. quickly bold packages according to the furiously regular dept
(10 rows)