Skip to content
Last updated

SFTP V2サーバーインポートインテグレーション

SFTP_V2のData Connectorを使用すると、SFTPサーバーに保存されているファイルをTreasure Dataにインポートできます。

前提条件

  • Treasure Dataの基本的な知識
  • このインテグレーションを使用する前に、お使いの環境で有効なプロトコルを確認してください。

SFTPを使用する場合は、このインテグレーションをSFTP用に使用できます。

FTP/FTPSを使用する場合は、FTPインポートインテグレーションへの接続をお試しください。

  • ファイアウォールを使用している場合は、許可されているIPレンジとポートを確認してください。サーバー管理者は、セキュリティ上の理由からデフォルトのポート番号をTCP 22から変更することがあります。
  • 「PuTTY」やその他の形式はサポートされていません。

制限事項とサポート内容

  • STOREDおよびDEFLATE圧縮方式のみをサポートしています。
  • マルチパートgzipファイルは動作しない可能性があります。

Treasure Data Integration の静的 IP アドレス

セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。

リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/

TD Consoleを使用した接続の作成

新しい接続の作成

Treasure Dataでは、クエリを実行する前にデータ接続を作成して設定する必要があります。データ接続の一部として、インテグレーションにアクセスするための認証情報を提供します。

  1. TD Consoleを開きます。

  2. Integrations Hub > Catalogに移動します。

  3. SFTP_V2を検索して選択します。

  1. Create Authenticationを選択します。

以下のダイアログが開きます。パラメータを編集します。Continueを選択します。

パラメータ説明
HostリモートSFTPインスタンスのホスト情報。例: IPアドレス
PortリモートSFTPインスタンスの接続ポート。デフォルトは22です。
UserリモートSFTPインスタンスへの接続に使用するユーザー名
Authentication modeSFTPサーバーで認証する方法を選択します。
Secret key fileAuthentication Modeから「public/private key pair」が選択されている場合は必須です。(RSA、DSS、ECDSA、ED25519がサポートされています)
Passphrase for secret key file(オプション) 必要に応じて、提供された秘密鍵ファイルのパスフレーズを入力します。
Retry limit接続失敗時のリトライ回数 (デフォルト: 10)
Timeout接続タイムアウト(秒単位、デフォルト: 600)
  1. 接続の名前を入力します。

  2. 認証を他のユーザーと共有するかどうかを選択します。

  3. Continueを選択します。

Treasure Dataへのデータ転送

認証済み接続を作成すると、自動的にAuthentications画面に移動します。

  1. 作成した接続を検索します。

  2. New Sourceを選択します。

  3. Data Transfer Name フィールドにSourceの名前を入力します。

  4. Nextを選択します。

  5. 以下のパラメータを編集します:

パラメータ説明
User directory rootパスプレフィックスがユーザーディレクトリ配下にあるかをチェックします。例: /home/test_user

Path prefix

対象ファイルのプレフィックス。フォルダを指す必要があります(文字列、必須)。SFTP v1とは異なり、パスプレフィックスはフォルダパスである必要があります。ファイルパスに部分的なファイル名が含まれている場合、invalid path_prefix:xxxというエラーメッセージが表示されます。

SFTP v1からSFTP v2 Importに移行する場合、v2のpath_prefixはv1と同じように動作しないことに注意してください。例えば、SFTP v1とは異なり、パスプレフィックスはフォルダパスである必要があります。

Path match patternファイルパスをクエリするための正規表現を入力します。ファイルパスが指定されたパターンと一致しない場合、そのファイルはスキップされます。例えば、パターン.csv$を指定すると、パスがパターンと一致しないファイルはスキップされます。
Incrementalインクリメンタルロードを有効にします(boolean、オプション。デフォルト: true)。インクリメンタルロードが有効な場合、次回実行用の設定差分にlast_pathパラメータが含まれるため、次回実行時にはそのパスより前のファイルがスキップされます。それ以外の場合、last_pathは含まれません。
Start after pathこの値よりも辞書式順序で大きいパスのみがインポートされます。
  1. Nextを選択します。

Data Settings ページは必要に応じて変更できますが、スキップすることもできます。

Data Preview

インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

  1. Next を選択します。Data Preview ページが開きます。
  2. データをプレビューする場合は、Generate Preview を選択します。
  3. データを確認します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

TD Workflowを使用したSFTPからのインポート

ワークフローを作成して実行します

_export:
  td:
    database: workflow_sftp_v2
    table: workflow_sftp_v2
+import_from_sftp_v2:
  td_load>: imports/seed.yml
  database: ${td.database}
  table: ${td.table}

インポート用のSFTP接続の詳細をseed.ymlファイルに設定します。

