The Data Connector for SFTP_V2 enables you to import files stored on your SFTP server to Treasure Data.
Prerequisites
Basic knowledge of Treasure Data.
Before using this integration, determine valid protocols for your environment.
If you intend to use SFTP, use this integration.
If you use FTP or FTPS, try connecting with the FTP Import Integration instead.
If you are using a firewall, check your accepted IP range and port. Server administrators sometimes change the default port number from TCP 22 for security reasons.
- If you authenticate with a key pair, the secret key file must be in OpenSSH format. “PuTTY” and other formats are not supported.
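The firewall and port check described above can be scripted before you create the connection. The following is a minimal sketch, assuming you only want to know whether a TCP port accepts connections; the function name `can_reach` is hypothetical. It runs from your own machine, so it cannot confirm that the Treasure Data static IP address is allowed through your firewall.

```python
import socket


def can_reach(host: str, port: int = 22, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This only tests reachability from the machine running the script.
    It does not perform an SFTP handshake or any authentication.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False
```

For example, `can_reach("<HOST>", 2222)` checks a server whose administrator moved SFTP from the default TCP 22 to port 2222.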
Limitations and Supported Compression Methods
- Only the STORED and DEFLATE compression methods are supported.
- Multi-part gzip files may not work.
Static IP Address of Treasure Data
The static IP address of Treasure Data is the access point and source of the linkage for this Integration. To determine the static IP address, contact your Customer Success representative or Technical support.
Use the TD Console to Create Your Connection
Create a New Connection
In Treasure Data, you must create and configure the data connection prior to running your query. As part of the data connection, you provide authentication to access the integration.
1. Open TD Console.
2. Navigate to Integrations Hub > Catalog.
3. Search for and select SFTP_V2.
4. Select Create Authentication.
5. In the dialog that opens, edit the following parameters, then select Continue.
Parameters | Description |
---|---|
Host | The host information of the remote SFTP instance, for example an IP address. |
Port | The connection port on the remote SFTP instance. The default is 22. |
User | The user name used to connect to the remote SFTP instance. |
Authentication mode | The way you choose to authenticate with your SFTP server. |
Secret key file | Required if 'public / private key pair' is selected from `Authentication Mode`. (ECDSA key type is supported.) |
Passphrase for secret key file | (Optional) If required, provide a passphrase for the provided secret file. |
Retry limit | Number of times to retry a failed connection (default 10). |
Timeout | Connection timeout in seconds (default 600). |
6. Enter a name for your connection.
7. Choose to share the authentication with others or not.
8. Select Continue.
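The parameter rules in the table above (default port 22, default timeout 600 seconds, and a secret key required for key pair authentication) can be expressed as a small check. This is a minimal sketch, not the connector's own validation: the key names are taken from the seed.yml examples later in this document, and the function name `check_connection_settings` is hypothetical.

```python
# Defaults stated in the parameter table: port 22, timeout 600 seconds.
DEFAULTS = {"port": 22, "timeout": 600}


def check_connection_settings(settings: dict) -> dict:
    """Return the settings with defaults applied, or raise ValueError.

    Assumption: auth_method is either "key_pair" or "password", as in the
    seed.yml examples. The rules here are illustrative only.
    """
    merged = {**DEFAULTS, **settings}

    for key in ("host", "user", "auth_method"):
        if not merged.get(key):
            raise ValueError(f"missing required setting: {key}")

    method = merged["auth_method"]
    if method == "key_pair":
        # The secret key is required; its passphrase stays optional.
        if not merged.get("secret_key_file"):
            raise ValueError("key_pair authentication needs secret_key_file")
    elif method == "password":
        if not merged.get("password"):
            raise ValueError("password authentication needs password")
    else:
        raise ValueError(f"unknown auth_method: {method}")

    return merged
```

A check like this is useful mainly when you generate configuration files from a script and want to catch a missing key before submitting a job.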
Transfer Your Data to Treasure Data
After creating the authenticated connection, you are automatically taken to Authentications.
1. Search for the connection you created.
2. Select New Source.
3. Type a name for your Source in the Data Transfer field.
4. Select Next.
5. Edit the following parameters:
Parameters | Description |
---|---|
User directory root | Select this option if the path prefix is under the user directory. Example: /home/test_user |
Path prefix | Prefix of the target files. It must point to a file or folder (string, required). |
Path match pattern | Type a regular expression to query file paths. If a file path doesn’t match the specified pattern, the file is skipped. For example, if you specify the pattern `.csv$`, a file is skipped if its path doesn’t match that pattern. |
Incremental | Enables incremental loading (boolean, optional, default: true). If incremental loading is enabled, the config diff for the next execution includes the last_path parameter so that the next execution skips files before that path. Otherwise, last_path is not included. |
Start after path | Only paths lexicographically greater than this will be imported. |
6. Select Next.
You can modify the Data Settings page for your needs, or skip the page.
7. Select Next.
You can preview your data, or skip the page.
8. Select Next.
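To see how Path prefix, Path match pattern, and Start after path combine, the following sketch filters a list of file paths the way the parameter descriptions above suggest. It is an approximation under stated assumptions: the pattern is applied as a search anywhere in the path, and "lexicographically greater" is taken to mean plain string comparison. The function name `select_paths` is hypothetical.

```python
import re
from typing import Iterable, List, Optional


def select_paths(
    paths: Iterable[str],
    path_prefix: str,
    path_match_pattern: Optional[str] = None,
    start_after_path: Optional[str] = None,
) -> List[str]:
    """Return the paths that would be imported, in lexicographic order."""
    pattern = re.compile(path_match_pattern) if path_match_pattern else None
    selected = []
    for path in sorted(paths):
        if not path.startswith(path_prefix):
            continue  # outside the path prefix
        if pattern and not pattern.search(path):
            continue  # skipped: the path does not match the pattern
        if start_after_path is not None and path <= start_after_path:
            continue  # only paths greater than "Start after path" are kept
        selected.append(path)
    return selected


files = [
    "/path/to/sample/201501.csv",
    "/path/to/sample/201502.csv",
    "/path/to/sample/201502.csv.gz",
    "/path/to/other/201503.csv",
]
print(select_paths(files, "/path/to/sample", r".csv$"))
```

With the pattern `.csv$`, the `.csv.gz` file is skipped because its path does not end in `csv`, and the file under `/path/to/other` is skipped because it is outside the prefix.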
Data Placement
For data placement, select the target database and table where you want your data placed, and indicate how often the import should run.
1. Select Next. Under Storage, create a new database or select an existing one, and create a new table or select an existing one, to hold the imported data.
2. Select a Database > Select an existing or Create New Database.
3. Optionally, type a database name.
4. Select a Table > Select an existing or Create New Table.
5. Optionally, type a table name.
6. Choose the method for importing the data.
   - Append (default): Data import results are appended to the table. If the table does not exist, it will be created.
   - Always Replace: Replaces the entire content of an existing table with the result output of the query. If the table does not exist, a new table is created.
   - Replace on New Data: Replaces the entire content of an existing table with the result output only when there is new data.
7. Select the Timestamp-based Partition Key column. If you want to set a different partition key seed than the default key, you can specify the long or timestamp column as the partitioning time. As a default time column, it uses upload_time with the add_time filter.
8. Select the Timezone for your data storage.
9. Under Schedule, choose when and how often you want to run this query.
   - Run once: Select Off. Select Scheduling Timezone. Select Create & Run Now.
   - Repeat the query: Select On. Select the Schedule. The UI provides these four options: @hourly, @daily, @monthly, or custom cron. You can also select Delay Transfer and add a delay of execution time. Select Scheduling Timezone. Select Create & Run Now.
After your transfer has run, you can see the results of your transfer in Data Workbench > Databases.
Import with SFTP via TD Workflow
Create and run a workflow

    _export:
      td:
        database: workflow_sftp_v2
        table: workflow_sftp_v2

    +import_from_sftp_v2:
      td_load>: imports/seed.yml
      database: ${td.database}
      table: ${td.table}

Modify the seed.yml file with your SFTP connection details for the import.

    in:
      type: sftp_v2
      host: <HOST>
      port: <PORT, default is 22>
      auth_method: key_pair
      user: <USER>
      secret_key_file:
        content: |
          -----BEGIN RSA PRIVATE KEY-----
          Proc-Type: 4,ENCRYPTED
          DEK-Info: AES-128-CBC...
          ...
          -----END RSA PRIVATE KEY-----
      secret_key_passphrase: <PASSPHRASE>
      user_directory_is_root: true
      timeout: 600
      path_prefix: /path/to/sample
      parser:
        skip_header_lines: 1
        charset: UTF-8
        newline: CRLF
        type: csv
        delimiter: ','
        quote: '"'
        columns:
        - {name: id, type: long}
        - {name: account, type: long}
        - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
        - {name: purchase, type: timestamp, format: "%Y%m%d"}
        - {name: comment, type: string}
        - {name: json_column, type: json}
    out:
      mode: append

Import with SFTP via the CLI (TD Toolbelt)
Install TD Toolbelt
Install the most current Treasure Data Toolbelt, then confirm the installed version:
$ td --version
Create Seed Config File (seed.yml)
Prepare seed.yml as shown in the following example, with your SFTP_v2 details. We support two authentication methods: Public / Private Key Pair, and Password.
Public and Private Key Pair Authentication
Create seed.yml with the following content.

    in:
      type: sftp_v2
      host: <HOST>
      port: <PORT, default is 22>
      auth_method: key_pair
      user: <USER>
      secret_key_file:
        content: |
          -----BEGIN RSA PRIVATE KEY-----
          Proc-Type: 4,ENCRYPTED
          DEK-Info: AES-128-CBC...
          ...
          -----END RSA PRIVATE KEY-----
      secret_key_passphrase: <PASSPHRASE>
      user_directory_is_root: true
      timeout: 600
      path_prefix: /path/to/sample
    out:
      mode: append
    exec: {}

`secret_key_file` requires OpenSSH format.
Password Authentication
Create seed.yml with the following content.

    in:
      type: sftp_v2
      host: <HOST>
      port: <PORT, default is 22>
      auth_method: password
      user: <USER>
      password: <PASSWORD>
      user_directory_is_root: true
      timeout: 600
      path_prefix: /path/to/sample
    out:
      mode: append
    exec: {}

You can use the following special characters in the password: "#$!*@"
The SFTP_v2 integration imports all files that match the specified prefix. path_prefix must point to a file or folder (for example, path_prefix: path/to/sample -> path/to/sample/201501.csv.gz, path/to/sample/201502.csv.gz, …, path/to/sample/201505.csv.gz).
Guess Fields (Generate load.yml)
Use connector:guess. This command automatically reads the source file, and assesses (uses logic to guess) the file format.
$ td connector:guess seed.yml -o load.yml
If you open load.yml, you see the guessed file format definitions including file formats, encodings, column names, and types. This example is trying to load CSV files.

    in:
      type: sftp_v2
      host: <HOST>
      port: <PORT, default is 22>
      auth_method: key_pair
      user: <USER>
      secret_key_file:
        content: |
          -----BEGIN RSA PRIVATE KEY-----
          Proc-Type: 4,ENCRYPTED
          DEK-Info: AES-128-CBC...
          ...
          -----END RSA PRIVATE KEY-----
      secret_key_passphrase: <PASSPHRASE>
      user_directory_is_root: true
      timeout: 600
      path_prefix: /path/to/sample
      parser:
        skip_header_lines: 1
        charset: UTF-8
        newline: CRLF
        type: csv
        delimiter: ','
        quote: '"'
        columns:
        - {name: id, type: long}
        - {name: account, type: long}
        - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
        - {name: purchase, type: timestamp, format: "%Y%m%d"}
        - {name: comment, type: string}
        - {name: json_column, type: json}
    out:
      mode: append
    exec: {}

Then, you can preview how the system will parse the file by using the preview command.

    $ td connector:preview load.yml
    +-------+---------+----------+---------------------+
    | id    | company | customer | created_at          |
    +-------+---------+----------+---------------------+
    | 11200 | AA Inc. | David    | 2015-03-31 06:12:37 |
    | 20313 | BB Imc. | Tom      | 2015-04-01 01:00:07 |
    | 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
    | 40133 | DD Inc. | Cesar    | 2015-04-02 05:12:32 |
    | 93133 | EE Inc. | Jake     | 2015-04-02 14:11:13 |
    +-------+---------+----------+---------------------+

The guess command needs more than 3 rows and 2 columns in the source data file, because it guesses the column definitions from sample rows in the source data.
If the system guesses a column name or column type that you did not expect, modify load.yml directly and preview again.
The integration supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.
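To make the `columns` entries in load.yml more concrete, the sketch below applies the same column names, types, and timestamp formats to one CSV line. It is a rough Python approximation, not the integration's parser: the two format strings happen to behave the same in Python's `strptime`, the boolean rule is an assumption, and the `json_column` entry is left out. The names `convert` and `parse_line` are hypothetical.

```python
import csv
from datetime import datetime

# Column definitions copied from the guessed load.yml (json column omitted).
COLUMNS = [
    {"name": "id", "type": "long"},
    {"name": "account", "type": "long"},
    {"name": "time", "type": "timestamp", "format": "%Y-%m-%d %H:%M:%S"},
    {"name": "purchase", "type": "timestamp", "format": "%Y%m%d"},
    {"name": "comment", "type": "string"},
]


def convert(value: str, column: dict):
    """Convert one raw CSV field according to its declared type."""
    kind = column["type"]
    if kind == "long":
        return int(value)
    if kind == "double":
        return float(value)
    if kind == "boolean":
        # Assumption: only the literal "true" (any case) counts as True.
        return value.strip().lower() == "true"
    if kind == "timestamp":
        return datetime.strptime(value, column["format"])
    return value  # "string"


def parse_line(line: str, columns=COLUMNS) -> dict:
    """Split one line using delimiter ',' and quote '"', then convert it."""
    fields = next(csv.reader([line], delimiter=",", quotechar='"'))
    return {col["name"]: convert(raw, col) for raw, col in zip(fields, columns)}


row = parse_line('1,100,2015-03-31 06:12:37,20150331,"hello, world"')
print(row)
```

If a value does not fit its declared type or format, the conversion raises an error here, which is the kind of mismatch you would correct by editing load.yml and previewing again.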
You also must create a database and table prior to executing the data load job. Follow these steps:

    $ td database:create td_sample_db
    $ td table:create td_sample_db td_sample_table

Execute Load Job
Submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.
It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time (see data partitioning). If the option is not provided, the integration chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.
If your data doesn’t have a time column, you can add one by using the add_time filter option. For more details, see add_time filter plugin.
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at
The connector:issue command assumes that you have already created a database (td_sample_db) and a table (td_sample_table). If the database or the table does not exist in TD, the connector:issue command fails. Either create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically:
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table
The integration does not sort records on the server side. To use time-based partitioning effectively, sort records beforehand.
If you have a field called `time`, you don't have to specify the `--time-column` option.
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
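The time column rule described in this section (use --time-column if given, otherwise the first long or timestamp column) can be sketched as follows. This is an illustration only: the column types for the preview table are assumed, the function name `pick_time_column` is hypothetical, and the sketch does not model the special handling of a field named `time`.

```python
from typing import List, Optional, Tuple

# Columns from the preview output, with assumed types.
PREVIEW_COLUMNS = [
    ("id", "long"),
    ("company", "string"),
    ("customer", "string"),
    ("created_at", "timestamp"),
]


def pick_time_column(
    columns: List[Tuple[str, str]], time_column: Optional[str] = None
) -> Optional[str]:
    """Return the column that would be used as the partitioning time."""
    if time_column is not None:
        for name, kind in columns:
            if name == time_column:
                if kind not in ("long", "timestamp"):
                    raise ValueError(f"{name} must be long or timestamp, not {kind}")
                return name
        raise ValueError(f"no such column: {time_column}")
    # No option given: fall back to the first long or timestamp column.
    for name, kind in columns:
        if kind in ("long", "timestamp"):
            return name
    return None


print(pick_time_column(PREVIEW_COLUMNS))
print(pick_time_column(PREVIEW_COLUMNS, "created_at"))
```

Under these assumptions, omitting the option would select `id`, because it is the first long column. That is why the example commands pass `--time-column created_at` explicitly.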
Scheduled Execution
You can schedule periodic integration execution for incremental SFTP_v2 file import. We configure our scheduler carefully to ensure high availability. By using this feature, you no longer need a cron daemon in your local data center.
For the scheduled import, the SFTP_v2 integration first imports all files that match the specified prefix (for example, path_prefix: path/to/sample -> path/to/sample/201501.csv.gz, path/to/sample/201502.csv.gz, …, path/to/sample/201505.csv.gz) and remembers the last path (path/to/sample/201505.csv.gz) for the next execution.
On the second and subsequent runs, it imports only files that come after the last path in alphabetical (lexicographic) order (path/to/sample/201506.csv.gz, …).
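The behavior across scheduled runs can be simulated with a short sketch. It assumes that lexicographic order is plain string comparison and that the remembered last path is simply the greatest path imported so far. The function name `incremental_run` is hypothetical.

```python
from typing import Iterable, List, Optional, Tuple


def incremental_run(
    paths: Iterable[str], path_prefix: str, last_path: Optional[str] = None
) -> Tuple[List[str], Optional[str]]:
    """Return (files imported in this run, last path to remember)."""
    candidates = sorted(p for p in paths if p.startswith(path_prefix))
    if last_path is not None:
        # Later runs import only files that sort after the remembered path.
        candidates = [p for p in candidates if p > last_path]
    new_last_path = candidates[-1] if candidates else last_path
    return candidates, new_last_path


# First run: 201501 through 201505 exist on the server.
first_listing = [f"path/to/sample/2015{m:02d}.csv.gz" for m in range(1, 6)]
imported, last_path = incremental_run(first_listing, "path/to/sample")

# Second run: 201506 has arrived since the first run.
second_listing = first_listing + ["path/to/sample/201506.csv.gz"]
imported_again, last_path_again = incremental_run(
    second_listing, "path/to/sample", last_path
)
print(imported_again, last_path_again)
```

One consequence of this ordering rule is that a file added later with a name that sorts before the remembered last path would not be picked up, so file names that increase over time (such as date-based names) suit incremental import.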
Create the Schedule
A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the integration configuration file.

    $ td connector:create \
        daily_import \
        "10 0 * * *" \
        td_sample_db \
        td_sample_table \
        load.yml

It's also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time.

    $ td connector:create \
        daily_import \
        "10 0 * * *" \
        td_sample_db \
        td_sample_table \
        load.yml \
        --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily`, and `@monthly`.
By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone by using the -t or --timezone option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
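As a reading aid for the cron-style schedule, the sketch below splits an expression such as "10 0 * * *" into its five fields. The expansions used for `@hourly`, `@daily`, and `@monthly` follow the common cron convention and are an assumption here; the exact times Treasure Data uses for these options are not stated in this document. The function name `cron_fields` is hypothetical.

```python
# Conventional cron expansions (an assumption, not confirmed for TD).
SPECIAL = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@monthly": "0 0 1 * *",
}
FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def cron_fields(expr: str) -> dict:
    """Map a five-field cron expression to named fields."""
    expr = expr.strip()
    expr = SPECIAL.get(expr, expr)
    parts = expr.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expr!r}")
    return dict(zip(FIELDS, parts))


print(cron_fields("10 0 * * *"))
```

For "10 0 * * *" the minute is 10 and the hour is 0, so the job runs daily at 00:10 in the schedule's timezone.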
List the Schedules
You can see the list of currently scheduled entries by running the command td connector:list.

    $ td connector:list
    +--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
    | Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                        |
    +--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
    | daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"sftp_v2", "access_key_id"... |
    +--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+

Show the Setting and Schedule History
td connector:show shows the execution setting of a schedule entry.

    $ td connector:show daily_import
    Name     : daily_import
    Cron     : 10 0 * * *
    Timezone : UTC
    Delay    : 0
    Database : td_sample_db
    Table    : td_sample_table
    Config
    ---
    in:
      type: sftp_v2
      host: <HOST>
      port: <PORT, default is 22>
      auth_method: password
      user: <USER>
      password: <PASSWORD>
      path_prefix: /sftp/file/path/prefix
      parser:
        charset: UTF-8
        ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, use td job <jobid>.

    $ td connector:history daily_import
    +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
    | JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
    +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
    | 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
    | 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
    | 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
    | 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
    | 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
    | 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
    | 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
    | 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
    +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
    8 rows in set

Delete the Schedule
td connector:delete will remove the schedule.
$ td connector:delete daily_import
Modes for out plugin
You can specify the file import mode in the out section of seed.yml.
append (default)
This is the default mode and records are appended to the target table.

    in:
      ...
    out:
      mode: append

replace (In td 0.11.10 and later)
This mode replaces data in the target table. Note that any manual schema changes made to the target table will remain intact with this mode.

    in:
      ...
    out:
      mode: replace

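The difference between the two modes can be pictured with a toy model in which a table is a list of records. This is only a conceptual sketch; it says nothing about how the target table is actually updated, and the function name `apply_load` is hypothetical.

```python
from typing import List


def apply_load(existing: List[dict], loaded: List[dict], mode: str = "append") -> List[dict]:
    """Return the table contents after a load in the given mode."""
    if mode == "append":
        # Existing records are kept and the new records are added after them.
        return list(existing) + list(loaded)
    if mode == "replace":
        # Existing records are discarded in favor of the new records.
        return list(loaded)
    raise ValueError(f"unsupported mode: {mode}")


table = [{"id": 1}, {"id": 2}]
new_records = [{"id": 3}]
print(apply_load(table, new_records, "append"))
print(apply_load(table, new_records, "replace"))
```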