Bootstrap Data

Set the Catalog Provider to USER

// Select the USER catalog provider. The datasets in the paragraphs that follow
// are described inline with `set <dataset>.dataSetProperties=...` statements.
val catalogProvider = "USER"
gsql(s"set gimel.catalog.provider=$catalogProvider")

Bootstrap Flights Data

Create HDFS Dataset for loading Flights Data

// Describe the HDFS dataset that holds the raw flights data as CSV files under
// hdfs://namenode:8020/flights/data. "fields" is left empty, so no schema is
// declared here.
val flightsHdfsDataset = "pcatalog.flights_hdfs"
gsql(s"""set ${flightsHdfsDataset}.dataSetProperties={ 
    "datasetType": "HDFS",
    "fields": [],
    "partitionFields": [],
    "props": {
         "gimel.hdfs.data.format":"csv",
         "location":"hdfs://namenode:8020/flights/data",
         "datasetName":"${flightsHdfsDataset}"
    }
}""")

Create Kafka Dataset for Flights Data

// Describe the Kafka dataset for flight messages on topic
// gimel.demo.flights.json (message value type "json").
// - Keys and values use the String (de)serializers.
// - The checkpoint location is configured by the
//   gimel.kafka.checkpoint.zookeeper.* props.
// - The two throttle props cap how many rows a batch read may fetch.
val flightsKafkaDataset = "pcatalog.flights_kafka_json"
gsql(s"""set ${flightsKafkaDataset}.dataSetProperties={ 
    "datasetType": "KAFKA",
    "fields": [],
    "partitionFields": [],
    "props": {
        "gimel.storage.type":"kafka",
        "gimel.kafka.message.value.type":"json",
        "gimel.kafka.whitelist.topics":"gimel.demo.flights.json",
        "bootstrap.servers":"kafka:9092",
        "gimel.kafka.checkpoint.zookeeper.host":"zookeeper:2181",
        "gimel.kafka.checkpoint.zookeeper.path":"/pcatalog/kafka_consumer/checkpoint/flights",
        "gimel.kafka.zookeeper.connection.timeout.ms":"10000",
        "gimel.kafka.throttle.batch.maxRecordsPerPartition":"10000000",
        "gimel.kafka.throttle.batch.fetchRowsOnFirstRun":"10000000",    
        "auto.offset.reset":"earliest",
        "key.serializer":"org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer":"org.apache.kafka.common.serialization.StringSerializer",
        "key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
        "datasetName":"${flightsKafkaDataset}"
    }
}""")

Load Flights Data into Kafka Dataset

// Copy every row of the HDFS flights dataset into the Kafka dataset.
val flightsSource = "pcatalog.flights_hdfs"
val flightsSink = "pcatalog.flights_kafka_json"
gsql(s"insert into $flightsSink select * from $flightsSource")

Cache Kafka Data

// Cache the Kafka flights dataset as the table `flights`.
val flightsCacheSource = "pcatalog.flights_kafka_json"
gsql(s"cache table flights select * from  $flightsCacheSource")

Bootstrap Flights Lookup Data

Create HDFS Datasets for loading Flights Lookup Data

// Describe one CSV-backed HDFS dataset per flights lookup. Lookup "<x>" is read
// from hdfs://namenode:8020/flights/lkp/<x> and catalogued as
// pcatalog.flights_lookup_<x>_hdfs. The statements are issued in the order listed.
Seq("carrier_code", "airline_id", "cancellation_code").foreach { lookup =>
  val datasetName = s"pcatalog.flights_lookup_${lookup}_hdfs"
  gsql(s"""set ${datasetName}.dataSetProperties={ 
    "datasetType": "HDFS",
    "fields": [],
    "partitionFields": [],
    "props": {
         "gimel.hdfs.data.format":"csv",
         "location":"hdfs://namenode:8020/flights/lkp/${lookup}",
         "datasetName":"${datasetName}"
    }
}""")
}

Create HBase Datasets for Flights Lookup Data

// Describe one HBase dataset per flights lookup. Lookup "<x>":
// - lives in HBase table flights:flights_lookup_<x> (namespace "flights");
// - uses the row key as field "Code" and column flights:Description as field
//   "Description" (both non-nullable strings);
// - is catalogued as pcatalog.flights_lookup_<x>_hbase.
Seq("cancellation_code", "carrier_code", "airline_id").foreach { lookup =>
  gsql(s"""set pcatalog.flights_lookup_${lookup}_hbase.dataSetProperties=
{
    "datasetType": "HBASE",
    "fields": [
        {
            "fieldName": "Code",
            "fieldType": "string",
            "isFieldNullable": false
        },
        {
            "fieldName": "Description",
            "fieldType": "string",
            "isFieldNullable": false
        }
    ],
    "partitionFields": [],
    "props": {
        "gimel.hbase.rowkey":"Code",
        "gimel.hbase.table.name":"flights:flights_lookup_${lookup}",
        "gimel.hbase.namespace.name":"flights",
        "gimel.hbase.columns.mapping":":key,flights:Description",
         "datasetName":"pcatalog.flights_lookup_${lookup}_hbase"
    }
}
""")
}


Load Flights Lookup Data into HBase Datasets

// Populate each HBase lookup dataset with all rows of its HDFS counterpart.
for (lookup <- Seq("cancellation_code", "airline_id", "carrier_code")) {
  gsql(s"insert into pcatalog.flights_lookup_${lookup}_hbase select * from pcatalog.flights_lookup_${lookup}_hdfs")
}

Cache Lookup Tables from HBase

// Cache each HBase lookup dataset under a short table name (alias -> lookup suffix).
Seq(
  "lkp_carrier"      -> "carrier_code",
  "lkp_airline"      -> "airline_id",
  "lkp_cancellation" -> "cancellation_code"
).foreach { case (alias, lookup) =>
  gsql(s"cache table $alias select * from pcatalog.flights_lookup_${lookup}_hbase")
}


Cache Airports Data from HDFS

// Cache the airport lookup as table `lkp_airport`, keeping one row per IATA code.
// Output columns:
// - location: struct(lat, lon)
// - location1: the string "lat,lon"
// - the inner columns iata, lat, lon, country, city, name
// - rnk: always 1 after the filter; it is carried along by `*`
//
// Fix: the window used to be `order by 1 desc`. In a window spec that is a
// constant literal, not a column position. Every row in an IATA partition
// therefore tied, and row_number() kept an arbitrary row, so the cached airport
// could differ between runs. Ordering by the selected columns makes the choice
// deterministic.
//
// NOTE(review): pcatalog.flights_lookup_airports_hdfs is not registered via
// dataSetProperties in this section, unlike the other datasets used with
// gimel.catalog.provider=USER. Confirm it is defined elsewhere.
val sql="""cache table lkp_airport
select
  struct(lat, lon) as location
  , concat(lat, ",", lon) as location1
  , *
from (
  select iata, lat, lon, country, city, name
    , row_number() over (
        partition by iata
        order by name, city, country, lat, lon
      ) as rnk
  from pcatalog.flights_lookup_airports_hdfs
) tbl
where rnk = 1
"""

gsql(sql)