in:
  type: sftp_v2
  host: HOST
  port: <PORT, default is 22>
  auth_method: key_pair
  user: USER
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: PASSPHRASE
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
  parser:
    skip_header_lines: 1
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
    - {name: purchase, type: timestamp, format: "%Y%m%d"}
    - {name: comment, type: string}
    - {name: json_column, type: json}
out:
  mode: append
設定パラメータ
host:(string, 必須)
port:(string, デフォルト: 22)
auth_method:(string ['password', 'key_pair'], 必須)
user:(string, 必須)
password:(string, デフォルト: null)
secret_key_file:(string, デフォルト: null)。OpenSSH形式が必要です。
secret_key_passphrase:(string, デフォルト: "")
user_directory_is_root:(boolean, デフォルト: true)
timeout: SFTP接続タイムアウト秒数(integer, デフォルト: 600)
path_prefix: 出力パスのプレフィックス(string, 必須)
incremental: インクリメンタルローディングを有効化(boolean, オプション。デフォルト: true)。インクリメンタルローディングが有効な場合、次回実行の設定差分にはlast_pathパラメータが含まれ、次回実行時にそのパス以前のファイルをスキップします。それ以外の場合、last_pathは含まれません。
path_match_pattern:ファイルパスに一致する正規表現。ファイルパスがこのパターンに一致しない場合、そのファイルはスキップされます(正規表現文字列、オプション)
total_file_count_limit:読み込むファイルの最大数 (integer, オプション)
min_task_size (実験的):タスクの最小サイズ。これが0より大きい場合、1つのタスクに複数の入力ファイルが含まれます。これは、タスク数が多すぎることが出力プラグインやエグゼキュータプラグインのパフォーマンスに悪影響を与える場合に有用です。(integer, オプション)

CLIを使用したSFTPインポート(TD Toolbelt)

TD Toolbeltのインストール

最新のTreasure Data Toolbeltをインストールしてください。

$ td --version

シード設定ファイルの作成(seed.yml)

以下の例に示すように、SFTP_v2の詳細を含むseed.ymlを準備します。公開鍵/秘密鍵ペアとパスワードの2つの認証方法をサポートしています。

公開鍵と秘密鍵ペア認証

以下の内容でseed.ymlを作成します。

in:
  type: sftp_v2
  host: HOST
  port: <PORT, default is 22>
  auth_method: key_pair
  user: USER
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: PASSPHRASE
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
out:
  mode: append
  exec: {}

secret\_key\_fileにはOpenSSH形式が必要です。

パスワード認証

以下の内容でseed.ymlを作成します。

in:
  type: sftp_v2
  host: HOST
  port: <PORT, default is 22>
  auth_method: password
  user: USER
  password: PASSWORD
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
out:
  mode: append
  exec: {}

パスワードには次の特殊文字を使用できます: "#$!*@"

SFTP_v2インテグレーションは、指定されたプレフィックスに一致するすべてのファイルをインポートします。path_prefixはファイルまたはフォルダを指す必要があります(例: path_prefix: path/to/sample–> path/to/sample/201501.csv.gzpath/to/sample/201502.csv.gz、…、path/to/sample/201505.csv.gz)。

フィールドの推測(load.ymlの生成)

connector:guessを使用します。このコマンドは、ソースファイルを自動的に読み取り、ファイル形式を評価(ロジックを使用して推測)します。

$ td connector:guess seed.yml -o load.yml

load.ymlを開くと、ファイル形式、エンコーディング、カラム名、型を含む推測されたファイル形式定義が表示されます。この例では、CSVファイルをロードしようとしています。

in:
  type: sftp_v2
  host: HOST
  port: <PORT, default is 22>
  auth_method: key_pair
  user: USER
  secret_key_file:
    content: |
      -----BEGIN RSA PRIVATE KEY-----
      Proc-Type: 4,ENCRYPTED
      DEK-Info: AES-128-CBC...
      ...
      -----END RSA PRIVATE KEY-----
  secret_key_passphrase: PASSPHRASE
  user_directory_is_root: true
  timeout: 600
  path_prefix: /path/to/sample
  parser:
    skip_header_lines: 1
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
    - {name: purchase, type: timestamp, format: "%Y%m%d"}
    - {name: comment, type: string}
    - {name: json_column, type: json}
out:
  mode: append
  exec: {}

次に、preview コマンドを使用して、システムがファイルをどのように解析するかをプレビューできます。

td connector:preview load.yml

guess コマンドは、ソースデータファイルに3行以上、2列以上必要です。これは、ソースデータのサンプル行を使用して列定義を推測するためです。

