# What is ExecSQLWrapper

  * The tool allows a developer to write SQL in a `.sql` file.
  * The SQL is run via the copyDataSet tool, which in turn calls a Spark application.
  * Copy Dataset runs in several modes (see the table below).
| Mode | Logic | Notes |
|------|-------|-------|
| batch | Runs a batch pull from Kafka once and writes the data to HDFS. | Schedule jobs in your scheduler (say UC4) no more frequently than every 30 minutes (minimum latency, to avoid the small-files problem). This is the best fit for HDFS where you have a downstream dependent job. See the scheduling sketch below the table. |
| stream | Runs a never-ending (streaming) job. | NOT recommended for an HDFS sink - it creates small files. |
| batch_iterative | Runs a never-ending batch daemon. | Use `batchRecursionMinutes=30` (minimum latency, to avoid the small-files problem). This is the best fit for HDFS where you have an ASYNC downstream consumer that can detect delta partitions. |
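For the batch mode, any enterprise scheduler works as long as the run interval is 30 minutes or more. Below is a minimal sketch, assuming a plain cron scheduler instead of UC4 and a hypothetical wrapper script `run_copy_dataset.sh` that issues the spark-submit command shown later on this page.

```bash
# Hypothetical crontab entry: trigger the batch copy every 30 minutes.
# run_copy_dataset.sh is assumed to wrap the spark-submit invocation shown below.
*/30 * * * * /path/to/run_copy_dataset.sh >> /var/log/copy_dataset.log 2>&1
```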

Here is a sample execution (2.2.0):


```bash
# cat copy_dataset_time_partitioning.sql
# insert into GIMEL.SQL.PARAM.TARGET.DB.GIMEL.SQL.PARAM.TARGET
# select kafka_ds.*
# ,substr(fl_date,1,4) as yyyy
# ,substr(fl_date,6,2) as mm
# ,substr(fl_date,9,2) as dd
# from GIMEL.SQL.PARAM.SOURCE.DB.GIMEL.SQL.PARAM.SOURCE kafka_ds
```

The upper-case `GIMEL.SQL.PARAM.*` tokens are placeholders; they are resolved at runtime from the `gimel.sql.param.*` arguments passed to the job (set further below).

```bash
# Copy the SQL file to HDFS
hadoop fs -copyFromLocal -f copy_dataset_time_partitioning.sql hdfs:///tmp/copy_dataset_time_partitioning.sql
```
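To confirm the file landed where the job expects it, you can read it straight back from HDFS before launching anything:

```bash
# Read the SQL file back from HDFS to confirm the copy succeeded
hadoop fs -cat hdfs:///tmp/copy_dataset_time_partitioning.sql
```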

```bash
# Env variables
export hdfsjar=hdfs:///tmp/gimel_new.tools.jar

# Set source, target and sql_file
export SOURCE=flights_kafka
export TARGET=flights_elastic
export QUERY_SOURCE_FILE=hdfs:///tmp/copy_dataset_time_partitioning.sql
```

```bash
# Set all other args
args="gimel.sql.param.source=${SOURCE} \
      gimel.sql.param.target=${TARGET} \
      gimel.sql.param.source.db=PCATALOG \
      gimel.sql.param.target.db=DEFAULT \
      querySourceFile=${QUERY_SOURCE_FILE} \
      mode=batch \
      isBatchRecursionInfinite=true \
      batchRecursionCount=2 \
      batchRecursionMinutes=30 \
      partition_column=timestamp \
      throttle.batch.fetchRowsOnFirstRun=100 \
      gimel.logging.level=CONSOLE \
      gimel.kafka.reader.checkpoint.clear=false \
      gimel.kafka.reader.checkpoint.save=true \
      gimel.query.results.show.rows.only=true \
      gimel.kafka.throttle.batch.maxRecordsPerPartition=100 \
      gimel.kafka.throttle.batch.parallelsPerPartition=2 \
      gimel.kafka.throttle.batch.minRowsPerParallel=1000000 \
      hive.exec.dynamic.partition=true \
      hive.exec.dynamic.partition.mode=nonstrict \
      hive.exec.max.dynamic.partitions.pernode=400"
```


```bash
# Start the batch (use --deploy-mode cluster to run the driver on the cluster instead)
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.yarn.principal=$USER \
  --conf spark.yarn.keytab=hdfs:///user/$USER/.keytab/$USER.keytab \
  --conf spark.driver.memory=2g \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --conf spark.executor.memory=2g \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --conf spark.executor.cores=1 \
  --class com.paypal.gimel.tools.ExecSQLWrapper \
  ${hdfsjar} \
  $args
```
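Once the application is accepted by YARN, the standard YARN CLI can track it; the application id below is a placeholder for the id printed by spark-submit:

```bash
# List running applications on the cluster
yarn application -list -appStates RUNNING

# Fetch aggregated logs for a finished run (replace the placeholder application id)
yarn logs -applicationId application_1234567890123_0001
```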

Here is a sample execution (2.x):

### Create a Kafka Table

```sql
CREATE EXTERNAL TABLE `pcatalog`.`flights_kafka`(`payload` string)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///tmp/flights'
TBLPROPERTIES (
  'key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'numFiles' = '0',
  'auto.offset.reset' = 'earliest',
  'gimel.kafka.checkpoint.zookeeper.host' = 'zk_host:2181',
  'gimel.storage.type' = 'kafka',
  'gimel.kafka.throttle.batch.fetchRowsOnFirstRun' = '10000000',
  'gimel.kafka.throttle.batch.maxRecordsPerPartition' = '10000000',
  'bootstrap.servers' = 'kafka_broker:9092',
  'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
  'gimel.kafka.checkpoint.zookeeper.path' = '/pcatalog/kafka_consumer/checkpoint/flights',
  'gimel.kafka.zookeeper.connection.timeout.ms' = '10000',
  'key.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
  'gimel.kafka.message.value.type' = 'json'
)
```
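The DDL above only registers metadata in the `pcatalog` database. Assuming a `spark-sql` CLI that points at the same Hive metastore, the registration can be verified with:

```bash
# Show the registered table definition (assumes spark-sql is on the PATH and uses the same metastore)
spark-sql -e "SHOW CREATE TABLE pcatalog.flights_kafka"
```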

### Create an Elastic Search Target Table

```sql
CREATE EXTERNAL TABLE `pcatalog`.`flights_elastic`(`payload` string)
LOCATION 'hdfs:///tmp/elastic_smoke_test'
TBLPROPERTIES (
  'numFiles' = '0',
  'gimel.storage.type' = 'ELASTIC_SEARCH',
  'es.index.auto.create' = 'true',
  'es.mapping.date.rich' = 'true',
  'es.port' = '9200',
  'es.resource' = 'flights/data',
  'es.nodes' = 'http://es_host'
)
```
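After a successful run, the target index can be checked directly against the Elasticsearch REST API; the host and port below are taken from the table properties above, and `_count` is a standard Elasticsearch endpoint:

```bash
# Count documents written to the flights index (host/port from the table properties)
curl -s "http://es_host:9200/flights/_count?pretty"
```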