- S3cmd Setup
- S3 API
- Overview
- Design Considerations
- Create Hive Table Catalog
- Supported File Types
- Password Options
- S3 GIMEL Read API for CSV
- S3 GIMEL Write API CSV
- S3 GIMEL GSQL
- Limitations
S3cmd Setup
- This CLI is used to access S3 from the command line.
- Download s3cmd binary from https://s3tools.org/s3cmd.
- Configure it by creating .s3cfg file in your home folder.
Edit the Access ID, Secret Key, and S3 endpoint in the configuration file.
[default]
access_key = [ACCESS-ID]
access_token =
add_encoding_exts =
add_headers =
bucket_location = region
ca_certs_file =
cache_file =
check_ssl_certificate = True
check_ssl_hostname = True
cloudfront_host = cloudfront.amazonaws.com
content_disposition =
content_type =
default_mime_type = binary/octet-stream
delay_updates = False
delete_after = False
delete_after_fetch = False
delete_removed = False
dry_run = False
enable_multipart = True
encrypt = False
expiry_date =
expiry_days =
expiry_prefix =
follow_symlinks = False
force = False
get_continue = False
gpg_command = None
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase = noencryption
guess_mime_type = True
host_base = [S3 End Point]
host_bucket = %(bucket).s3-region.example.com
human_readable_sizes = False
invalidate_default_index_on_cf = False
invalidate_default_index_root_on_cf = True
invalidate_on_cf = False
kms_key =
limit = -1
limitrate = 0
list_md5 = False
log_target_prefix =
long_listing = False
max_delete = -1
mime_type =
multipart_chunk_size_mb = 10
multipart_max_chunks = 10000
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
put_continue = False
recursive = False
recv_chunk = 65536
reduced_redundancy = False
requester_pays = False
restore_days = 1
restore_priority = Standard
secret_key = [SECRET-KEY]
send_chunk = 65536
server_side_encryption = False
signature_v2 = False
signurl_use_https = False
simpledb_host = sdb.amazonaws.com
skip_existing = False
socket_timeout = 300
stats = False
stop_on_error = False
storage_class =
throttle_max = 100
upload_id =
urlencoding_mode = normal
use_http_expect = False
use_https = False
use_mime_magic = True
verbosity = WARNING
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
website_error =
website_index = index.html
- Test the connection
./s3cmd info s3://gimeltestbucket
- List the file on S3 bucket
./s3cmd ls --recursive s3://gimeltestbucket
- Put file on S3 bucket
./s3cmd put test s3://gimeltestbucket/test
- Download file from S3 bucket
./s3cmd get s3://gimeltestbucket/test
S3 API
Overview
- This API enables reading and writing objects from/to S3 storage
Design Considerations
Spark S3 connector
- https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html.
- Spark S3 uses aws-java-sdk and hadoop-aws libraries for constructing dataframes by downloading files from S3 buckets and writing dataframe to S3 buckets.
- Gimel connector is using these libraries as dependencies.
Create Hive Table Catalog
Create Hive DDLs when using HIVE as gimel.catalog.provider.
The following hive table points to S3 bucket - gimeltestbucket for reading/writing files to/from an object location
CSV file
CREATE EXTERNAL TABLE `udc.test_s3_airports_csv`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_airports_csv'
TBLPROPERTIES (
'gimel.s3.object.format'='csv',
'gimel.s3.object.location'='s3a://gimeltestbucket/flights/airports.csv',
'gimel.s3.file.header'='true',
'gimel.s3.file.inferSchema'='true',
'gimel.storage.type'='S3');
Parquet file
CREATE EXTERNAL TABLE `udc.test_s3_parquet`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_parquet'
TBLPROPERTIES (
'gimel.s3.object.format'='parquet',
'gimel.s3.object.location'='s3a://gimeltestbucket/userdata.parquet',
'gimel.storage.type'='S3');
Json file
CREATE EXTERNAL TABLE `udc.test_s3_json`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_json'
TBLPROPERTIES (
'gimel.s3.object.format'='json',
'gimel.s3.object.location'='s3a://gimeltestbucket/test.json',
'gimel.storage.type'='S3');
Create JSON Dataset Properties
Create DatasetProperties json when using USER as gimel.catalog.provider.
val dataSetProperties_s3 = s"""
{
"datasetType": "S3",
"fields": [],
"partitionFields": [],
"props": {
"gimel.storage.type":"S3",
"datasetName":"udc.sample_s3_dataset",
"fs.s3a.path.style.access":"true",
"fs.s3a.endpoint":"s3-region.example.com",
"gimel.s3.credentials.strategy": "file",
"gimel.s3.credentials.file.path": "/path/s3_credentials",
"gimel.s3.credentials.file.source": "local",
"gimel.s3.file.header": "true",
"gimel.s3.file.inferSchema": "true"
}
}"""
Supported File Types
- CSV
- JSON
- PARQUET
- TXT
Catalog Properties
Property | Mandatory? | Description | Example | Default |
---|---|---|---|---|
fs.s3a.path.style.access | N | Path style enabled in S3? | true/false | true |
fs.s3a.endpoint | Y | S3 endpoint | s3-us.example.com:80 | |
fs.s3a.access.key | Y | Access ID (This is a Must if gimel.s3.credentials.strategy=user ) |
||
fs.s3a.secret.key | Y | Secret Key (This is a Must if gimel.s3.credentials.strategy=user ) |
||
gimel.s3.file.header | N | File Header for CSV file | true/false | false |
gimel.s3.file.inferSchema | N | Infer Schema from header? | true/false | false |
gimel.s3.object.location | Y | Object Location on S3 | s3a://gimeltestbucket/flights/airports.csv | |
gimel.s3.credentials.strategy | Y | Credentials strategy | file/user/credentialLess | file |
gimel.s3.credentials.file.source | Y | This is a Must if gimel.s3.credentials.strategy=file |
local/hdfs | |
gimel.s3.credentials.file.path | Y | This is a Must if gimel.s3.credentials.strategy=file |
/path/xxxx/s3_credentials | |
gimel.s3.save.mode | N | Write mode for S3 (https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes) | append/overwrite/error/ignore | error |
-------------------------------------------------------------------------------------------------------------------- |
Password Options
- To access an S3 bucket, Access ID and Secret Key are required
- These credentials can be given using either the local file system or the HDFS file system
- For the local file system, we need to put the credentials in a file and specify the file using the options below
-
This will be useful in default or yarn client mode
"gimel.s3.credentials.strategy" -> "file" "gimel.s3.credentials.file.source" -> "local" "gimel.s3.credentials.file.path" -> "/path/xxxx/mycredentials.txt"
-
For the HDFS file system, we need to put the credentials in a file and specify the file using the options below.
-
This will be useful in yarn cluster mode
"gimel.s3.credentials.strategy" -> "file" "gimel.s3.credentials.file.source" -> "hdfs" "gimel.s3.credentials.file.path" -> "hdfs://cluster/xxxx/mycredentials.txt"
-
The credentials file must not be accessible by any other user (Set permission to 700)
- Put the Access ID and Secret Key in credentials file in the following format
ACCESS-ID SECRET-KEY
S3 GIMEL Read API for CSV
- The following example shows how to read data from S3 by giving the credentials using the local file system as the source
Common Imports and initializations
val dataSet = new com.paypal.gimel.DataSet(spark)
val options = Map("gimel.s3.object.format" -> "csv",
"gimel.s3.credentials.strategy" -> "file",
"gimel.s3.credentials.file.source" -> "local",
"gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials",
"gimel.s3.file.header" -> "true",
"gimel.s3.file.inferSchema" -> "true")
val dfRead = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", options)
dfRead.show
S3 GIMEL Write API CSV
- The following example shows how to write data to S3 by giving the credentials using an HDFS file as the source
val options = Map("gimel.s3.object.format" -> "csv",
"gimel.s3.credentials.strategy" -> "file",
"gimel.s3.credentials.file.source" -> "hdfs",
"gimel.s3.credentials.file.path" -> "hdfs://hadoopcluster/user/xxxx/s3_credentials",
"gimel.s3.file.header" -> "true",
"gimel.s3.file.inferSchema" -> "true")
val dfWrite = dataSet.write("udc.S3.Test.gimeltestbucket.test_kafka_to_s3", options)
S3 GIMEL GSQL
Common Imports and initializations
import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
//Set UDC parameters for testing
gsql("set rest.service.method=https")
gsql("set rest.service.host=udc-rest-api-host")
gsql("set rest.service.port=443")
gsql("set gimel.catalog.provider=UDC")
gsql("set gimel.logging.level=CONSOLE")
CSV file Read
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.flights_airports_csv")
JSON file Read
gsql("set gimel.s3.object.format=json")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_json")
CSV file Read with a different delimiter
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.file.delimiter=|")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_delimiter_csv")
Parquet file Read
gsql("set gimel.s3.object.format=parquet")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.userdata_parquet")
Kafka to S3
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1000")
gsql("set gimel.s3.save.mode=overwrite")
val df = gsql("insert into udc.S3.Test.gimeltestbucket.flights_airports_csv
select * from udc.Kafka.Gimel_Dev.test.flights_airports")
Limitations
- Presently, we don't support the binary file type.