JDBC Teradata API

Note

  • This is an experimental API.
  • NOT vetted yet to be used at scale in production.
  • Please contact the Gimel Team if you are considering using this API for your use cases.

Read API

Create a Teradata table

CREATE MULTISET TABLE scratch_db.flights_lookup_carrier_code ,NO FALLBACK ,
     NO BEFORE JOURNAL,
     NO AFTER JOURNAL,
     CHECKSUM = DEFAULT,
     DEFAULT MERGEBLOCKRATIO
     (
      code VARCHAR(100) CHARACTER SET LATIN CASESPECIFIC,
      description VARCHAR(4000) CHARACTER SET LATIN CASESPECIFIC)
 PRIMARY INDEX (code) ;

Create Hive Table Pointing to Teradata table

create external table pcatalog.flights_lookup_carrier_code_td (
      code string,
      description string
      )
LOCATION 'hdfs:///tmp/pcatalog/teradata_smoke_test'
TBLPROPERTIES (
  'gimel.jdbc.driver.class'='com.teradata.jdbc.TeraDriver',
  'gimel.jdbc.input.table.name'='scratch_db.flights_lookup_carrier_code',
  'gimel.jdbc.url'='jdbc:teradata://td_server',
  'gimel.jdbc.output.table.name'='scratch_db.flights_lookup_carrier_code',
  'gimel.storage.type'='JDBC'
);

Setting password for JDBC datasource

The password for a JDBC data source is specified in a file stored at any HDFS location. If no path is given, the API looks for the password file at hdfs:///user/$USER/password/teradata/pass.dat by default. The file contains the password in the format jdbc_url/$USER,password.

For example, the password for Teradata can be specified in the file hdfs:///user/$USER/password/teradata/pass.dat as:

td_server/USER,PASSWORD
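
As a one-time setup sketch (not part of the Gimel API itself), the file can be staged on HDFS with the Hadoop FileSystem client from a Spark shell; the host name td_server and the placeholder credentials mirror the example above:

import org.apache.hadoop.fs.{FileSystem, Path}

// Stage the password file on HDFS, overwriting any previous copy
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val passFile = new Path(s"/user/${sys.props("user.name")}/password/teradata/pass.dat")
val out = fs.create(passFile, true)
out.write("td_server/USER,PASSWORD".getBytes("UTF-8"))
out.close()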

The password file for a JDBC data source can be passed to the APIs in two ways:

  1. As a Spark configuration parameter:

     --conf spark.jdbc.p.file=PASSWORD_FILE_PATH

  2. As an option in the READ/WRITE APIs:

     val options = Map(("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"))

Options for READ API

READ without FASTEXPORT

| Option              | Default                                       | Values to specify                         |
|---------------------|-----------------------------------------------|-------------------------------------------|
| spark.jdbc.username | spark.user                                    | Username                                  |
| spark.jdbc.p.file   | hdfs:///user/$USER/password/teradata/pass.dat | An HDFS file path containing the password |
| charset             | ASCII                                         | ASCII, UTF8                               |
READ with FASTEXPORT

| Option               | Default                                       | Values to specify                                           |
|----------------------|-----------------------------------------------|-------------------------------------------------------------|
| spark.jdbc.username  | spark.user                                    | Username                                                    |
| spark.jdbc.p.file    | hdfs:///user/$USER/password/teradata/pass.dat | An HDFS file path containing the password                   |
| gimel.jdbc.read.type | -                                             | FASTEXPORT                                                  |
| charset              | ASCII                                         | ASCII, UTF8                                                 |
| SESSIONS             | 5                                             | Required number of sessions                                 |
| partitions           | 4                                             | Required number of partitions                               |
| partitionColumn      | 10                                            | The column on which the data is to be partitioned           |
| columnLowerBound     | 0                                             | Lower bound of the values in the partition column, if known |
| columnUpperBound     | 20                                            | Upper bound of the values in the partition column, if known |
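
For instance, a partitioned FASTEXPORT read could combine these options as sketched below; the choice of "code" as the partition column and the bounds are illustrative (in practice the partition column is typically numeric):

// Sketch: options for a partitioned FASTEXPORT read
val partitionedOptions = Map(
  ("gimel.jdbc.read.type", "FASTEXPORT"),
  ("SESSIONS", "8"),
  ("partitions", "4"),
  ("partitionColumn", "code"),
  ("columnLowerBound", "0"),
  ("columnUpperBound", "20"),
  ("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat")
)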

Common Imports

import com.paypal.gimel._
import org.apache.spark._
import org.apache.spark.sql._

Read via JDBCDataSet

//Initiate DataSet
val dataSet: DataSet = DataSet(sparkSession)
//options "can" be used to specify extra parameters to read data from teradata
 val options = Map(("charset","ASCII"),("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"))
//read API
val recsDF = dataSet.read("pcatalog.flights_lookup_carrier_code_td",options)
// Do some usecase
recsDF.show()

Read via JDBCDataSet using FASTEXPORT

val dataSet: DataSet = DataSet(sparkSession)
//options "can" be used to specify extra parameters to read data from teradata
// To read with FASTEXPORT, set "gimel.jdbc.read.type" to "FASTEXPORT" and "SESSIONS" to the number of sessions required.
val options = Map(("gimel.jdbc.read.type", "FASTEXPORT"), ("SESSIONS", "8"), ("charset", "ASCII"), ("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"))
//read API
val recsDF = dataSet.read("pcatalog.flights_lookup_carrier_code_td",options)
// Do some usecase
recsDF.show()

Teradata Write API

Options for WRITE API

WRITE without FASTLOAD

| Option              | Default                                       | Values to specify                         |
|---------------------|-----------------------------------------------|-------------------------------------------|
| spark.jdbc.username | spark.user                                    | Username                                  |
| spark.jdbc.p.file   | hdfs:///user/$USER/password/teradata/pass.dat | An HDFS file path containing the password |
| charset             | ASCII                                         | ASCII, UTF8                               |

WRITE with FASTLOAD

| Option                | Default                                       | Values to specify                         |
|-----------------------|-----------------------------------------------|-------------------------------------------|
| spark.jdbc.username   | spark.user                                    | Username                                  |
| spark.jdbc.p.file     | hdfs:///user/$USER/password/teradata/pass.dat | An HDFS file path containing the password |
| gimel.jdbc.write.type | -                                             | FASTLOAD                                  |
| charset               | ASCII                                         | ASCII, UTF8                               |
| SESSIONS              | 5                                             | Required number of sessions               |

Common Imports in JDBC WRITE API usages below

import com.paypal.gimel._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

Write into Teradata via JDBCDataSet with BatchMode

// Prepare Test Data
def stringed(n: Int) = s"""{"code": "${n}", "description": "MAC-${n}"}"""
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }.toSeq
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val df: DataFrame = sparkSession.read.json(rdd)
//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//DataSet Name
val datasetName = "pcatalog.user_test"
// options "can" be used to specify the writing mode; the default write type is batch. Set "BATCHSIZE" as required.
val options = Map(("charset", "ASCII"), ("BATCHSIZE", "100000"), ("SESSIONS", "8"), ("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"))
//write some data
dataset.write(datasetName, df, options)
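
To sanity-check the write, the same dataset can be read back with the read API shown earlier; a minimal sketch, assuming the dataset name above and default read options:

// Read back the rows just written (read API from the Read section)
val verifyDF = dataset.read(datasetName, Map(("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat")))
verifyDF.show(10)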

Write into Teradata via JDBCDataSet with FASTLOAD

// Prepare Test Data
def stringed(n: Int) = s"""{"code": "${n}", "description": "MAC-${n}"}"""
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }.toSeq
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val df: DataFrame = sparkSession.read.json(rdd)
//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//DataSet Name
val datasetName = "pcatalog.user_test"
// options "can" be used to specify the writing mode; set "gimel.jdbc.write.type" to "FASTLOAD" and "SESSIONS" to the number of sessions required.
val options = Map(("gimel.jdbc.write.type", "FASTLOAD"), ("charset", "ASCII"), ("SESSIONS", "8"), ("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"))
//write some data
dataset.write(datasetName, df, options)

Teradata Update API

The Teradata Update API works through the write API, and all JDBC options for the write are supported by the update API as well. The update API adds one more option to control how the table is updated: gimel.jdbc.insertStrategy. Requirement: the Teradata table to be updated must have primary keys.

| Option                    | Default | Values to specify                |
|---------------------------|---------|----------------------------------|
| gimel.jdbc.insertStrategy | insert  | insert, update, FullLoad, append |
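
The default strategy, insert, performs plain inserts with no primary-key matching. A minimal sketch, reusing the dataset and dataframe names from the examples below:

// Default strategy: plain inserts (equivalent to omitting gimel.jdbc.insertStrategy)
val options = Map(("spark.jdbc.p.file", "hdfs:///user/$USER/password/teradata/pass.dat"), ("gimel.jdbc.insertStrategy", "insert"))
val wDF = dset.write("pcatalog.emp_table_write", inputDataFrame, options)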

Teradata Update table [Update] : gimel.jdbc.insertStrategy=update

The Gimel Teradata write API updates the table based on its primary keys. With this option set, the API updates the Teradata table rows whose primary key matches the corresponding column values in the given dataframe.

// set the update option: gimel.jdbc.insertStrategy=update
val options = Map(("spark.jdbc.username","$USER"),("spark.jdbc.p.file","hdfs:///user/$USER/password/teradata/pass.dat"),("gimel.jdbc.insertStrategy","update"));
// update Teradata table
val wDF = dset.write("pcatalog.emp_table_write",inputDataFrame,options);

Teradata Update table [Truncate-Insert] : gimel.jdbc.insertStrategy=FullLoad

With this option set, the API truncates the target Teradata table first and then inserts the source dataframe into it.

// set the update option: gimel.jdbc.insertStrategy=FullLoad
val options = Map(("spark.jdbc.username","$USER"),("spark.jdbc.p.file","hdfs:///user/$USER/password/teradata/pass.dat"),("gimel.jdbc.insertStrategy","FullLoad"));
// update Teradata table
val wDF = dset.write("pcatalog.emp_table_write",inputDataFrame,options);

Teradata Update table [Update-Insert] : gimel.jdbc.insertStrategy=append

The Gimel Teradata write API updates the table based on its primary keys. With this option set, the API updates target table rows wherever the target's primary key matches the corresponding column value in the source dataframe; rows whose keys are not found in the target table are inserted into the target Teradata table.

// set the update option: gimel.jdbc.insertStrategy=append
val options = Map(("spark.jdbc.username","$USER"),("spark.jdbc.p.file","hdfs:///user/$USER/password/teradata/pass.dat"),("gimel.jdbc.insertStrategy","append"));
// update Teradata table
val wDF = dset.write("pcatalog.emp_table_write",inputDataFrame,options);

Teradata-To-Any-Storage | JDBC Query PushDown

The entire Teradata query is pushed to the Teradata server; only the results are returned to the Spark application.

Sample Steps

// Credentials
sqlContext.setConf("spark.jdbc.username","$USER")
sqlContext.setConf("spark.jdbc.p.file","hdfs:///user/$USER/password/teradata/pass.dat")
// Set this flag if your entire select clause runs on just one Teradata system
sqlContext.setConf("gimel.jdbc.enableQueryPushdown","true")
// Execute your Query - Entire Query
com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(
  """
  |insert into pcatalog.gimel_testing_td_to_hive_pxk
  | ANY_LARGE_COMPLEX_SELECT_QUERY_THAT_IS_TERADATA_SQL_COMPLIANT
  |""".stripMargin ,
  sparkSession);
// Turn off PushDown and continue with the rest of your activities
sqlContext.setConf("gimel.jdbc.enableQueryPushdown","false")