Fields Bind to Feature

  • This feature allows you to select additional/missing columns in the input dataset while reading data using the Gimel Data API/GSQL.
  • It is mainly used for querying datasets with dynamic schemas, such as Kafka/HBase.
  • If a field is present in the input dataframe, its value is taken; otherwise the default value is assigned.
  • Each field is cast to the data type specified in the JSON or in the DataSetProperties of the dataset (see the plain-Spark sketch after this list for an illustration).
  • The fields to bind to can be passed in 2 ways:
  • A JSON array of fields with their types and default values
  • A dataset name: the read binds to the fields of that dataset
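
For intuition, the snippet below is a minimal plain-Spark sketch of the binding behavior described above. It is not Gimel's implementation; BindField and bindFields are hypothetical helpers used only for illustration.

// A minimal plain-Spark sketch (not Gimel's implementation) of the binding behavior:
// fields present in the input keep their values and are cast to the declared type,
// missing fields are added with their default value.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

case class BindField(fieldName: String, fieldType: String, defaultValue: String)

def bindFields(df: DataFrame, fields: Seq[BindField]): DataFrame =
  fields.foldLeft(df) { (acc, f) =>
    if (acc.columns.contains(f.fieldName))
      // Field exists in the input: keep its value and cast it to the declared type
      acc.withColumn(f.fieldName, acc(f.fieldName).cast(f.fieldType))
    else
      // Field is missing: add it with the default value, cast to the declared type
      acc.withColumn(f.fieldName, lit(f.defaultValue).cast(f.fieldType))
  }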

Here is a sample execution

Fields passed as JSON


// Common Imports
import org.apache.spark.sql.{Column, Row, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

// The c1 column is not present in the input data, so the default value gets assigned
val fieldsBindToString=s"""[{"fieldName":"name","fieldType":"string","defaultValue":"null"},{"fieldName":"address","fieldType":"string","defaultValue":"null"}, {"fieldName":"company","fieldType":"string","defaultValue":""}, {"fieldName":"designation","fieldType":"string","defaultValue":""}, {"fieldName":"c1","fieldType":"int","defaultValue":"4"} ]"""

gsql(s"""set gimel.fields.bind.to.json=$fieldsBindToString""")

val df = gsql("select * from udc.Kafka.Gimel_dev.default.test_topic")

df.show(1)
+--------+----------+----------+--------------------+------+
|    name|   address|   company|         designation|    c1|
+--------+----------+----------+--------------------+------+
|    John|  Kery St.|       ABC|             Manager|     4|
+--------+----------+----------+--------------------+------+

df.printSchema
root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- company: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- c1: integer (nullable = true)

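If you would rather not hand-write the JSON string, a library such as json4s (bundled with Spark) can build it from case classes. The sketch below is only an illustration under that assumption; BindField is a hypothetical helper, not part of Gimel.

// A sketch for building the bind-to JSON programmatically (assumes json4s on the classpath).
// BindField is an illustrative helper, not a Gimel class.
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write

case class BindField(fieldName: String, fieldType: String, defaultValue: String)

implicit val formats: DefaultFormats.type = DefaultFormats

val fieldsBindToString = write(Seq(
  BindField("name", "string", "null"),
  BindField("address", "string", "null"),
  BindField("company", "string", ""),
  BindField("designation", "string", ""),
  BindField("c1", "int", "4")
))

gsql(s"""set gimel.fields.bind.to.json=$fieldsBindToString""")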

Fields taken from a dataset

Create a test Hive dataset whose column c1 is not present in the input Kafka topic

CREATE EXTERNAL TABLE `default.test_dataset_fields_bind_to`(
 `name` string,
 `address` string,
 `company` string,
 `designation` string,
 `c1` int
 )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION
'hdfs://cluster/user/bind_to_test';

Bind to the fields of the above dataset so that even though c1 is not present in the input Kafka topic, the write to the target Hive table will not fail


// Common Imports
import org.apache.spark.sql.{Column, Row, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

gsql(s"""set gimel.fields.bind.to.dataset=default.test_dataset_fields_bind_to""")

val df = gsql("select * from udc.Kafka.Gimel_dev.default.test_topic")

// c1 is not present in the input Kafka dataset, so the default value is assigned
df.show(1)
+--------+----------+----------+--------------------+------+
|    name|   address|   company|         designation|    c1|
+--------+----------+----------+--------------------+------+
|    John|  Kery St.|       ABC|             Manager|     4|
+--------+----------+----------+--------------------+------+


df.printSchema
root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- company: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- c1: integer (nullable = true)

// Write data to the above target Hive dataset
// Note: Select the fields in the order of the target dataset, as missing fields get added to the end
val sql = s"""insert into default.test_dataset_fields_bind_to 
select `name`,
`address`,
`company`,
`designation`,
`c1`
from udc.Kafka.Gimel_dev.default.test_topic"""

gsql(sql)
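
As an optional check, the target table can be read back through gsql to confirm that the missing column was populated with its default value (a sketch; the actual output depends on your data):

// Optional verification (sketch): read the target Hive table back
val verifyDf = gsql("select * from default.test_dataset_fields_bind_to")
verifyDf.show(1)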


Bind to fields when the source is empty and has a dynamic schema

Example use case: handling an empty Kafka topic consisting of JSON messages

Empty dataframe without the gimel.fields.bind.to.dataset property

spark.sql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=0")

gsql(s"""set gimel.fields.bind.to.json=""")

val df = gsql("select * from udc.Kafka.Gimel_dev.default.test_topic")

df.show
++
||
++
++

Dataframe with schema when the gimel.fields.bind.to.dataset property is set


// Common Imports
import org.apache.spark.sql.{Column, Row, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

// This will fetch no messages from Kafka
spark.sql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=0")

// If this property is not set, an empty dataframe with no schema is returned
gsql(s"""set gimel.fields.bind.to.dataset=default.test_dataset_fields_bind_to""")

val df = gsql("select * from udc.Kafka.Gimel_dev.default.test_topic")

df.show
+--------+----------+----------+--------------------+------+
|    name|   address|   company|         designation|    c1|
+--------+----------+----------+--------------------+------+
+--------+----------+----------+--------------------+------+