The XDrive ORC/Parquet Plugin lets Deepgreen DB access files in ORC and Parquet format residing on local storage, Amazon S3, or the Hadoop HDFS file system.
ORC/Parquet files created by Hive, including partitioned table files, can also be read by the plugin.
The XDrive ORC/Parquet plugin is written in Java, and its library is located at $XDRIVE_HOME/plugin/jars/vitessedata-file-plugin.jar.
To execute the XDrive ORC plugin, run the Java class com.vitessedata.xdrive.orc.Main.
To execute the XDrive Parquet plugin, run the Java class com.vitessedata.xdrive.parquet.Main.
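In practice xdrive launches these classes itself, based on the mountpoint configuration in xdrive.toml shown below. For reference, a sketch of the underlying command line for the Parquet plugin with a local (nfs) mountpoint, assuming $XDRIVE_HOME/plugin is the working directory, would look roughly like:
/usr/bin/java -Xmx1G -cp jars/vitessedata-file-plugin.jar com.vitessedata.xdrive.parquet.Main nfs /dw/data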
External table DDL that refers to the ORC/Parquet plugin must have a LOCATION clause. For reads it looks like this:
LOCATION('xdrive://XDRIVE-HOST-PORT/MOUNTPOINT/path-to-files-with-wildcard.[orc|parquet]')
FORMAT 'SPQ'
and for writes:
LOCATION('xdrive://XDRIVE-HOST-PORT/MOUNTPOINT/path-to-files-with-#UUID#.[orc|parquet]')
FORMAT 'SPQ'
Set up the writable and readable external tables as below.
CREATE WRITABLE EXTERNAL TABLE W_SUPPLIER (
    S_SUPPKEY INTEGER,
    S_NAME VARCHAR(25) /*CHAR(25)*/,
    S_ADDRESS VARCHAR(40),
    S_NATIONKEY INTEGER,
    S_PHONE VARCHAR(15) /*CHAR(15)*/,
    S_ACCTBAL DOUBLE PRECISION /*DECIMAL(15,2)*/,
    S_COMMENT VARCHAR(101)
)
LOCATION('xdrive://127.0.0.1:50055/local_parquet/supplier/data#UUID#_#SEGID#') FORMAT 'spq';

CREATE EXTERNAL TABLE SUPPLIER (LIKE W_SUPPLIER)
LOCATION('xdrive://127.0.0.1:50055/local_parquet/supplier/*') FORMAT 'spq';
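Once both tables exist, writes go through the writable table and reads through the readable one. A minimal usage sketch (SUPPLIER_SOURCE is a hypothetical regular table holding the rows to export, not part of the DDL above):
-- Export rows to Parquet files via the writable external table
INSERT INTO W_SUPPLIER SELECT * FROM SUPPLIER_SOURCE;
-- Read the exported files back through the readable external table
SELECT count(*) FROM SUPPLIER;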
Suppose we store data in the directory /dw/data on the local hard disk; the following mountpoints allow access to Parquet/ORC files on local storage.
# parquet local storage mountpoint
[[xdrive.mount]]
name = "local_parquet"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "nfs", "/dw/data"]
# orc local storage mountpoint
[[xdrive.mount]]
name = "local_orc"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.orc.Main", "nfs", "/dw/data"]
Suppose we store data in the directory /dw/data under Amazon S3; the following mountpoints allow access to Parquet/ORC files. Make sure to add the jar file jars/aws-java-sdk-bundle-1.11.271.jar to the classpath.
# S3 Parquet Mountpoint
[[xdrive.mount]]
name = "s3_parquet"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/aws-java-sdk-bundle-1.11.271.jar:jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "s3", "S3_ENDPOINT", "S3_BUCKET", "/dw/data"]
env = ["AWS_ACCESS_KEY_ID=AWS_KEYID", "AWS_SECRET_ACCESS_KEY=AWS_SECRET_KEY", "HDFS_CLIENT_CONF=path-to-hdfs-client-conf-xml"]
# S3 Orc Mountpoint
[[xdrive.mount]]
name = "s3_orc"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/aws-java-sdk-bundle-1.11.271.jar:jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.orc.Main", "s3", "S3_ENDPOINT", "S3_BUCKET", "/dw/data"]
env = ["AWS_ACCESS_KEY_ID=AWS_KEYID", "AWS_SECRET_ACCESS_KEY=AWS_SECRET_KEY", "HDFS_CLIENT_CONF=path-to-hdfs-client-conf-xml"]
# S3_ENDPOINT is the endpoint of the region, e.g. s3.amazonaws.com
# S3_BUCKET is the name of the S3 bucket
# AWS_KEYID is the AWS access key id
# AWS_SECRET_KEY is the AWS secret key
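An external table over the s3_parquet mountpoint follows the same LOCATION pattern as the local example above; a sketch (the host, port, and supplier path are illustrative):
CREATE EXTERNAL TABLE S3_SUPPLIER (LIKE W_SUPPLIER)
LOCATION('xdrive://127.0.0.1:50055/s3_parquet/supplier/*') FORMAT 'spq';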
To support AWS S3 fast upload and random file access for ORC and Parquet files, you may add the relevant configurations in HDFS_CLIENT_CONF. If you do not need fast upload, you may remove HDFS_CLIENT_CONF from env.
For more settings, please see the Hadoop-AWS module documentation: Integration with Amazon Web Services.
Example hdfs-client.xml file:
<configuration>
<property>
<name>fs.s3a.experimental.input.fadvise</name>
<value>random</value>
</property>
<property>
<name>fs.s3a.fast.upload</name>
<value>true</value>
<description>
Use the incremental block upload mechanism with
the buffering mechanism set in fs.s3a.fast.upload.buffer.
The number of threads performing uploads in the filesystem is defined
by fs.s3a.threads.max; the queue of waiting uploads limited by
fs.s3a.max.total.tasks.
The size of each buffer is set by fs.s3a.multipart.size.
</description>
</property>
<property>
<name>fs.s3a.fast.upload.buffer</name>
<value>disk</value>
<description>
The buffering mechanism to use when using S3A fast upload
(fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
This configuration option has no effect if fs.s3a.fast.upload is false.
"disk" will use the directories listed in fs.s3a.buffer.dir as
the location(s) to save data prior to being uploaded.
"array" uses arrays in the JVM heap
"bytebuffer" uses off-heap memory within the JVM.
Both "array" and "bytebuffer" will consume memory in a single stream up to the number
of blocks set by:
fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
If using either of these mechanisms, keep this value low.
The total number of threads performing work across all threads is set by
fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
work items.
</description>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>104857600</value>
<description>How big (in bytes) to split upload or copy operations up into.
</description>
</property>
<property>
<name>fs.s3a.fast.upload.active.blocks</name>
<value>4</value>
<description>
Maximum Number of blocks a single output stream can have
active (uploading, or queued to the central FileSystem
instance's pool of queued operations).
This stops a single stream overloading the shared thread pool.
</description>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp/</value>
<description>Comma separated list of temporary directories used for
storing blocks of data prior to their being uploaded to S3.
When unset, the Hadoop temporary directory hadoop.tmp.dir is used</description>
</property>
<property>
<name>fs.s3a.threads.max</name>
<value>20</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
<property>
<name>fs.s3a.threads.core</name>
<value>15</value>
<description>Number of core threads in the threadpool.</description>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>30</value>
</property>
<property>
<name>fs.s3a.max.total.tasks</name>
<value>1000</value>
<description>The number of operations which can be queued for execution</description>
</property>
<property>
<name>fs.s3a.threads.keepalivetime</name>
<value>60</value>
<description>Number of seconds a thread can be idle before being
terminated.</description>
</property>
</configuration>
Suppose we store data in the directory /dw/data under HDFS; the following mountpoints allow access to Parquet/ORC files:
# parquet hdfs file
[[xdrive.mount]]
name = "parquet"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "hdfs", "/dw/data", "hdfsnamenode", "0"]
env = ["HDFS_CLIENT_CONF=path-to-hdfs-client.xml"]
# orc hdfs file
[[xdrive.mount]]
name = "orc"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.orc.Main", "hdfs", "/dw/data", "hdfsnamenode", "0"]
env = ["HDFS_CLIENT_CONF=path-to-hdfs-client.xml"]
If hdfsnamenode is a service name, its value starts with the scheme hdfs:// and the port value is '0'.
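For example, with an HA nameservice named mycluster (the nameservice and the mountpoint name below are illustrative), the Parquet mountpoint would look like:
# parquet hdfs file, HA nameservice
[[xdrive.mount]]
name = "parquet_ha"
argv = ["/usr/bin/java", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "hdfs", "/dw/data", "hdfs://mycluster", "0"]
env = ["HDFS_CLIENT_CONF=path-to-hdfs-client.xml"]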
You may pass the HDFS client configuration file via the environment variable HDFS_CLIENT_CONF, which should point explicitly to the hdfs-client.xml file you want to use. hdfs-client.xml has the same format as core-site.xml and hdfs-site.xml; you may concatenate core-site.xml and hdfs-site.xml into a single hdfs-client.xml.
For a non-secure connection, set the hadoop.security.authentication property to simple and com.vitessedata.xdrive.security.user.name to your Hadoop username.
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>com.vitessedata.xdrive.security.user.name</name>
<value>hduser</value>
</property>
</configuration>
We assume that Kerberos is properly installed and configured on each Deepgreen data segment.
For a secure connection, change the hadoop.security.authentication property from simple to kerberos.
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
</configuration>
Kerberos authentication uses a JAAS configuration, which is passed via Java command-line arguments, and the Kerberos ticket cache file path is passed via the environment variable KRB5CCNAME.
# parquet hdfs file
[[xdrive.mount]]
name = "parquet"
argv = ["/usr/bin/java", "-Dsun.security.krb5.debug=true", "-Djavax.security.auth.useSubjectCredsOnly=false", "-Djava.security.krb5.conf=/etc/krb5.conf", "-Djava.security.auth.login.config=path-to-jaas.conf", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "hdfs", "/data" , "hdfs://mycluster", "0"]
env = ["HDFS_CLIENT_CONF=path-to-hdfs-client.xml", "KRB5CCNAME=/run/user/krb5cc/krb5cc_1051"]
# orc hdfs file
[[xdrive.mount]]
name = "orc"
argv = ["/usr/bin/java", "-Dsun.security.krb5.debug=true", "-Djavax.security.auth.useSubjectCredsOnly=false", "-Djava.security.krb5.conf=/etc/krb5.conf", "-Djava.security.auth.login.config=path-to-jaas.conf", "-Xmx1G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.orc.Main", "hdfs", "/data" , "hdfs://mycluster", "0"]
env = ["HDFS_CLIENT_CONF=path-to-hdfs-client.xml", "KRB5CCNAME=/run/user/krb5cc/krb5cc_1051"]
Example jaas.conf file:
com.sun.security.jgss.krb5.initiate {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
principal="gpadmin@VITESSEDATA.COM"
useKeyTab=false
useTicketCache=true
TicketCache=/run/user/krb5cc/krb5cc_1051
storeKey=true;
};
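The ticket cache referenced above must already hold a valid ticket for the principal. One way to obtain and inspect it (a sketch, reusing the cache path and principal from the examples above; kinit will prompt for the principal's password) is:
# obtain a ticket into the cache used by the plugin
kinit -c /run/user/krb5cc/krb5cc_1051 gpadmin@VITESSEDATA.COM
# verify the cache contents
klist -c /run/user/krb5cc/krb5cc_1051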
To enable Kerberos security debug messages, add the following flag to the Java command line:
java -Dsun.security.krb5.debug=true
Although HDFS is resilient to failure of data nodes, the name node is a single repository of metadata for the system, and therefore a single point of failure. High availability (HA) involves configuring fall-back name nodes which can take over in the event of failure. The following hdfs-client.xml settings configure access to an HA nameservice named mycluster:
<configuration>
<property>
<name>dfs.default.uri</name>
<value>hdfs://mycluster</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>namenode1,namenode2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.namenode1</name>
<value>hostname1:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.namenode2</name>
<value>hostname2:8020</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.namenode1</name>
<value>hostname1:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.namenode2</name>
<value>hostname2:50070</value>
</property>
</configuration>
To test the HDFS client configuration, you may run $GPHOME/plugin/xdr_hdfs/xdr_hdfscmd to check connectivity to the Hadoop server.
To read the contents of a file on HDFS:
LIBHDFS3_CONF=path-to-hdfs-client.xml xdr_hdfscmd cat 127.0.0.1 8020 /data/nation.csv
To print the list of files in a directory on HDFS:
LIBHDFS3_CONF=path-to-hdfs-client.xml xdr_hdfscmd ls 127.0.0.1 8020 /data/
If you created an ORC file with Hive, the column names stored in the ORC file will not be the same as those you defined in Hive; they are written as _col0, _col1, _col2, …, _colN. So if you have a table supplier created in Hive, you must use the column names _col0, _col1, _col2, …, _colN in Deepgreen instead of the original column names defined in Hive.
For example, the column mapping for the table supplier is:
| Column in Hive | Column in Deepgreen |
|---|---|
| S_SUPPKEY | _col0 |
| S_NAME | _col1 |
| S_ADDRESS | _col2 |
| S_NATIONKEY | _col3 |
| S_PHONE | _col4 |
| S_ACCTBAL | _col5 |
| S_COMMENT | _col6 |
The DDL of the external tables is:
CREATE WRITABLE EXTERNAL TABLE W_SUPPLIER (
    _col0 INTEGER,
    _col1 VARCHAR(25) /*CHAR(25)*/,
    _col2 VARCHAR(40),
    _col3 INTEGER,
    _col4 VARCHAR(15) /*CHAR(15)*/,
    _col5 DOUBLE PRECISION /*DECIMAL(15,2)*/,
    _col6 VARCHAR(101)
)
LOCATION('xdrive://127.0.0.1:50055/tpch/supplier/data#UUID#_#SEGID#') FORMAT 'spq';

CREATE EXTERNAL TABLE SUPPLIER (LIKE W_SUPPLIER)
LOCATION('xdrive://127.0.0.1:50055/tpch/supplier/*') FORMAT 'spq';
For large table transfers, you may run out of Java heap memory. You may increase the heap size in the Java startup command in xdrive.toml; the default Java heap size is 1G.
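For example, to raise the heap to 4G (the value is illustrative), change the -Xmx option in the mountpoint's argv:
# parquet local storage mountpoint with a larger Java heap
[[xdrive.mount]]
name = "local_parquet"
argv = ["/usr/bin/java", "-Xmx4G", "-cp", "jars/vitessedata-file-plugin.jar", "com.vitessedata.xdrive.parquet.Main", "nfs", "/dw/data"]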