Elastic Search API

Note


The following hive table points to an elastic search Index

CREATE EXTERNAL TABLE IF NOT EXISTS pcatalog.sampleESTable
(
  `data` string COMMENT 'from deserializer'
)
LOCATION '/tmp/pcatalog/sampleESTable'
TBLPROPERTIES (
  'gimel.storage.type' = 'ELASTICSEARCH',
  'es.index.auto.create'='true',
  'es.nodes'='http://es_node',
  'es.port'='8080',
  'es.resource'='sampleESTable_index/data'
)

Catalog Properties

Property Mandatory? Description Example Default
es.nodes Y ES host Elastic search host http://localhost
es.port Y ES Port the ES service port 8080
es.resource Y the index and type sampleESTable_index/data

Common Imports in all ES API Usages

import com.paypal.gimel._

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

import spray.json._
import spray.json.DefaultJsonProtocol._

Write Api ES

Prepare Test Data for Write ES

def stringed(n: Int) = s"""{"id": ${n},"name": "MAC-${n}", "address": "MAC-${n+1}", "age": "${n+1}", "company": "MAC-${n}", "designation": "MAC-${n}", "salary": "${n * 10000}" }"""
val numberOfRows=10000
val texts: Seq[String] = (1 to numberOfRows).map { x => stringed(x) }.toSeq
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val dataFrameToWrite: DataFrame = sparkSession.read.json(rdd)
dataFrameToWrite.show
val dataSet: DataSet = DataSet(sparkSession)
val dataFrameWritten = dataSet.write("pcatalog.sampleESTable",dataFrameToWrite)

Write Data Api Elastic Search for Partitioned Index

val dataSet = DataSet(sc)
val options: Map[String, String] = Map("gimel.es.index.partition.suffix"->"20170602")
val json3 = s"""{"name" : "abcd", "age" : "28","gender":"m"}"""
val json4 = s"""{"name" : "efgh", "age" : "28","gender":"m"}"""
val rdd1 = sc.parallelize(Seq(json3,json4))
val df1 = sparkSession.read.json(rdd1)
val res = dataSet.write("pcatalog.sampleESTable",df1,options)

Read Api ES

Read Data from ES

val dataFrameRead = dataSet.read("pcatalog.sampleESTable")
dataFrameRead.show

Read Data from ES for Multiple Indexes

val options: Map[String, String] = Map("gimel.es.index.partition.isEnabled"->"true","gimel.es.index.partition.delimiter"->"_","gimel.es.index.partition.suffix"->"20170602,20170603")
val res1 = dataSet.read("pcatalog.sampleESTable",options)
res1.show

Read Data from ES using wildcard

val options: Map[String, String] = Map("gimel.es.index.partition.isEnabled"->"true","gimel.es.index.partition.delimiter"->"_","gimel.es.index.partition.suffix"->"*","gimel.es.index.read.all.partitions.isEnabled"->"true")
val res1 = dataSet.read("pcatalog.sampleESTable",options)
res1.show