Analyze Data

Enrich Flights Data, Denormalize, Add Geo Coordinates

val sql = """cache table flights_log_enriched
select
 to_date(substr(fl_date,1,10)) as flight_date
,flights_kafka.*
,lkp_airport_origin.location as origin_location
, as origin_airport_name
, as origin_airport_city
, as origin_airport_country
,lkp_airport_dest.location as dest_location
, as dest_airport_name
, as dest_airport_city
, as dest_airport_country
,lkp_carrier.description as carrier_desc
,lkp_airline.description as airline_desc
,lkp_cancellation.description as cancellation_reason

from flights flights_kafka                                  

left join lkp_carrier lkp_carrier                          
on flights_kafka.unique_carrier = lkp_carrier.code 

left join lkp_airline lkp_airline                          
on flights_kafka.airline_id = lkp_airline.code

left join lkp_cancellation lkp_cancellation                 
on flights_kafka.cancellation_code = lkp_cancellation.code

left join lkp_airport lkp_airport_origin                   
on flights_kafka.origin = lkp_airport_origin.iata

left join lkp_airport lkp_airport_dest                    
on flights_kafka.dest = lkp_airport_dest.iata
"""

gsql(sql)
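
The enrichment above is a chain of left joins against small lookup tables. As a rough illustration of what that denormalization does to a single row, the sketch below mimics it with plain Scala collections instead of Spark SQL. The `Flight` and `EnrichedFlight` case classes, the `enrich` helper, and all sample values are hypothetical and exist only for this example. It also assumes `fl_date` starts with a `yyyy-MM-dd` date, which is what `substr(fl_date,1,10)` suggests.

```scala
import java.time.LocalDate

// Hypothetical, trimmed-down row shapes; the real tables carry more columns.
final case class Flight(flDate: String, uniqueCarrier: String, origin: String, dest: String)
final case class EnrichedFlight(
    flightDate: LocalDate,
    carrierDesc: Option[String],
    originLocation: Option[String],
    destLocation: Option[String])

object EnrichSketch {
  // Made-up lookup data, keyed the same way as the join conditions (code / iata).
  val carriers: Map[String, String] = Map("AA" -> "American Airlines Inc.")
  val airportLocations: Map[String, String] = Map("JFK" -> "40.64,-73.78")

  def enrich(f: Flight): EnrichedFlight =
    EnrichedFlight(
      // SQL substr(fl_date, 1, 10) is 1-based; the Scala equivalent is take(10).
      flightDate = LocalDate.parse(f.flDate.take(10)),
      // Map.get returns None on a miss, much like a left join yields null.
      carrierDesc = carriers.get(f.uniqueCarrier),
      originLocation = airportLocations.get(f.origin),
      destLocation = airportLocations.get(f.dest))
}
```

A flight whose destination code is missing from the lookup still produces a row, with `destLocation` empty. The left joins in the query preserve unmatched flights the same way.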


Create Elasticsearch Dataset

gsql("""set pcatalog.gimel_flights_elastic.dataSetProperties={
    "datasetType": "ELASTIC_SEARCH",
    "fields": [],
    "partitionFields": [],
    "props": {
        "":"{\"location\": { \"type\": \"geo_point\" } }",
        "":"{\"executionStartTime\": {\"format\": \"strict_date_optional_time||epoch_millis\", \"type\": \"date\" }, \"createdTime\": {\"format\": \"strict_date_optional_time||epoch_millis\", \"type\": \"date\"},\"endTime\": {\"format\": \"strict_date_optional_time||epoch_millis\", \"type\": \"date\"}}"
    }
}""")
val sql = """insert into pcatalog.gimel_flights_elastic
select * from flights_log_enriched
where cancelled = 1"""

gsql(sql)
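
The dataset mapping above declares `location` as a `geo_point` and the time fields as dates with the format `strict_date_optional_time||epoch_millis`. As a hedged sketch of which values fit that mapping: Elasticsearch accepts a `geo_point` as a `"lat,lon"` string (among other forms), and the date format accepts either an ISO-8601 timestamp or epoch milliseconds. The helper names `toGeoPoint` and `parseEsDate` are hypothetical, and the parser only approximates Elasticsearch's behaviour: it handles full ISO-8601 instants and digit-only epoch values, not every optional-time variant.

```scala
import java.time.Instant
import scala.util.Try

object EsMappingSketch {
  // Render a coordinate pair in the "lat,lon" string form accepted by geo_point.
  // Returns None when the values fall outside valid latitude/longitude ranges.
  def toGeoPoint(lat: Double, lon: Double): Option[String] =
    if (lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180) Some(s"$lat,$lon")
    else None

  // Approximate "strict_date_optional_time||epoch_millis": try an ISO-8601
  // instant first, then fall back to epoch milliseconds.
  def parseEsDate(raw: String): Option[Instant] =
    Try(Instant.parse(raw)).toOption
      .orElse(Try(Instant.ofEpochMilli(raw.toLong)).toOption)
}
```

Checking values this way before the insert can surface rows that Elasticsearch would likely reject at index time, such as a swapped latitude and longitude or a free-text timestamp.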


Explore, Visualize and Discover Data on Kibana

  • Go to Kibana at http://localhost:5601
  • Create an index pattern for the flights index
  • Explore and visualize your data on the Kibana dashboard