SFTP API

  • This API enables reading files from and writing files to SFTP servers

Design Considerations

Spark SFTP connector

  • https://github.com/springml/spark-sftp
  • spark-sftp is a library for constructing dataframes by downloading files from an SFTP server and for writing a dataframe back to an SFTP server
  • The Gimel connector uses this library as a dependency
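For reference, the underlying spark-sftp library can also be used directly as a Spark data source. A minimal sketch, with option names taken from that library's README (the host, paths, and credentials below are placeholders, not Gimel options):

```scala
// Sketch: reading a CSV from an SFTP server with the springml spark-sftp
// data source directly (option names per the library's README).
val df = spark.read
  .format("com.springml.spark.sftp")
  .option("host", "sftp_server")     // SFTP host, as in the Hive TBLPROPERTIES
  .option("username", "USERNAME")
  .option("password", "*****")
  .option("fileType", "csv")
  .load("/data/bus_use.csv")         // hypothetical remote path

// Writing the dataframe back to the SFTP server.
df.write
  .format("com.springml.spark.sftp")
  .option("host", "sftp_server")
  .option("username", "USERNAME")
  .option("password", "*****")
  .option("fileType", "csv")
  .save("/data/bus_use_copy.csv")    // hypothetical remote path
```

Gimel wraps this data source behind its catalog and option keys, so the examples below use `gimel.sftp.*` properties instead of these raw options.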

Create Hive Table Catalog

The following Hive table points to an SFTP server:

  CREATE EXTERNAL TABLE pcatalog.sftp_drop_zone (
        payload string)
      ROW FORMAT SERDE
        'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
      STORED AS INPUTFORMAT
        'org.apache.hadoop.mapred.TextInputFormat'
      OUTPUTFORMAT
        'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
      LOCATION
        'hdfs://cluster1/tmp/pcatalog/sftp_drop_zone'
      TBLPROPERTIES (
        'gimel.sftp.host'='sftp_server',
        'gimel.storage.type'='SFTP')

Supported File Types

  • CSV
  • JSON
  • AVRO
  • PARQUET
  • TXT
  • XML

Password Options

  • The password can be supplied via a file on either the local file system or HDFS
  • For the local file system, put the password in a file and reference it with the options below
  • This is useful in default or yarn-client mode

    "gimel.sftp.file.password.strategy" -> "file" "gimel.sftp.file.password.source" -> "local" "gimel.sftp.file.password.path" -> "/x/home/xxxx/mypass.txt"

  • For HDFS, put the password in a file on HDFS and reference it with the options below
  • This is useful in yarn-cluster mode

    "gimel.sftp.file.password.strategy" -> "file" "gimel.sftp.file.password.source" -> "hdfs" "gimel.sftp.file.password.path" -> "hdfs://cluster/xxxx/mypass.txt"

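The two strategies differ only in where the password file is read from. A rough sketch of the resolution logic is below; the helper is illustrative, not Gimel's actual implementation:

```scala
import java.nio.file.{Files, Paths}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch: resolve the SFTP password from the configured source.
// "local" reads from the driver's file system (default / yarn-client mode);
// "hdfs" reads from HDFS, which is required in yarn-cluster mode where the
// driver runs on a cluster node without access to the submitter's local files.
def resolvePassword(source: String, path: String): String = source match {
  case "local" =>
    new String(Files.readAllBytes(Paths.get(path))).trim
  case "hdfs" =>
    val fs = FileSystem.get(new Configuration())
    val in = fs.open(new Path(path))
    try scala.io.Source.fromInputStream(in).mkString.trim
    finally in.close()
  case other =>
    throw new IllegalArgumentException(s"Unknown password source: $other")
}
```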

SFTP GIMEL Read API for CSV

  • The following example shows how to supply the SFTP server password from a file on the local file system
val options = Map("gimel.sftp.username" -> "USERNAME",
                  "gimel.sftp.file.password.strategy" -> "file",
                  "gimel.sftp.file.password.source" -> "local",
                  "gimel.sftp.file.password.path" -> "/x/home/xxxx/mypass.txt",
                  "hdfsTempLocation" -> "/tmp/basu",
                  "header" -> "false",
                  "gimel.sftp.filetype" -> "csv",
                  "gimel.sftp.file.location" -> "bus_use.csv")
val sftpDF = dataSet.read("pcatalog.SFTP.SFTP_SERVER.default.Files", options )
sftpDF.show


SFTP GIMEL Write API for JSON

  • The following example shows how to supply the SFTP server password from a file on HDFS
val options = Map("gimel.sftp.username" -> "USERNAME",
                  "gimel.sftp.file.password.strategy" -> "file",
                  "gimel.sftp.file.password.source" -> "hdfs",
                  "gimel.sftp.file.password.path" -> "hdfs://cluster1/user/xxxxxx/mypass.txt",
                  "hdfsTempLocation" -> "/tmp/basu",
                  "header" -> "false",
                  "gimel.sftp.filetype" -> "json",
                  "gimel.sftp.file.location" -> "myJsonNew.json")
val sftpDFRes = dataSet.write("pcatalog.SFTP.SFTP_SERVER.default.Files", sftpDF, options )

SFTP GIMEL GSQL

spark.sql("set gimel.sftp.username=USERNAME")
spark.sql("set gimel.sftp.password=*****")
spark.sql("set gimel.sftp.filetype=csv")
spark.sql("set gimel.sftp.file.location=bus_use.csv")
spark.sql("set header=false")
spark.sql("set gimel.jdbc.password.strategy=file")
spark.sql("set hdfsTempLocation=/tmp/basu")
spark.sql("set gimel.jdbc.p.file=hdfs://cluster1/user/USERNAME/pass.dat")

val newDF = com.paypal.gimel.scaas.GimelQueryProcessor.executeBatch("create table pcatalog.teradata.mycluster.test_db.myTable1 as select * from pcatalog.SFTP.SFTP_SERVER.default.Files",spark)

val sampleDF = com.paypal.gimel.scaas.GimelQueryProcessor.executeBatch("insert into pcatalog.teradata.mycluster.test_db.myTable1 select * from pcatalog.SFTP.SFTP_SERVER.default.Files",spark)

// Here pcatalog.SFTP.SFTP_SERVER.default.Files is a pcatalog table created through the Gimel pcatalog UI
// It points to the PayPal corp sftp_server.


SFTP GIMEL READ WRITE FOR XML

// Read xml 
val options = Map("gimel.sftp.file.password.strategy" -> "file",
                  "gimel.sftp.file.password.source" -> "local",
                  "gimel.sftp.file.password.path" -> "/x/home/USER/mypass.txt",
                  "gimel.sftp.username" -> "username",
                  "header" -> "false",
                  "hdfsTempLocation" -> "hdfs://cluster1/user/username/",
                  "gimel.sftp.filetype" -> "xml",
                  "rowTag" -> "YEAR",
                  "gimel.sftp.file.location" -> "myxml.xml")
val sftpDFRes = dataSet.read("pcatalog.SFTP.SFTP_SERVER.default.Files", options )

// Write XML

val options = Map("gimel.sftp.file.password.strategy" -> "file",
                  "gimel.sftp.file.password.source" -> "local",
                  "gimel.sftp.file.password.path" -> "/x/home/USER/mypass.txt",
                  "gimel.sftp.username" -> "username",
                  "header" -> "false",
                  "hdfsTempLocation" -> "hdfs://cluster1/user/username/",
                  "gimel.sftp.filetype" -> "xml",
                  "rowTag" -> "YEAR",
                  "rootTag" -> "YEARS",
                  "gimel.sftp.file.location" -> "myxml.xml")
dataSet.write("pcatalog.SFTP.SFTP_SERVER.default.Files", sftpDFRes, options )



Limitations

  • While writing files to SFTP, file names should be specified with a .gz suffix, as the files are stored in gzip-compressed format
  • Otherwise, the write API appends .gz as the suffix before storing the file on the SFTP server
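A caller can avoid the surprise rename by normalizing the target file name up front. A hypothetical helper (not part of the Gimel API):

```scala
// Hypothetical helper: ensure the target file name already carries the .gz
// suffix that the write API would otherwise append on its own.
def ensureGzSuffix(fileName: String): String =
  if (fileName.endsWith(".gz")) fileName else fileName + ".gz"

// Use the normalized name when building the write options, e.g.:
//   "gimel.sftp.file.location" -> ensureGzSuffix("myJsonNew.json")
// so the name passed in matches the name actually stored on the server.
```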