G-SQL
Load Flights Data into Kafka Dataset
// Copy every row of the HDFS flights dataset into the Kafka flights dataset via Gimel SQL.
gsql("insert into pcatalog.flights_kafka_json select * from pcatalog.flights_hdfs")
Cache Flights
// Cache the result of reading the Kafka flights dataset as a table named `flights`;
// the next cell queries that table.
gsql("cache table flights select * from pcatalog.flights_kafka_json")
Read Data from Kafka
// Query the cached `flights` table and print up to 10 rows.
gsql("select * from flights").show(10)
Scala API for Catalog Provider: USER
Please execute the steps in this section if you have chosen USER as the CatalogProvider, or if you executed the following command:
// Set the Gimel catalog provider to USER. The Scala cells in this section then pass each
// dataset's definition explicitly through an options map.
gsql("set gimel.catalog.provider=USER")
Set Dataset Options
// Dataset definitions for the USER catalog provider. Each definition is a JSON document handed
// to Gimel through an options map whose key is "<datasetName>.dataSetProperties".
// `.stripMargin` removes everything up to and including the leading `|`, so the resulting
// strings contain no indentation.

// Kafka dataset: JSON message values on topic gimel.demo.flights.json, consumer checkpoints
// stored under a ZooKeeper path, and String (de)serializers for keys and values.
val datasetKafkaPropsJson =
  """{
    |"datasetType": "KAFKA",
    |"fields": [],
    |"partitionFields": [],
    |"props": {
    |"gimel.storage.type":"kafka",
    |"gimel.kafka.message.value.type":"json",
    |"gimel.kafka.whitelist.topics":"gimel.demo.flights.json",
    |"bootstrap.servers":"kafka:9092",
    |"gimel.kafka.checkpoint.zookeeper.host":"zookeeper:2181",
    |"gimel.kafka.checkpoint.zookeeper.path":"/pcatalog/kafka_consumer/checkpoint/flights",
    |"gimel.kafka.zookeeper.connection.timeout.ms":"10000",
    |"gimel.kafka.throttle.batch.maxRecordsPerPartition":"10000000",
    |"gimel.kafka.throttle.batch.fetchRowsOnFirstRun":"10000000",
    |"auto.offset.reset":"earliest",
    |"key.serializer":"org.apache.kafka.common.serialization.StringSerializer",
    |"value.serializer":"org.apache.kafka.common.serialization.StringSerializer",
    |"key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
    |"value.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
    |"datasetName":"pcatalog.flights_kafka_json"
    |}
    |}""".stripMargin

// HDFS dataset: CSV data located at hdfs://namenode:8020/flights/data.
val datasetHivePropsJson =
  """{
    |"datasetType": "HDFS",
    |"fields": [],
    |"partitionFields": [],
    |"props": {
    |"gimel.hdfs.data.format":"csv",
    |"location":"hdfs://namenode:8020/flights/data",
    |"datasetName":"pcatalog.flights_hdfs"
    |}
    |}""".stripMargin

// Options maps passed to DataSet.read / DataSet.write in the cells that follow.
val hiveOptions = Map(
  "pcatalog.flights_hdfs.dataSetProperties" -> datasetHivePropsJson
)
val kafkaOptions = Map(
  "pcatalog.flights_kafka_json.dataSetProperties" -> datasetKafkaPropsJson
)
Load Flights Data into Kafka Dataset
// Load the flights data from the HDFS dataset and write it into the Kafka dataset, passing each
// dataset's definition explicitly through the options maps defined earlier in this section.
import com.paypal.gimel._
val dataSet = DataSet(spark)
// Read the source dataset; `hiveOptions` carries its JSON definition.
val hivedf = dataSet.read("pcatalog.flights_hdfs",hiveOptions)
// Write the rows into the Kafka dataset; the result of `write` is bound to `df` and counted below.
val df = dataSet.write("pcatalog.flights_kafka_json",hivedf,kafkaOptions)
// Spark declares `count` with an empty parameter list, so call it as `count()`.
// Auto-application (`df.count`) is deprecated in Scala 2.13 and removed in Scala 3.
df.count()
Read Data from Kafka
// Read the Kafka flights dataset back, passing its definition via `kafkaOptions`
// (defined earlier in this section). Re-creating `dataSet` lets this cell run on its own.
import com.paypal.gimel._
val dataSet = DataSet(spark)
val df = dataSet.read("pcatalog.flights_kafka_json",kafkaOptions)
// Print up to 10 rows of what was read.
df.show(10)
Scala API for Catalog Provider: HIVE
Please execute the steps in this section if you have chosen HIVE as the CatalogProvider, or if you executed the following command:
// Set the Gimel catalog provider to HIVE. The Scala cells in this section pass no options maps,
// so dataset definitions are presumably resolved from the HIVE catalog (confirm against Gimel docs).
gsql("set gimel.catalog.provider=HIVE")
Load Flights Data into Kafka Dataset
// Load the flights data from the HDFS dataset and write it into the Kafka dataset.
// No options maps are passed here; the dataset definitions presumably come from the
// HIVE catalog provider selected above (confirm against Gimel docs).
import com.paypal.gimel._
val dataSet = DataSet(spark)
val hivedf = dataSet.read("pcatalog.flights_hdfs")
// Write the rows into the Kafka dataset; the result of `write` is bound to `df` and counted below.
val df = dataSet.write("pcatalog.flights_kafka_json",hivedf)
// Spark declares `count` with an empty parameter list, so call it as `count()`.
// Auto-application (`df.count`) is deprecated in Scala 2.13 and removed in Scala 3.
df.count()
Read Data from Kafka
// Read the Kafka flights dataset back without an options map (the dataset definition presumably
// comes from the HIVE catalog provider). Re-creating `dataSet` lets this cell run on its own.
import com.paypal.gimel._
val dataSet = DataSet(spark)
val df = dataSet.read("pcatalog.flights_kafka_json")
// Print up to 10 rows of what was read.
df.show(10)