システムが予期しない列名や列タイプを検出した場合は、load.yml を直接修正して、再度プレビューしてください。

この統合は、「boolean」、「long」、「double」、「string」、「timestamp」タイプの解析をサポートしています。

また、データロードジョブを実行する前に、データベースとテーブルを作成する必要があります。次の手順に従ってください:

td database:create td_sample_db
td table:create td_sample_db td_sample_table

ロードジョブの実行

ロードジョブを送信します。データのサイズによっては、数時間かかる場合があります。データを保存する Treasure Data のデータベースとテーブルを指定します。

--time-column オプションを指定することをお勧めします。Treasure Data のストレージは時間でパーティション分割されているためです(データパーティショニングを参照)。オプションが指定されていない場合、統合は最初の long または timestamp 列をパーティショニング時間として選択します。--time-column で指定される列のタイプは、long および timestamp タイプのいずれかである必要があります。

データに時間列がない場合は、add_time フィルターオプションを使用して時間列を追加できます。詳細については、add_time フィルタープラグインを参照してください。

td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

connector:issue コマンドは、database(td_sample_db)table(td_sample_table) が既に作成されていることを前提としています。TD にデータベースまたはテーブルが存在しない場合、connector:issue コマンドは失敗します。これが発生した場合は、データベースを作成し、テーブルを作成するか、td connector:issue コマンドで --auto-create-table オプションを使用して、データベースとテーブルを自動作成します:

td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

この統合は、サーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にレコードをソートしてください。

time というフィールドがある場合は、--time-column オプションを指定する必要はありません。

td connector:issue load.yml --database td_sample_db --table td_sample_table

スケジュール実行

増分 SFTP_v2 ファイルインポートの定期的な統合実行をスケジュールできます。高可用性を確保するために、スケジューラを慎重に設定しています。この機能を使用すると、ローカルデータセンターで cron デーモンを使用する必要がなくなります。

スケジュールインポートの場合、SFTP_v2 の統合は、最初に指定されたプレフィックスに一致するすべてのファイル(例:path_prefix: path/to/sample –> path/to/sample/201501.csv.gzpath/to/sample/201502.csv.gz、...、path/to/sample/201505.csv.gz)をインポートし、次回の実行のために最後のパス(path/to/sample/201505.csv.gz)を記憶します。

2回目以降の実行では、アルファベット順(辞書順)で最後のパスより後に来るファイルのみをインポートします。(path/to/sample/201506.csv.gz、...)

スケジュールの作成

td connector:create コマンドを使用して、新しいスケジュールを作成できます。スケジュールの名前、cron 形式のスケジュール、データを保存するデータベースとテーブル、および統合設定ファイルが必要です。

td connector:create \
daily_import \
"10 0 * * *" \
td_sample_db \
td_sample_table \
load.yml

Treasure Data のストレージは時間でパーティション分割されているため、--time-column オプションを指定することをお勧めします。

td connector:create \
daily_import \
"10 0 * * *" \
td_sample_db \
td_sample_table \
load.yml \
--time-column created_at

cron パラメータは、@hourly@daily@monthly の3つの特殊なオプションも受け入れます。

デフォルトでは、スケジュールは UTC タイムゾーンで設定されます。-t または --timezone オプションを使用して、タイムゾーンでスケジュールを設定できます。--timezone オプションは、「Asia/Tokyo」、「America/Los_Angeles」などの拡張タイムゾーン形式のみをサポートします。PST、CST などのタイムゾーンの略語はサポートされて*おらず*、予期しないスケジュールにつながる可能性があります。

スケジュールのリスト

td connector:list コマンドを実行すると、現在スケジュールされているエントリのリストを確認できます。

td connector:list

設定とスケジュール履歴の表示

td connector:show は、スケジュールエントリの実行設定を表示します。

td connector:show daily_import

td connector:history は、スケジュールエントリの実行履歴を表示します。個々の実行の結果を調査するには、td job jobid を使用します。

td connector:history daily_import

スケジュールの削除

td connector:delete はスケジュールを削除します。

td connector:delete daily_import

out プラグインのモード

seed.yml の out セクションでファイルインポートモードを指定できます。

append(デフォルト)

これはデフォルトモードで、レコードはターゲットテーブルに追加されます。

in:
  ...
out:
  mode: append

replace(td 0.11.10 以降)

このモードは、ターゲットテーブルのデータを置き換えます。ターゲットテーブルに対して手動で行ったスキーマ変更は、このモードでもそのまま残ることに注意してください。

in:
  ...
out:
  mode: replace

Workflow 経由で SFTP サーバーからインポート

STFP サーバーからファイルをインポートするサンプルワークフローについては、Treasure Boxes を参照してください。