Snowflake offers a cloud-based data platform that allows customers to unlock the value of their data with near-unlimited scale, concurrency, and performance. Treasure Data's Snowflake integration allows you to import data stored in Snowflake tables into Treasure Data. You can also learn more about Treasure Data's Export Integration.

Prerequisites

  • Basic knowledge of Treasure Data

  • An existing Snowflake data warehouse account with appropriate permissions to query tables and views.

Use the TD Console to Create Your Connection

Create a New Connection

Go to Integrations Hub > Catalog and search for Snowflake, then select it.

Create a New Snowflake Connector

The following dialog opens.

Select Authentication Method:

  • Basic: Provide the required credentials: User, Password, and Account to authenticate Treasure Data to Snowflake.
    • User: Snowflake login username.

    • Password: Snowflake login password.

  • Key Pair: Provide the Private Key and, if the key is encrypted, its Passphrase.
    • Private Key: Your generated private key. See configuring-key-pair-authentication.
    • Passphrase: The Private Key passphrase. Leave it empty if the Private Key is unencrypted.
    • User: Snowflake login username.
  • Account: The account name provided by Snowflake. See how to find your account name in Snowflake.

  • OPTIONS: JDBC connection options (if any)

Select Continue.

Specify a name for your data connector. Indicate if you want to share the authentication with others. If you share, other team members can use your authentication to create a connector source. Select Done.

Transfer Snowflake Data to Treasure Data

Create a New Source on the Authentications page.


Complete the details.

Fetch from

You must register the information that you want to ingest. The parameters are as follows:

  • Role Name: Specifies the default access control role to use in the Snowflake session. Leave empty to use the default role.

  • Warehouse: required. Specifies the virtual warehouse to use.

  • Database: required. Snowflake database.

  • Schema: required. Specifies the default schema to use for the specified database when connected.

  • Source Type: Select Table/View or Query

    • Query:

      • SELECT Query: The raw SQL query to run. Only SELECT queries are allowed; INSERT, UPDATE, and other statements that modify data are not.

    • Table/View (default):

      • SELECT Columns: required. The SELECT expression. Accepts comma-separated column names or `*` to select all columns from the Table/View.

      • From Table/View: required. The Snowflake table or view to read from.

      • WHERE Condition: Specify a WHERE condition to filter the rows.

      • ORDER By Columns: An ORDER BY expression to sort rows.

  • Incremental Loading: Enables incremental loading. See How Incremental Loading Works.

    • Incremental Column(s): Column names for incremental loading. Only timestamp and numeric columns are allowed. If not specified, the primary key column (if one exists) is used.

    • ORDER By Columns: Not allowed when Incremental Loading is selected.

  • Invalid Value Handling Mode: When a row contains an invalid value, the options are Fail Job, Ignore Invalid Value, or Ignore Row. Fail Job stops the current job with a status of ERROR. Ignore Invalid Value saves a null value if the TD column is not a string column, or tries to save the value as a string if it is. Ignore Row skips the row that contains the invalid value and keeps running. If not specified, Fail Job is the default.

The specified Warehouse, Database, and Schema must already exist. The specified Role must have privileges to the specified warehouse, database, and schema.


Example: Query

Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Query selected
SELECT Query: SELECT column1, column2, column3 
              FROM table_test
              WHERE column4 != 1
Incremental Loading: Checked
Incremental Column(s): column1

Example: Table/View

Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Table/View selected
SELECT Columns: column1, column2, column3
FROM Table/View: table_test
WHERE Condition: column4 != 1
ORDER By Columns:
Incremental Loading: Checked
Incremental Column(s): column1
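
For reference, the Table/View settings above are roughly equivalent to running the query shown in the Query example (a sketch only; incremental loading adds ORDER BY and WHERE clauses as described in How Incremental Loading Works):

SELECT column1, column2, column3
FROM table_test
WHERE column4 != 1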

Select Next. You’ll see a preview of your data.

Preview

Preview might show dummy data if a query takes a long time to finish, such as a heavy query against a large table whose ORDER BY column is not indexed.

When you use a complex query with multiple conditions or joined tables, Preview sometimes shows "No records to preview"; this does not affect the job result. Read more information about Previews.

To make changes, such as setting the data type mapping, select Advanced Settings; otherwise, select Next.


Advanced Settings

Advanced Settings allow you to customize the Transfer. Edit the following section, if needed.

  • Data Type Mapping: Change the default data type conversion from source (Snowflake column) when importing data to the destination (Treasure Data), see appendix How data type conversion works.

    • Column: Snowflake column name

    • Get as Type: Get data from source as this specified type.

    • Destination Data Type: The expected destination data type.

    • Timestamp Format: The format pattern used to format the destination value when Get as Type is date, time, or timestamp and Destination Data Type is string. For example: yyyy-MM-dd'T'HH:mm:ssZ

  • Timezone: Used with Timestamp Format. If not specified, GMT is used. Supports zone offset values, such as +0800 or -1130.

  • Start After (values): When Incremental Loading is selected and you specify this value, only data greater than this value is imported. The number and order of values must match Incremental Column(s).

  • Rows per Batch: Number of rows to fetch at a time; default 10000 rows.

  • Network Timeout: Specifies how long to wait for a response when interacting with the Snowflake service before returning an error.

  • Query Timeout: Specifies how long to wait for a query to complete before returning an error. Zero (0) indicates to wait indefinitely.
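
These Advanced Settings correspond to the configuration keys described in the CLI section below (column_options, last_record, fetch_rows, network_timeout, and query_timeout). The following is an illustrative sketch only; the column name created_at and the values shown are hypothetical, and connection details are omitted as in the appendix examples:

in:
  type: snowflake
  ...
  incremental: true
  incremental_columns: [created_at]             # created_at is a hypothetical column
  last_record: ["2017-01-01T00:32:12.000000"]   # Start After (values)
  fetch_rows: 10000                             # Rows per Batch
  network_timeout: 300                          # Network Timeout
  query_timeout: 0                              # Query Timeout (0 waits indefinitely)
  column_options:
    created_at: {value_type: "timestamp", type: "string", timestamp_format: "yyyy-MM-dd'T'HH:mm:ssZ", timezone: "+0000"}
out:
  mode: append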


Transfer to - Choose the Target Database and Table

Choose an existing database and table to import to, or create new ones.

  • Mode: Append or Replace. Select whether to append records to an existing table or replace your existing table.

  • Partition key Seed: Choose a long or timestamp column as the partitioning time. If none is chosen, upload_time is used as the default time column via the add_time filter.


Scheduling

You can specify a one-time transfer, or you can schedule an automated recurring transfer.

Parameters

  • Once now: Run a one-time job.

  • Repeat…

    • Schedule: Options: @hourly, @daily, @monthly, and custom cron.

    • Delay Transfer: Add a delay to the execution time.

  • TimeZone: Supports extended timezone formats like ‘Asia/Tokyo’.


Details

Provide a name for your Snowflake connector as a new source.


Select Done. Your data connector is saved as a Source.

Sources

On this page, you can edit your existing jobs. You can also view the details of previous transfers using this data connector by selecting the jobs icon.


Use the CLI to Configure the Connector

Before setting up the connector, install the ‘td’ command by installing the Treasure Data Toolbelt.

Prepare the load.yml File

The in: section is where you specify what comes into the connector from Snowflake and the out: section is where you specify what the connector puts out to the database in Treasure Data.

Provide your Snowflake account access information as follows:

in:
  type: snowflake
  account_name: treasuredata
  user: Test_user
  password: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
out:
  mode: append

Configuration keys and descriptions are as follows:

  • type (string, required): Connector type. Must be "snowflake".

  • account_name (string, required): Specifies the name of your account (provided by Snowflake). See Snowflake account name.

  • warehouse (string, required): Specifies the virtual warehouse to use.

  • db (string, required): The Snowflake database to use.

  • schema (string, required): Specifies the default schema to use for the specified database once connected.

  • role (string, optional): The default access control role to use in the Snowflake session initiated by the driver.

  • auth_method (string, required): Authentication method. Currently supports basic and key_pair (default "basic").

  • private_key (string, optional): Required if auth_method is key_pair.

  • passphrase (string, optional): The passphrase of the private_key.

  • user (string, required): Snowflake login name.

  • password (string, optional): Required if auth_method is basic. Password for the specified user.

  • query (string, optional): The SQL query to run.

  • incremental (boolean, optional): If true, enables incremental loading. See How Incremental Loading Works.

  • incremental_columns (strings array, optional): Column names for incremental loading. If not specified, the primary key columns are used.

  • invalid_value_option (string, optional): How to handle invalid data in a row. Available options are fail_job, ignore_row, and insert_null.

  • last_record (objects array, optional): Values of the last record for incremental loading.

  • fetch_rows (integer, optional): The number of rows to fetch at a time. Default 10000.

  • network_timeout (integer, optional): How long to wait for a response when interacting with the Snowflake service before returning an error. Default not set.

  • query_timeout (integer, optional): How long to wait for a query to complete before returning an error. Default 0 (wait indefinitely).

  • select (string, optional): The SELECT expression. Required when query is not set.

  • table (string, optional): The Snowflake table or view to read from. Required when query is not set.

  • where (string, optional): Condition to filter the rows.

  • order_by (string, optional): ORDER BY expression to sort rows.

  • column_options (key-value pairs, optional): Options to customize data type conversion from source (Snowflake) to destination (Treasure Data). See How Data Type Conversion Works.

  • value_type (string, optional): Used with column_options. Attempts to get data from the Snowflake JDBC driver as this type.

  • type (string, optional): Used with column_options. Destination data type; the value fetched as value_type is converted to this type.

  • timestamp_format (string, optional): Java timestamp format pattern. Used to format the destination value when value_type is date, time, or timestamp and type is string.

  • timezone (string, optional): RFC 822 timezone, used with timestamp_format, e.g. +0300 or -0530.
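
For key pair authentication combined with Table/View-style loading, a load.yml might look like the following. This is an illustrative sketch only: the private key content and passphrase are placeholders, and you should check configuring-key-pair-authentication for the exact key format expected.

in:
  type: snowflake
  auth_method: key_pair
  account_name: treasuredata
  user: Test_user
  private_key: |
    -----BEGIN ENCRYPTED PRIVATE KEY-----
    ...                                  # placeholder; paste your actual key here
    -----END ENCRYPTED PRIVATE KEY-----
  passphrase: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  table: table_test
  select: "column1, column2, column3"
  where: "column4 != 1"
  invalid_value_option: ignore_row
out:
  mode: append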

An example of load.yml with incremental and column_options

in:
  type: snowflake
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
  incremental: true
  incremental_columns: [column1, column3]
  last_record: [140, 1500.5]
  column_options:
    column5: {value_type: "timestamp", type: "string", timestamp_format: "yyyy-MMM-dd HH:mm:ss Z", timezone: "+0700"}
out:
  mode: append

For more details on available out modes, see modes.

To preview the data, issue the preview command:

$ td connector:preview load.yml
+--------------+-----------------+----------------+
| COLUMN1:long | COLUMN2:string  | COLUMN3:double |
+--------------+-----------------+----------------+
| 100          | "Sample value1" | 1900.1         |
| 120          | "Sample value3" | 1700.3         |
| 140          | "Sample value5" | 1500.5         |
+--------------+-----------------+----------------+

Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. You must specify the database and table where the data will be stored.

It is recommended to specify the --time-column option because Treasure Data’s storage is partitioned by time. If the option is not given, the data connector selects the first long or timestamp column as the partitioning time. The column specified by --time-column must be of long or timestamp type.

If your data doesn’t have a time column, you can add one using the add_time filter option. For more information, see the add_time filter plugin.
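
For example, a filters: section can derive the time column at load time. The following is a sketch only, assuming the add_time filter's to_column/from_value options; refer to the add_time filter plugin documentation for the exact syntax:

in:
  type: snowflake
  ...
filters:
  - type: add_time         # assumes the add_time filter's upload_time mode
    to_column:
      name: time
      type: timestamp
    from_value:
      mode: upload_time
out:
  mode: append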

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

td connector:issue assumes that you have already created the database (td_sample_db) and table (td_sample_table). If the database or table does not exist in TD, td connector:issue fails. Create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Scheduled Execution

You can schedule periodic data connector execution for recurring Snowflake imports.

Create the Schedule

A new schedule can be created using the td connector:create command. The following are required:

  • the name of the schedule

  • the cron-style schedule

  • the database and table where the data will be stored

  • the Data Connector configuration file

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option since Treasure Data’s storage is partitioned by time (see also data partitioning).

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles', etc. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
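
For example, to run the same import daily in Japan time (illustrative values, reusing the schedule name and files from the examples above):

$ td connector:create \
    daily_import \
    "@daily" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at \
    -t Asia/Tokyo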

List the Schedules

You can see the list of scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+------------+----------+-------+--------------+-----------------+----------------------------+
| Name         | Cron       | Timezone | Delay | Database     | Table           | Config                     |
+--------------+------------+----------+-------+--------------+-----------------+----------------------------+
| daily_import | 10 0 * * * | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"snowflake", ... |
+--------------+------------+----------+-------+--------------+-----------------+----------------------------+

Show the Settings and Schedule History

td connector:show shows the execution settings of a schedule entry.

% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
 type: snowflake
 account_name: treasuredata
 user: Test_user
 password: xxxx
 warehouse: DEMO_WH
 db: TEST_DB
 schema: PUBLIC
 query: |
   SELECT column1, column2, column3
   FROM table_test
   WHERE column4 != 1
 incremental: true
 incremental_columns: [column1, column3]

td connector:history shows the execution history of a schedule entry. To investigate the results of each run, use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2018-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set

Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import

Appendix

Modes for Out Plugin

You can specify the import mode in the out: section of load.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

Support for SSL

The connection to Snowflake is made via the official Snowflake JDBC driver. The JDBC driver enforces SSL by default and makes it mandatory (i.e., a connection with SSL = false is rejected).

How Data Type Conversion Works

Data type conversion, also called column type mapping or column_options, is used by the connector to convert data from the source data type (Snowflake) to the destination data type (Treasure Data). The connector gets data from the source as the Get as Type (value_type) and then converts it to the Destination (type). The following are the default conversions.

  • NUMBER (scale > 0), DECIMAL (scale > 0), NUMERIC (scale > 0): Get as Type double, Destination double

  • NUMERIC (scale = 0), INT, INTEGER, BIGINT, SMALLINT: Get as Type long, Destination long

  • FLOAT, FLOAT4, FLOAT8, DOUBLE, DOUBLE PRECISION, REAL: Get as Type double, Destination double

  • VARCHAR, CHARACTER, CHAR, STRING, TEXT: Get as Type string, Destination string

  • BOOLEAN: Get as Type boolean, Destination boolean

  • DATE: Get as Type date, Destination string

  • TIME: Get as Type time, Destination string

  • DATETIME, TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ: Get as Type timestamp, Destination timestamp

  • VARIANT, OBJECT, ARRAY (not officially supported): Get as Type string, Destination string

  • BINARY, VARBINARY: Not supported

Supported data types for value_type

  • boolean

  • date

  • double

  • decimal

  • json

  • long

  • string

  • time

  • timestamp

Supported data types for type

  • boolean

  • long

  • double

  • string

  • json

  • timestamp

Snowflake data types that each value_type can read

  • boolean: BOOLEAN, NUMERIC, INT, INTEGER, BIGINT, SMALLINT, FLOAT, DOUBLE, REAL, VARCHAR, CHARACTER, CHAR, STRING, TEXT

  • date: DATE, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT

  • double: INT, SMALLINT, INTEGER, BIGINT, REAL, DECIMAL, NUMERIC, BIT, CHAR, VARCHAR, STRING, TEXT

  • decimal: DECIMAL, INT, SMALLINT, INTEGER, BIGINT, REAL, FLOAT, DOUBLE, BIT, CHAR, VARCHAR, STRING, TEXT

  • json: VARCHAR, CHAR, STRING, TEXT, VARIANT, OBJECT, ARRAY

  • long: LONG, INT, SMALLINT, INTEGER, REAL, FLOAT, DOUBLE, DECIMAL, NUMERIC, CHAR, VARCHAR, STRING, TEXT

  • time: TIME, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT

  • timestamp: TIMESTAMP, CHAR, VARCHAR, DATE, TIME, STRING, TEXT

  • string: most Snowflake data types

Destination types that each value_type can be converted to

  • boolean: boolean, double, long, string

  • date: long, timestamp, string

  • double: boolean, double, long, string

  • decimal: boolean, double, long, string

  • json: json, string

  • long: boolean, double, long, string

  • time: long, timestamp, string

  • timestamp: long, timestamp, string

  • string: double, long, string, json

Notes:

When Get as Type is date, time, or timestamp and the Destination is string, additional options are available to format the value.

  • Timestamp Format: accepts standard Java date time format patterns e.g. yyyy-MM-dd HH:mm:ss Z

  • Timezone: accepts RFC822 timezone format e.g. +0800

The decimal Get as Type is supported but not used by default. If the source contains a numeric value that does not fit the range of the destination numeric type, use Get as Type decimal with a string Destination.
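
For example, a column_options entry in the in: section for a hypothetical high-precision numeric column named amount could look like this:

  column_options:
    amount: {value_type: "decimal", type: "string"}   # "amount" is a hypothetical column name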

How Incremental Loading Works

Incremental loading uses monotonically increasing unique columns (such as an AUTO_INCREMENT column) to load only records that were inserted (or updated) after the last execution.
First, if incremental: true is set, this connector loads all records, with an additional ORDER BY. This mode is useful when you want to fetch just the records that have changed since the previous scheduled run. For example, if the incremental_columns: [updated_at, id] option is set, the query is as follows:

SELECT * FROM (
 ...original query is here...
)
ORDER BY updated_at, id

When bulk data loading finishes successfully, the connector outputs the last_record: parameter as a config diff so that the next execution uses it.
At the next execution, when last_record: is set, this connector generates additional WHERE conditions to load only records newer than the last record. For example, if last_record: ["2017-01-01T00:32:12.000000", 5291] is set, the query becomes:

SELECT * FROM (
 ...original query is here...
)
WHERE updated_at > '2017-01-01T00:32:12.000000' OR (updated_at = '2017-01-01T00:32:12.000000' AND id > 5291)
ORDER BY updated_at, id

Then, it updates last_record: so that the next execution uses the updated value.
IMPORTANT: If you set the incremental_columns: option, make sure there is an index on those columns to avoid a full table scan. For this example, the following index should be created:

CREATE INDEX embulk_incremental_loading_index ON table (updated_at, id);

The recommended usage is to leave incremental_columns unset and let the connector automatically find an AUTO_INCREMENT primary key.

Only timestamp, datetime, and numeric columns are supported as incremental_columns.
For a raw query, incremental_columns is required because the connector cannot detect the primary keys of a complex query.

If incremental: false is set, the data connector fetches all the records of the specified Snowflake Table/View, regardless of when they were last updated. This mode is best combined with writing data into a destination table using the ‘replace’ mode.
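
A minimal sketch combining the two settings (connection and query details omitted, as in the appendix examples):

in:
  type: snowflake
  ...
  incremental: false
out:
  mode: replace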
