S3cmd Setup

  • s3cmd is a CLI tool for accessing S3 from the command line.
  • Download the s3cmd binary from https://s3tools.org/s3cmd.
  • Configure it by creating a .s3cfg file in your home directory.
    Edit the Access ID, Secret Key, and S3 endpoint in the configuration file.

[default]
access_key = [ACCESS-ID]
access_token =
add_encoding_exts =
add_headers =
bucket_location = region
ca_certs_file =
cache_file =
check_ssl_certificate = True
check_ssl_hostname = True
cloudfront_host = cloudfront.amazonaws.com
content_disposition =
content_type =
default_mime_type = binary/octet-stream
delay_updates = False
delete_after = False
delete_after_fetch = False
delete_removed = False
dry_run = False
enable_multipart = True
encrypt = False
expiry_date =
expiry_days =
expiry_prefix =
follow_symlinks = False
force = False
get_continue = False
gpg_command = None
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase = noencryption
guess_mime_type = True
host_base = [S3 End Point]
host_bucket = %(bucket)s.s3-region.example.com
human_readable_sizes = False
invalidate_default_index_on_cf = False
invalidate_default_index_root_on_cf = True
invalidate_on_cf = False
kms_key =
limit = -1
limitrate = 0
list_md5 = False
log_target_prefix =
long_listing = False
max_delete = -1
mime_type =
multipart_chunk_size_mb = 10
multipart_max_chunks = 10000
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
put_continue = False
recursive = False
recv_chunk = 65536
reduced_redundancy = False
requester_pays = False
restore_days = 1
restore_priority = Standard
secret_key = [SECRET-KEY]
send_chunk = 65536
server_side_encryption = False
signature_v2 = False
signurl_use_https = False
simpledb_host = sdb.amazonaws.com
skip_existing = False
socket_timeout = 300
stats = False
stop_on_error = False
storage_class =
throttle_max = 100
upload_id =
urlencoding_mode = normal
use_http_expect = False
use_https = False
use_mime_magic = True
verbosity = WARNING
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
website_error =
website_index = index.html

  • Test the connection

./s3cmd info s3://gimeltestbucket

  • List the files in an S3 bucket

./s3cmd ls --recursive s3://gimeltestbucket

  • Upload a file to an S3 bucket

./s3cmd put test s3://gimeltestbucket/test

  • Download a file from an S3 bucket

./s3cmd get s3://gimeltestbucket/test


S3 API

Overview

  • This API enables reading objects from and writing objects to S3 storage; a minimal sketch of the flow is shown below
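
The sketch below is only illustrative: the dataset name and options are placeholders, and the write call follows the (dataset name, DataFrame, options) argument order used in the examples later on this page. Complete, working examples follow in the sections below.

// Minimal sketch of the Gimel S3 read/write flow (placeholder dataset name and options)
val dataSet = new com.paypal.gimel.DataSet(spark)
val sketchOptions = Map("gimel.s3.object.format" -> "csv")

// Read an S3-backed dataset into a DataFrame, then write it back to the same dataset
val df = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", sketchOptions)
dataSet.write("udc.S3.Test.gimeltestbucket.flights_airports_csv", df, sketchOptions)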

Design Considerations

Spark S3 connector

  • See https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html for background.
  • Spark accesses S3 through the aws-java-sdk and hadoop-aws libraries, constructing DataFrames from files downloaded from S3 buckets and writing DataFrames back to S3 buckets.
  • The Gimel connector uses these libraries as dependencies; the sketch below shows the underlying access pattern.
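
This is a minimal sketch of plain Spark S3A access (without Gimel), assuming the aws-java-sdk and hadoop-aws jars are on the classpath. The endpoint, keys, and output path are placeholders, not values prescribed by this page.

// Configure the S3A connector on the active SparkSession (placeholder values)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3-region.example.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS-ID]")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET-KEY]")

// Read a CSV object into a DataFrame and write it back out as Parquet
val airports = spark.read.option("header", "true").csv("s3a://gimeltestbucket/flights/airports.csv")
airports.write.mode("overwrite").parquet("s3a://gimeltestbucket/flights/airports_parquet")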

Create Hive Table Catalog

Create Hive DDLs when using HIVE as gimel.catalog.provider.
The following Hive tables point to the S3 bucket gimeltestbucket for reading/writing files at an object location.

CSV file

  CREATE EXTERNAL TABLE `udc.test_s3_airports_csv`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_airports_csv'
  TBLPROPERTIES (
    'gimel.s3.object.format'='csv',
    'gimel.s3.object.location'='s3a://gimeltestbucket/flights/airports.csv',
    'gimel.s3.file.header'='true',
    'gimel.s3.file.inferSchema'='true',
    'gimel.storage.type'='S3');

Parquet file

  CREATE EXTERNAL TABLE `udc.test_s3_parquet`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_parquet'
  TBLPROPERTIES (
    'gimel.s3.object.format'='parquet',
    'gimel.s3.object.location'='s3a://gimeltestbucket/userdata.parquet',
    'gimel.storage.type'='S3');

JSON file

  CREATE EXTERNAL TABLE `udc.test_s3_json`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_json'
  TBLPROPERTIES (
    'gimel.s3.object.format'='json',
    'gimel.s3.object.location'='s3a://gimeltestbucket/test.json',
    'gimel.storage.type'='S3');

Create JSON Dataset Properties

Create a DataSetProperties JSON when using USER as gimel.catalog.provider.

val dataSetProperties_s3 = s"""
{
    "datasetType": "S3",
    "fields": [],
    "partitionFields": [],
    "props": {
        "gimel.storage.type": "S3",
        "datasetName": "udc.sample_s3_dataset",
        "fs.s3a.path.style.access": "true",
        "fs.s3a.endpoint": "s3-region.example.com",
        "gimel.s3.credentials.strategy": "file",
        "gimel.s3.credentials.file.path": "/path/s3_credentials",
        "gimel.s3.credentials.file.source": "local",
        "gimel.s3.file.header": "true",
        "gimel.s3.file.inferSchema": "true"
    }
}"""

Supported File Types

  • CSV
  • JSON
  • PARQUET
  • TXT

Catalog Properties

| Property | Mandatory? | Description | Example | Default |
|---|---|---|---|---|
| fs.s3a.path.style.access | N | Path-style access enabled in S3? | true/false | true |
| fs.s3a.endpoint | Y | S3 endpoint | s3-us.example.com:80 | |
| fs.s3a.access.key | Y (if gimel.s3.credentials.strategy=user) | Access ID | | |
| fs.s3a.secret.key | Y (if gimel.s3.credentials.strategy=user) | Secret Key | | |
| gimel.s3.file.header | N | File header for CSV files | true/false | false |
| gimel.s3.file.inferSchema | N | Infer schema from header? | true/false | false |
| gimel.s3.object.location | Y | Object location on S3 | s3a://gimeltestbucket/flights/airports.csv | |
| gimel.s3.credentials.strategy | Y | Credentials strategy | file/user/credentialLess | file |
| gimel.s3.credentials.file.source | Y (if gimel.s3.credentials.strategy=file) | Source of the credentials file | local/hdfs | |
| gimel.s3.credentials.file.path | Y (if gimel.s3.credentials.strategy=file) | Path to the credentials file | /path/xxxx/s3_credentials | |
| gimel.s3.save.mode | N | Write mode for S3 (https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes) | append/overwrite/error/ignore | error |
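
The examples on this page all use the file credentials strategy. For the user strategy, the keys are passed directly as options; the sketch below is assembled from the properties in the table above, with placeholder values.

// Sketch of the "user" credentials strategy (placeholder keys and endpoint)
val userStrategyOptions = Map(
  "gimel.s3.object.format" -> "csv",
  "gimel.s3.credentials.strategy" -> "user",
  "fs.s3a.access.key" -> "[ACCESS-ID]",
  "fs.s3a.secret.key" -> "[SECRET-KEY]",
  "fs.s3a.endpoint" -> "s3-region.example.com",
  "gimel.s3.file.header" -> "true",
  "gimel.s3.file.inferSchema" -> "true")

val df = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", userStrategyOptions)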

Password Options

  • To access an S3 bucket, an Access ID and Secret Key are required
  • These credentials can be supplied through a file on either the local file system or HDFS
  • For the local file system, put the credentials in a file and reference that file with the options below
  • This is useful in the default or yarn-client mode

    "gimel.s3.credentials.strategy" -> "file"
    "gimel.s3.credentials.file.source" -> "local"
    "gimel.s3.credentials.file.path" -> "/path/xxxx/mycredentials.txt"

  • For the HDFS file system, put the credentials in a file and reference that file with the options below

  • This is useful in yarn-cluster mode

    "gimel.s3.credentials.strategy" -> "file"
    "gimel.s3.credentials.file.source" -> "hdfs"
    "gimel.s3.credentials.file.path" -> "hdfs://cluster/xxxx/mycredentials.txt"

  • The credentials file must not be accessible to any other user (set its permissions to 700)

  • Put the Access ID and Secret Key in the credentials file in the following format: ACCESS-ID SECRET-KEY

S3 GIMEL Read API for CSV

  • The following example shows how to read data from S3, supplying the credentials from a file on the local file system

Common Imports and initializations

val dataSet = new com.paypal.gimel.DataSet(spark)
val options = Map("gimel.s3.object.format" -> "csv", 
                   "gimel.s3.credentials.strategy" -> "file",
                   "gimel.s3.credentials.file.source" -> "local",
                   "gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials",
                   "gimel.s3.file.header" -> "true",
                   "gimel.s3.file.inferSchema" -> "true")

val dfRead = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", options)
dfRead.show


S3 GIMEL Write API for CSV

  • The following example shows how to write data to S3, supplying the credentials from a file on HDFS

val options = Map("gimel.s3.object.format" -> "csv", 
                   "gimel.s3.credentials.strategy" -> "file",
                   "gimel.s3.credentials.file.source" -> "hdfs",
                   "gimel.s3.credentials.file.path" -> "hdfs://hadoopcluster/user/xxxx/s3_credentials",
                   "gimel.s3.file.header" -> "true",
                   "gimel.s3.file.inferSchema" -> "true")

// dfRead (the DataFrame read in the previous example) is reused here as the data to write
val dfWrite = dataSet.write("udc.S3.Test.gimeltestbucket.test_kafka_to_s3", dfRead, options)

S3 GIMEL GSQL

Common Imports and initializations

import org.apache.spark.sql.{Column, Row, SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)

//Set UDC parameters for testing
gsql("set rest.service.method=https")
gsql("set rest.service.host=udc-rest-api-host")
gsql("set rest.service.port=443")

gsql("set gimel.catalog.provider=UDC")
gsql("set gimel.logging.level=CONSOLE")

CSV file Read

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.flights_airports_csv")

JSON file Read

gsql("set gimel.s3.object.format=json")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_json")

CSV file Read with a different delimiter

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.file.delimiter=|")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_delimiter_csv")

Parquet file Read

gsql("set gimel.s3.object.format=parquet")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.userdata_parquet")

Kafka to S3

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1000")
gsql("set gimel.s3.save.mode=overwrite")

val df = gsql("insert into udc.S3.Test.gimeltestbucket.flights_airports_csv 
                select * from udc.Kafka.Gimel_Dev.test.flights_airports")

Limitations

  • Binary file types are not currently supported.