Data API Usage

A quick start for the DataSet and DataStream APIs. Please refer to the individual storage system's documentation for details.


import com.paypal.gimel._
import org.apache.spark.sql._
import scala.collection.immutable.Map

// Initialize DataSet
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
val dataSet = DataSet(sparkSession)

// Read data; the options map is optional and carries storage-specific settings
val readOptions = Map[String, Any]()
val data1: DataFrame = dataSet.read("pcatalog.table1", readOptions)
val data2: DataFrame = dataSet.read("pcatalog.table2")

// Write data; the options map is optional and carries storage-specific settings
val writeOptions = Map[String, Any]()
dataSet.write("pcatalog.table3", data1, writeOptions)
dataSet.write("pcatalog.table4", data2)

// Initialize DataStream
val dataStream = DataStream(sparkSession)

// Get a reference to the stream
val datasetName = "pcatalog.streamingTable1" // placeholder; use your catalog dataset name
val streamingResult: StreamingResult = dataStream.read(datasetName)
// Clear the checkpoint if necessary
streamingResult.clearCheckPoint("some message")
// Process each window of the stream
streamingResult.dStream.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    /**
      * Mandatory | Get the offsets for the current window, so we can checkpoint at the end of this window's operations
      */
    streamingResult.getCurrentCheckPoint(rdd)
    /**
      * Begin | User's use cases
      */
    // e.g., derive a DataFrame from the RDD and persist it via the batch Data API:
    // dataSet.write("pcatalog.targetDataSet", derivedDF)
    /**
      * End | User's use cases
      */
    /**
      * Mandatory | Save the checkpoint once this window's processing succeeds
      */
    streamingResult.saveCurrentCheckPoint()
  }
}
// Start the streaming context
dataStream.streamingContext.start()
dataStream.streamingContext.awaitTermination()
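
awaitTermination() blocks until the job stops or fails. Below is a minimal graceful-shutdown sketch, assuming streamingContext is a standard org.apache.spark.streaming.StreamingContext; the hook must be registered before start() so in-flight windows (and their checkpoints) can finish:

// Optional | Stop gracefully on JVM shutdown (register before start())
sys.addShutdownHook {
  dataStream.streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}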