The Data Connector for SFTP_V2 enables you to import files stored on your SFTP server to Treasure Data.


Prerequisites

  • Basic knowledge of Treasure Data.

  • Before using this integration, determine valid protocols for your environment.

If you intend to SFTP, you can use this integration for SFTP.
If FTP/FTPS, try connect with FTP Import Integration.

    • If you are using a firewall, check your accepted IP range/port. Server administrators sometimes change the default port number from TCP/22 for security reasons.

    • “PuTTY” and other formats are not supported.

Limitations and Supported

  • Support only the STORED and DEFLATE compression methods.
  • Multi-part gzip file may not work

Static IP Address of Treasure Data

The static IP address of Treasure Data is the access point and source of the linkage for this Integration. To determine the static IP address, contact your Customer Success representative or Technical support.

Use the TD Console to Create Your Connection

Create a New Connection

In Treasure Data, you must create and configure the data connection prior to running your query. As part of the data connection, you provide authentication to access the integration.

1. Open TD Console.
2. Navigate to Integrations Hub  Catalog.
3. Search for and select SFTP_V2.

4. Select Create Authentication.
5. The following dialog opens. Edit the parameters. Select Continue.

     

ParametersDescription
HostThe host information of the remote SFTP instance, for example an IP address.
PortThe connection port on the remote SFTP instance, the default is 22.
UserThe user name used to connect to the remote SFTP instance.
Authentication modeThe way you choose to authenticate with your SFTP server.
Secret key fileRequired if 'public / private key pair' is selected from `Authentication Mode`. (ecdsa key type is supported.)
Passphrase for secret key file(Optional) If required, provide a passphrase for the provided secret file.
Retry limitNumber of times to retry a failed connection (default 10).
Timeout

Connection timeout in seconds (default 600).


6. Enter a name for your connection.
7. Choose to share the authentication with others or not. 
8. Select Continue.



Transfer Your Data to Treasure Data


After creating the authenticated connection, you are automatically taken to Authentications.


1. Search for the connection you created. 
2. Select New Source.
3. Type a name for your Source in the Data Transfer field.

4. Select Next.

5. Edit the following parameters:
Parameters Description

User directory root

check if path prefix is under user directory Ex: /home/test_user

Path prefix

Prefix of target files and it must point to file or folder (string, required)

Path match pattern

Type a regular expression to query file paths. If a file path doesn’t match with the specified pattern, the file is skipped. For example, if you specify the pattern  .csv$ # , then a file is skipped if its path doesn’t match the pattern.

