KAFKA 2 API

  • It supports the Structured Streaming API for consuming messages from and producing messages to Kafka.
  • This API is backward compatible with the Kafka 1 connector.

Note

The table below lists the supported message formats and the property that differentiates them.

| Format | Support | Differentiating Property |
|--------|---------|--------------------------|
| AVRO | READ + WRITE | 'gimel.kafka.avro.schema.string'='' |
| STRING | READ + WRITE | 'gimel.kafka.message.value.type'='string' |
| JSON | READ + WRITE | 'gimel.kafka.message.value.type'='json' |
| BINARY | READ + WRITE | 'gimel.kafka.message.value.type'='binary' |
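
For example, a topic whose messages are plain JSON strings can be read by setting the differentiating property at runtime. A minimal sketch (uses the gsql helper defined later in this page; the dataset name is the illustrative one used throughout these examples):

gsql("set gimel.kafka.message.value.type=json")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")
df.printSchema()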

Limitations

  • Supports DataStream write only for Elasticsearch and Kafka
  • Supports DataStream read only for Kafka

Batch & Streaming Features

Checkpointing

  • Saves the checkpoint state in the supplied ZooKeeper, or in a default ZooKeeper if none is supplied.
  • Clears the checkpoint and starts over from the beginning via function calls.
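
A minimal sketch of the checkpoint calls (the batch read examples further below show the same calls in full context):

val dataSet = com.paypal.gimel.DataSet(spark)
val recsDF = dataSet.read("udc.Kafka_Dev_default_user", "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500")
// Operator handle for checkpoint operations
val kafkaOperator = dataSet.latestKafkaDataSetReader.get
// Clear the checkpoint to start over from the beginning, if required
kafkaOperator.clearCheckPoint()
// Save the consumed offsets in the supplied (or default) ZooKeeper at the end of the batch
kafkaOperator.saveCheckPoint()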

Throttling

  • Advanced parameters to control the parallelism used while consuming data from Kafka (Batch only)
  • Horizontal scaling of tasks/executors
  • Control over the number of messages per executor (Batch only)
  • Ability to control the number of rows/messages fetched from Kafka in each run (Batch only)
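
These throttles are plain properties that can be set before a batch read; a sketch with illustrative values (the properties and their defaults are listed under Batch Props below):

gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1000000")
gsql("set gimel.kafka.throttle.batch.maxRecordsPerPartition=10000000")
gsql("set gimel.kafka.throttle.batch.minRowsPerParallel=100000")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")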

Batch & Streaming SQL on Kafka with checkpointing & throttling capabilities

  • Express your entire logic in SQL that can combine datasets from storages such as Hive, HDFS, Elasticsearch and Kafka
  • Explore data with select statements
  • Insert data into targets with insert statements; on a successful insert, the consumer's Kafka state is saved implicitly in the checkpoint nodes in ZooKeeper (see the sketch after this list)
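
For instance, a batch pipeline landing a Kafka dataset into a Hive dataset can be expressed as one SQL statement; a sketch (the Hive target dataset name is hypothetical, and gsql is the Gimel SQL reference created in the examples below):

gsql("insert into udc.Hive.Gimel_Dev.default.user_log select * from udc.Kafka.Gimel_Dev.default.user")
// On a successful insert, the consumer's Kafka offsets are checkpointed implicitly in ZooKeeper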

Gimel Serde

  • Supports loading an external deserializer and serializer class at runtime to add custom logic for processing the messages in Kafka.
  • Generic serializers/deserializers for avro, string and json are provided in the gimel-serde module.
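
A serde is plugged in purely via properties; a sketch using the generic JSON classes from gimel-serde (the same classes appear in the examples further below):

gsql("set gimel.deserializer.class=com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
gsql("set gimel.serializer.class=com.paypal.gimel.serializers.generic.JsonSerializer")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")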

Catalog Properties

Common Props

| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| bootstrap.servers | Y | The broker list for Kafka | host1:9092,host2:9092 | |
| gimel.kafka.whitelist.topics | Y | Comma-separated list of topic names in Kafka | flights,flights_demo | |
| zookeeper.connection.timeout.ms | Y | ZooKeeper connection timeout in milliseconds | 10000 | |
| gimel.kafka.checkpoint.zookeeper.host | Y | ZooKeeper host where the Kafka offsets are checkpointed for each application | localhost:2181 | |
| gimel.kafka.checkpoint.zookeeper.path | Y | The root node in ZooKeeper for checkpointing; additional child paths are appended based on spark.app.name, spark.user and datasetName to make the ZK node unique | /gimel/kafka/checkpoints | |
| gimel.kafka.custom.offset.range | N | The custom offset/partition range to fetch from a Kafka topic | [{\"topic\": \"flights\",\"offsetRange\": [{\"partition\": 0,\"from\": 0,\"to\": 5}]}] | |
| gimel.kafka.source.fields.list | N | The list of Kafka source fields | key,value,offset or all | value |
| gimel.kafka.api.version | N | Kafka API version | 1 or 2 | 2 |
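
For example, to project Kafka metadata fields alongside the message value, the source fields list can be widened at runtime; a sketch (the exact set of columns returned for 'all' is an assumption based on the description above):

gsql("set gimel.kafka.source.fields.list=all")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")
// Expect fields such as key and offset in addition to value (assumption: the exact schema depends on the connector)
df.printSchema()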

Batch Props

| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| gimel.kafka.throttle.batch.fetchRowsOnFirstRun | N | The number of latest messages to fetch if the consumer is pulling data for the first time ever | 25000000 | 25000000 |
| gimel.kafka.throttle.batch.maxRecordsPerPartition | N | Maximum number of records fetched per partition of the Kafka topic | 10000000 | 10000000 |
| gimel.kafka.throttle.batch.parallelsPerPartition | N | A very advanced option to parallelize the number of connections per partition; best left at its default. The configuration was introduced during troubleshooting and performance optimization. | 250 | 250 |
| gimel.kafka.throttle.batch.minRowsPerParallel | N | Ensures we do not over-subscribe to parallelism and process very few records per executor; for instance, a value of 100000 means at least 100000 messages are read by each executor. | 100000 | 100000 |

Streaming Props

| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| gimel.kafka.throttle.streaming.isParallel | N | Use in streaming mode to parallelize the deserialization step. Recommended when preserving ordering is not a necessity in the sink (such as HDFS). Once messages are fetched from Kafka, with this flag turned on, they can be repartitioned across executors and processed in parallel via the properties listed below. | false | true |
| gimel.kafka.throttle.stream.parallelism.factor | N | The number of executors/repartitions to create while deserializing | 10 | 10 |
| gimel.kafka.throttle.streaming.maxOffsetsPerTrigger | N | Maximum offsets per trigger | None | 3600 |
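
As with the batch throttles, these are plain properties; a sketch with illustrative values, using spark.sql for the sets as in the streaming examples further below:

spark.sql("set gimel.kafka.throttle.streaming.isParallel=true")
spark.sql("set gimel.kafka.throttle.stream.parallelism.factor=10")
spark.sql("set gimel.kafka.throttle.streaming.maxOffsetsPerTrigger=3600")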

Kafka 1 props (Backward compatible)

| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| key.serializer | Y | The Kafka key serializer | org.apache.kafka.common.serialization.StringSerializer | |
| value.serializer | Y | The Kafka message serializer | org.apache.kafka.common.serialization.StringSerializer | org.apache.kafka.common.serialization.ByteArraySerializer |
| key.deserializer | Y | The Kafka key deserializer | org.apache.kafka.common.serialization.StringDeserializer | |
| value.deserializer | Y | The Kafka message deserializer | org.apache.kafka.common.serialization.StringDeserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer |
| gimel.kafka.avro.schema.source | Y | INLINE - the Avro schema is supplied via gimel.kafka.avro.schema.string; CSR - the schema is picked up from Confluent Schema Registry based on gimel.kafka.avro.schema.source.url | | |
| gimel.kafka.avro.schema.string | N | Required if gimel.kafka.avro.schema.source=INLINE | Refer to the Avro examples below | |
| gimel.kafka.avro.schema.source.url | N | Required if gimel.kafka.avro.schema.source=CSR | Refer to the examples below | |
| gimel.kafka.avro.schema.source.key | N | Required if gimel.kafka.avro.schema.source=CSR; the key to look up in Confluent Schema Registry | Refer to the Avro examples below | |

Scale Kafka API

  • By default, gimel.kafka.throttle.batch.minRowsPerParallel is set to 100k, which means each Spark partition will process 100k records.
  • If gimel.kafka.throttle.batch.maxRecordsPerPartition is set higher than gimel.kafka.throttle.batch.minRowsPerParallel, the records in each Kafka partition are divided across multiple Spark partitions.
  • This removes the dependency on the number of Kafka partitions for parallelism in the Spark job.
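
A worked example with illustrative numbers: with maxRecordsPerPartition=10,000,000 and minRowsPerParallel=100,000, each Kafka partition can be split into up to 10,000,000 / 100,000 = 100 Spark partitions, regardless of how many partitions the topic has.

gsql("set gimel.kafka.throttle.batch.maxRecordsPerPartition=10000000")
gsql("set gimel.kafka.throttle.batch.minRowsPerParallel=100000")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")
// The resulting partition count scales with records per Kafka partition, not with the Kafka partition count
df.rdd.getNumPartitions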

Spark Shell command to test the examples below

spark-shell --jars gimel-tools-2.0.0-SNAPSHOT-uber.jar,generic-deserializers-1.0-SNAPSHOT-uber.jar,generic-serializers-1.0-SNAPSHOT-uber.jar


Common Imports in all the KAFKA API usages below

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._

Set Catalog Provider and Dataset Properties

Catalog Provider = UDC : Search your dataset on UDC (Unified Data Catalog)

  • The dataset name is udc.Kafka.Gimel_Dev.default.user, which follows the UDC format -> udc.{storageType}.{storageSystem}.{container}.{topicName}
    Where:
    storageType = Kafka
    storageSystem = cluster name discovered on UDC, here Gimel_Dev
    container = default (the database name in case of Hive and the namespace in case of HBase)
    topicName = topic name with all special characters replaced by '_'
  • You will find the following properties on UDC if the dataset is discovered:
    'gimel.storage.type'='KAFKA'
    'gimel.kafka.bootstrap.servers'='localhost:9092'
    'gimel.kafka.whitelist.topics'='user'
    'gimel.kafka.checkpoint.zookeeper.host'='localhost:2181'
    'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint'
    'zookeeper.connection.timeout.ms'='10000'
    'auto.offset.reset'='earliest'
    All other properties can be set or passed at runtime.
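
With UDC as the catalog provider, only the dataset name is needed at read time because the properties above are resolved from UDC. A minimal sketch (setting gimel.catalog.provider explicitly is shown for clarity):

import org.apache.spark.sql.DataFrame
// Create Gimel SQL reference (same pattern as the examples below)
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.catalog.provider=UDC")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")
df.show(10)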

Catalog Provider = HIVE : Create Hive Table Pointing to Kafka Topic

Avro Schema can be INLINE

CREATE EXTERNAL TABLE `udc.Kafka_Dev_default_user`(
    payload string
)
LOCATION '/tmp/test/gimel/default_user'
TBLPROPERTIES (
  'gimel.storage.type' = 'KAFKA',
  'gimel.kafka.bootstrap.servers'='localhost:9092',
  'gimel.kafka.whitelist.topics'='user',
  'gimel.kafka.checkpoint.zookeeper.host'='localhost:2181',
  'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
  'key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
  'value.serializer'='org.apache.kafka.common.serialization.ByteArraySerializer',
  'zookeeper.connection.timeout.ms'='10000',
  'auto.offset.reset'='earliest',
  'gimel.kafka.avro.schema.string'=' {
   "type" : "record",
   "namespace" : "default",
   "name" : "user",
   "fields" : [
      { "name" : "name" , "type" : "string" },
      { "name" : "age" , "type" : "int" },
      { "name" : "rev" , "type" : "long" }
   ]}'
   )

Avro Schema can be fetched from Confluent Schema Registry

CREATE EXTERNAL TABLE `udc.Kafka_Dev_default_avro_user`(
    payload string
)
LOCATION '/tmp/test/gimel/default_avro_user'
TBLPROPERTIES (
  'gimel.storage.type' = 'KAFKA',
  'gimel.kafka.bootstrap.servers'='localhost:9092',
  'gimel.kafka.whitelist.topics'='user',
  'gimel.kafka.checkpoint.zookeeper.host'='localhost:2181',
  'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
  'gimel.kafka.avro.schema.source'='CSR',
  'gimel.kafka.avro.schema.source.url'='http://schemaregistry:8081',
  'gimel.kafka.avro.schema.source.wrapper.key'='user_schema', -- This is the schema lookup key for the Schema Registry
  'key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
  'value.serializer'='org.apache.kafka.common.serialization.ByteArraySerializer',
  'zookeeper.connection.timeout.ms'='10000',
  'auto.offset.reset'='earliest'
   )

Catalog Provider = USER : Provide a json of dataset properties

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._

val dataset = com.paypal.gimel.DataSet(spark)

val dataSetProperties_json = """
{
    "datasetType": "KAFKA",
    "fields": [],
    "partitionFields": [],
    "props": {
        "gimel.storage.type": "KAFKA",
        "bootstrap.servers": "localhost:9092",
        "gimel.kafka.whitelist.topics": "gimel.test.json",
        "zookeeper.connection.timeout.ms": "10000",
        "gimel.kafka.checkpoint.zookeeper.host": "localhost:2181",
        "gimel.kafka.checkpoint.zookeeper.path": "/pcatalog/kafka_consumer/checkpoint",
        "auto.offset.reset": "earliest",
        "datasetName": "udc.kafka_test_json",
        "gimel.deserializer.class": "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer",
        "gimel.serializer.class": "com.paypal.gimel.serializers.generic.JsonSerializer"
    }
}
"""

val options = Map("udc.kafka_test_json.dataSetProperties" -> dataSetProperties_json, 
  "gimel.kafka.throttle.batch.fetchRowsOnFirstRun" -> 1000, 
  "gimel.kafka.throttle.batch.parallelsPerPartition" -> 250, 
  "gimel.catalog.provider"->"USER")
val df = dataset.read("udc.kafka_test_json", options)

Note: You need to set the property {datasetName}.dataSetProperties to the JSON value. In this example the dataset name is "udc.kafka_test_json", which is why the property "udc.kafka_test_json.dataSetProperties" is set to the JSON property string.
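
A write with the same USER-provided properties follows the same pattern. A sketch, assuming dataset.write accepts an options map just like dataset.read (the test DataFrame construction mirrors the batch examples below):

import org.apache.spark.rdd.RDD
// Prepare a small test DataFrame
def stringed(n: Int) = s"""{"id": ${n}, "name": "MAC-${n}", "rev": ${n * 10000}}"""
val rdd: RDD[String] = spark.sparkContext.parallelize((1 to 100).map(stringed))
val testDF = spark.read.json(rdd)
// Reuse the options map from the read above so the catalog provider stays USER
dataset.write("udc.kafka_test_json", testDF, options)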


Generic Kafka Integration | Data Published as Avro Serialized Messages

Write to your Topic via KafkaDataSet

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._

// Prepare Test Data
import org.apache.spark.rdd.RDD
// In spark-shell the active session is available as 'spark'
val sparkSession = spark

def stringed(n: Int) = s"""{"id": ${n}, "name": "MAC-${n}", "rev": ${n * 10000}}"""
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val df: DataFrame = sparkSession.read.json(rdd)
//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//DataSet Name
val datasetName = "udc.Kafka_Dev_default_user"
//write some data
dataset.write(datasetName, df)

Read your Kafka Topic as bytes via KafkaDataSet

//Initiate DataSet
val dataSet: DataSet = DataSet(sparkSession)
//options "can" be used to pick smaller subset of rows
val options = "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500:gimel.kafka.throttle.batch.parallelsPerPartition=250:gimel.kafka.throttle.batch.maxRecordsPerPartition=25000000"
//read API
val recsDF = dataSet.read("udc.Kafka_Dev_default_user",options)
// Get Kafka Operator for CheckPoint Operations
val kafkaOperator = dataSet.latestKafkaDataSetReader.get
// If required, clear checkpoint to begin reading from kafka from beginning
kafkaOperator.clearCheckPoint()
// Do some usecase
recsDF.show()
// Save CheckPoint at the end of each batch
kafkaOperator.saveCheckPoint()

Read Avro data from a topic with a gimel-serde deserializer class

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

// Overwriting gimel serde props
gsql("set gimel.deserializer.avro.schema.source=CSR")
gsql("set gimel.deserializer.avro.schema.subject=user_schema")
gsql("set gimel.deserializer.class=com.paypal.gimel.deserializers.generic.AvroDeserializer")
gsql("set gimel.deserializer.avro.schema.url=http://localhost:8081")
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")
df.show(10)

Write to a Kafka topic in Avro format from another topic with JSON messages

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

spark.sql("set gimel.kafka.avro.schema.source=INLINE")
val schema=s"""{"type":"record","name":"userData","namespace":"test","doc":"Test User Data","fields":[{"name":"name","type":{"type":"string","avro.java.string":"String"}},{"name":"age","type":{"type":"string","avro.java.string":"String"}},{"name":"rev","type":{"type":"string","avro.java.string":"String"}}]}"""
gsql("set gimel.serializer.class=com.paypal.gimel.serializers.generic.AvroSerializer")
gsql(s"""set gimel.serializer.avro.schema.string=$schema""")
gsql("set gimel.deserializer.class=com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
val df = gsql("insert into udc.Kafka.Gimel_Dev.default.gimel_test_avro select * from udc.Kafka.Gimel_Dev.default.user")


Read your Kafka Topic via KafkaDataStream using Structured Streaming

import com.paypal.gimel._

val dataStream = DataStream2(spark)

val options = Map("gimel.kafka.avro.schema.source.key" -> "user_schema", 
  "auto.offset.reset" -> "latest", 
  "gimel.deserializer.avro.schema.source" -> "CSR", 
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.AvroDeserializer")
val streamingResult = dataStream.read(s"udc.Kafka.Gimel_Dev.default.user", options)

// Display the messages on console
val df = streamingResult.df
df.isStreaming
val writer = df.writeStream.outputMode("append").format("console").start

// Stop the Streaming Query
writer.stop

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gstreamsql: (String) => String = com.paypal.gimel.scaas.GimelQueryProcessor.executeStream2(_: String, spark)
spark.sql("set gimel.logging.level=CONSOLE")

spark.sql("set gimel.deserializer.class=com.paypal.gimel.deserializers.generic.AvroDeserializer")
spark.sql("set gimel.deserializer.avro.schema.url=http://localhost:8081")
spark.sql("set gimel.deserializer.avro.schema.source=CSR")
spark.sql("set gimel.deserializer.avro.schema.subject=user_schema")
val sql="select * from udc.Kafka.Gimel_Dev.default.user"
gstreamsql(sql)


Simple Kafka Topic With String | JSON | Binary Messages

Create Hive Table Pointing to Kafka Topic with json


CREATE EXTERNAL TABLE `udc.Kafka_Gimel_Dev_default_gimel_test_json`(
 payload string
)
LOCATION '/tmp/test/gimel'
TBLPROPERTIES (
  'gimel.storage.type' = 'KAFKA',
  'bootstrap.servers'='localhost:9092',
  'gimel.kafka.whitelist.topics'='gimel.test.json',
  'gimel.kafka.checkpoint.zookeeper.host'='localhost:2181',
  'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
  'zookeeper.connection.timeout.ms'='10000',
  'auto.offset.reset'='latest',
  'gimel.deserializer.class'='com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer',
  'gimel.serializer.class'='com.paypal.gimel.serializers.generic.JsonSerializer'
   );

Note: Here we added an external deserializer and serializer class for JSON. The serializer class is used to write to Kafka in JSON format.

Write to your Topic via KafkaDataSet

// Prepare Test Data
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def stringed(n: Int) = s"""{"id": ${n}, "name": "MAC-${n}", "rev": ${n * 10000}}"""
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val df: DataFrame = sparkSession.read.json(rdd)

//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//write some data
dataset.write("udc.Kafka_Gimel_Dev_default_gimel_test_json", df)

Read your Kafka Topic via KafkaDataSet

//Initiate DataSet
val dataSet: DataSet = DataSet(sparkSession)
//options "can" be used to pick smaller subset of rows
val options = "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500:gimel.kafka.throttle.batch.parallelsPerPartition=250:gimel.kafka.throttle.batch.maxRecordsPerPartition=25000000"
//read API
val recsDF = dataSet.read("udc.Kafka_Gimel_Dev_default_gimel_test_json", options)
// Get Kafka Operator for CheckPoint Operations
val kafkaOperator = dataSet.latestKafkaDataSetReader.get
// If required, clear checkpoint to begin reading from kafka from beginning
kafkaOperator.clearCheckPoint()
// Do some usecase
recsDF.show()
// Save CheckPoint at the end of each batch
kafkaOperator.saveCheckPoint()

Read your Topic via KafkaDataStream

Note: JsonDynamicDeserializer, which discovers the fields dynamically, is not supported in streaming. Instead, use JsonStaticDeserializer, which accepts the list of fields as JSON.

import com.paypal.gimel._

val dataStream = com.paypal.gimel.DataStream2(sparkSession)
val fieldsBindToString=s"""[{"fieldName":"name","fieldType":"string","defaultValue":"null"},{"fieldName":"age","fieldType":"string","defaultValue":"null"}, {"fieldName":"rev","fieldType":"string","defaultValue":""}]"""
val options = Map("gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonStaticDeserializer", 
  "gimel.fields.bind.to.json" -> fieldsBindToString)
val streamingResult = dataStream.read(s"udc.Kafka.Gimel_Dev.default.user", options)

val df = streamingResult.df
df.isStreaming

// Writing the output to console
val writer = df.writeStream.outputMode("append").format("console").start

// Stop the Streaming Query
writer.stop


Read data from Kafka Custom Offset Ranges

gsql(s"""set gimel.kafka.custom.offset.range=[{"topic": "user","offsetRange": [{"partition": 0,"from": 33223688879,"to": 33223688889}]}]""")
gsql("select * from udc.Kafka.Gimel_Dev.default.user").count()

// Now Unset the property to read normally again
spark.conf.unset("gimel.kafka.custom.offset.range")
gsql("select * from udc.Kafka.Gimel_Dev.default.user").count()

// Make sure the topic in your offset range matches the subscribed topics in gimel.kafka.whitelist.topics, or you will get an exception
gsql(s"""set gimel.kafka.custom.offset.range=[{"topic": "flights","offsetRange": [{"partition": 0,"from": 33223688879,"to": 33223688889}]}]""")
gsql("select * from udc.Kafka.Gimel_Dev.default.user").count()
// Gives -> java.lang.Exception: The topic specified in custom offset range does not match the subscribed topic! Please unset the previous value or check your properties

Load Kafka properties with custom kafka options loader

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
gsql("set gimel.logging.level=CONSOLE")

//Setting UDC rest api properties
gsql("set rest.service.method=https")
gsql("set rest.service.host=udc-rest-api-host")
gsql("set rest.service.port=443")

// Setting kafka properties
gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1")
gsql("set gimel.deserializer.class=com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
gsql("set gimel.kafka.options.loader=com.test.CustomKafkaOptionsLoader")

// Reading from kafka dataset
val df = gsql("select * from udc.Kafka.Gimel_Dev.default.user")

Create your custom Kafka Options Loader

Users can implement their own logic for fetching the Kafka properties that are pushed down to the Kafka Consumer/Producer inside the Kafka read/write API, and plug it into the Gimel Data API at runtime, by following these steps:

  • Add the gimel-common dependency to your project pom.xml:

    <dependency>
      <groupId>com.paypal.gimel</groupId>
      <artifactId>gimel-common</artifactId>
      <version>2.0.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>

  • Implement the com.paypal.gimel.common.conf.KafkaOptionsLoader interface and add your logic to the method:

    def loadKafkaOptions(config: Map[String, String]): Map[String, Map[String, String]]

    The input to this method is the full set of dataset properties passed through any one of the catalog providers mentioned above (UDC/HIVE/USER).
  • Add your jar to the spark job along with the gimel jar.
  • Pass the custom kafka options loader class via the property "gimel.kafka.options.loader".
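
A minimal sketch of such a loader, assuming the returned map is keyed by topic name and that the interface declares exactly the method shown above (the broker-lookup property used here is hypothetical):

import com.paypal.gimel.common.conf.KafkaOptionsLoader

class CustomKafkaOptionsLoader extends KafkaOptionsLoader {

  // Returns Kafka client properties per topic (keying the result by topic is an assumption)
  override def loadKafkaOptions(config: Map[String, String]): Map[String, Map[String, String]] = {
    // All dataset properties from the catalog provider (UDC/HIVE/USER) arrive in 'config'
    val topics = config.getOrElse("gimel.kafka.whitelist.topics", "")
    // 'custom.kafka.broker.lookup' is a hypothetical property used for illustration only
    val brokers = config.getOrElse("custom.kafka.broker.lookup", "localhost:9092")
    topics.split(",").map(topic => topic.trim -> Map("bootstrap.servers" -> brokers)).toMap
  }
}

Package this class in a jar, add it to the spark job along with the gimel jar, and pass it via "gimel.kafka.options.loader" as shown in the example above (com.test.CustomKafkaOptionsLoader).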