Incremental

 Enables incremental loading (boolean, optional. default: true. If incremental loading is enabled, the config diff for the next execution will include last_path parameter so that the next execution skips files before the path. Otherwise, last_path is not included.

Start after path

Only paths lexicographically greater than this will be imported.

6. Select Next.

The Data Settings page can be modified for your needs or you can skip the page.

7. Select Next.

You can preview data for your needs or you can skip the page.

8. Select Next.

Data Placement

For data placement, select the target database and table where you want your data placed and indicate how often the import should run.

  1.  Select Next. Under Storage you will create a new or select an existing database and create a new or select an existing table for where you want to place the imported data.

  2. Select a Database > Select an existing or Create New Database.

  3. Optionally, type a database name.

  4. Select a Table> Select an existing or Create New Table.

  5. Optionally, type a table name.

  6. Choose the method for importing the data.

    • Append (default)-Data import results are appended to the table.
      If the table does not exist, it will be created.

    • Always Replace-Replaces the entire content of an existing table with the result output of the query. If the table does not exist, a new table is created. 

    • Replace on New Data-Only replace the entire content of an existing table with the result output when there is new data.

  7. Select the Timestamp-based Partition Key column.
    If you want to set a different partition key seed than the default key, you can specify the long or timestamp column as the partitioning time. As a default time column, it uses upload_time with the add_time filter.

  8. Select the Timezone for your data storage.

  9. Under Schedule, you can choose when and how often you want to run this query.

    • Run once:
      1. Select Off.

      2. Select Scheduling Timezone.

      3. Select Create & Run Now.

    • Repeat the query:

      1. Select On.

      2. Select the Schedule. The UI provides these four options: @hourly, @daily and @monthly or custom cron.

      3. You can also select Delay Transfer and add a delay of execution time.

      4. Select Scheduling Timezone.

      5. Select Create & Run Now.

 After your transfer has run, you can see the results of your transfer in Data Workbench > Databases.

Import with SFTP via TD Workflow

Create and run a workflow 

_export:
  td:
    database: workflow_sftp_v2
    table: workflow_sftp_v2
+import_from_sftp_v2:
  td_load>: imports/seed.yml
  database: ${td.database}
  table: ${td.table}


Modify the seed.yml file with your SFTP connection details for the import. 

in:
  type: sftp_v2
  host: <HOST>
  port: <PORT, default is 22>
  auth_method: key_pair
  user: <USER>
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: <PASSPHRASE>
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
  parser:
    skip_header_lines: 1
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
    - {name: purchase, type: timestamp, format: "%Y%m%d"}
    - {name: comment, type: string}
    - {name: json_column, type: json}
out:
  mode: append

Import with SFTP via the CLI (TD Toolbelt)

Install TD Toolbelt

Install the most current Treasure Data Toolbelt.

$ td --version

Create Seed Config File (seed.yml)

Prepare seed.yml as shown in the following example, with your SFTP_v2 details. We support two authentication methods: Public / Private Key Pair, and Password.

Public and Private Key Pair Authentication

Create seed.yml with the following content.

in:
  type: sftp_v2
  host: <HOST>
  port: <PORT, default is 22>
  auth_method: key_pair
  user: <USER>
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: <PASSPHRASE>
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
out:
  mode: append
  exec: {}

`secret_key_file` requires OpenSSH format.

Password Authentication

Create seed.yml with the following content.

in:
  type: sftp_v2
  host: <HOST>
  port: <PORT, default is 22>
  auth_method: password
  user: <USER>
  password: <PASSWORD>
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
out:
  mode: append
  exec: {}

You can use the following special characters in the password: "#$!*@"


The SFTP_v2 integration imports all files that match the specified prefix. path_prefix must point to file or folder (e.g. path_prefix: path/to/sample–> path/to/sample/201501.csv.gz, path/to/sample/201502.csv.gz, …, path/to/sample/201505.csv.gz).


Guess Fields (Generate load.yml)

Use connector:guess. This command automatically reads the source file, and assesses (uses logic to guess) the file format.

$ td connector:guess seed.yml -o load.yml

If you open load.yml, you see the guessed file format definitions including file formats, encodings, column names, and types. This example is trying to load CSV files.

in:
  type: sftp_v2
  host: <HOST>
  port: <PORT, default is 22>
  auth_method: key_pair
  user: <USER>
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: <PASSPHRASE>
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
  parser:
    skip_header_lines: 1
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
    - {name: purchase, type: timestamp, format: "%Y%m%d"}
    - {name: comment, type: string}
    - {name: json_column, type: json}
out:
  mode: append
  exec: {}

Then, you can preview how the system will parse the file by using the preview command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

The guess command needs over 3 rows and 2 columns in source data file, because it guesses column definition using sample rows from source data.

If the system detects your column name or column type unexpectedly, modify load.yml directly and preview again.

The integration supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.

You also must create a database and table prior to executing the data load job. Follow these steps:

$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table

Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.

It’s also recommended to specify --time-column option, because Treasure Data’s storage is partitioned by time (see data partitioning) If the option is not provided, the integration chooses the first long or timestamp column as the partitioning time. The type of the column specified by --time-column must be either of long and timestamp type.

If your data doesn’t have a time column you can add a time column by using add_time filter option. For more details see add_time filter plugin.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The connector:issue command assumes that you have already created a database(td_sample_db)and a table(td_sample_table). If the database or the table do not exist in TD, the connector:issue command fails, so create the database and table manually or use --auto-create-table option with td connector:issue command to auto create the database and table:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

The integration does not sort records on server-side. To use time-based partitioning effectively, sort records beforehand.

If you have a field called `time`, you don't have to specify the `--time-column` option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table


Scheduled Execution

You can schedule periodic integration execution for incremental SFTP_v2 file import. We configure our scheduler carefully to ensure high availability. By using this feature, you no longer need a crondaemon on your local data center.

For the scheduled import, the integration for SFTP_v2 imports all files that match with the specified prefix (e.g. path_prefix: path/to/sample –> path/to/sample/201501.csv.gz, path/to/sample/201502.csv.gz, …, path/to/sample/201505.csv.gz) at first and remembers the last path (path/to/sample/201505.csv.gz) for the next execution.

On the second and on subsequent runs, it imports only files that comes after the last path in alphabetical (lexicographic) order. (path/to/sample/201506.csv.gz, …)

Create the Schedule

A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the integration configuration file.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.

By default, schedule is setup in UTC timezone. You can set the schedule in a timezone using -t or --timezone option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles' etc. Timezone abbreviations like PST, CST are *not* supported and may lead to unexpected schedules.

List the Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                     |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"sftp_v2", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+


Show the Setting and Schedule History

td connector:show shows the execution setting of a schedule entry.

% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: sftp_v2
  host: <HOST>
  port: <PORT, default is 22>
  auth_method: password
  user: <USER>
  password: <PASSWORD>
  path_prefix: /sftp/file/path/prefix
  parser:
    charset: UTF-8
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import


Modes for out plugin

You can specify file import mode in out section of seed.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace








  • No labels