Snowflake Import Integration

Snowflake offers a cloud-based data platform that allows customers to unlock the value of their data with near-unlimited scale, concurrency, and performance. Treasure Data's Snowflake integration allows you to import data stored in Snowflake tables into Treasure Data. You can also learn more about Treasure Data's Export Integration.

Prerequisites

  • Basic knowledge of Treasure Data
  • An existing Snowflake data warehouse account with appropriate permissions to query tables and views.

Use the TD Console to Create Your Connection

Create a New Connection

Go to Integrations Hub > Catalog and search for Snowflake, then select it.

Create a New Snowflake Connector

The following dialog opens.

Select Authentication Method:

  • Basic: Provide the required credentials: User, Password, and Account to authenticate Treasure Data to Snowflake.
    • User: Snowflake login username.
    • Password: Snowflake login password.
  • Key Pair: Provide the Private Key and, if the key is encrypted, its Passphrase.
    • Private Key: Your generated private key. See configuring-key-pair-authentication.
    • Passphrase: The Private Key passphrase. Leave it empty if the Private Key is unencrypted.
    • User: Snowflake login username.
  • Account: The account name provided by Snowflake. See how to find your account name in Snowflake.
  • OPTIONS: JDBC connection options (if any).

Select Continue.

Specify a name for your data connector. Indicate whether you want to share the authentication with others. If you share it, other team members can use your authentication to create a connector source. Select Done.

Transfer Snowflake Data to Treasure Data

Create a New Source on the Authentications page.

Complete the details.

Fetch from

You must register the information that you want to ingest. The parameters are as follows:

  • Driver Version: Specifies the Snowflake JDBC driver to be used. The options are:

    • 3.13.19 (current version used by the connector by default)
    • 3.14.3
    • 3.21.0 (the latest version)
  • Role Name: Specifies the default access control role to use in the Snowflake session. Leave empty to use the default role.

  • Warehouse: (Required) Specifies the virtual warehouse to use.

  • Database: (Required) Snowflake database.

  • Schema: (Required) Specifies the default schema for the specified database when connected.

  • Source Type: Select Table/View or Query

    • Query:
      • SELECT Query: the raw SQL query to run. Only SELECT queries are allowed; INSERT, UPDATE, and other data-modifying statements are not.
    • Table/View (default):
      • SELECT Columns: (Required) Accepts comma-separated column names, or * to select all columns from the Table/View.
      • From Table/View: (Required) Source table or view name.
      • WHERE Condition: Specify WHERE condition to filter the rows
      • ORDER By Columns: Expression of ORDER BY to sort rows
  • Incremental Loading: Enables incremental loading. See How Incremental Loading works.

    • Incremental Column(s): Column names for incremental loading; Timestamp and Numeric columns are allowed. If not specified, the primary key column (if one exists) is used.
    • ORDER By Columns: Not allowed when Incremental Loading is selected.
  • Invalid Value Handling Mode: When a row in your table contains invalid data, you can choose to Fail Job, Ignore Invalid Value, or Ignore Row. When Fail Job is selected, the current job stops with an ERROR status. When Ignore Invalid Value is selected, a null value is saved if the TD table's column is not a string type, or the connector tries to save the value as a string if the column is a string column. When Ignore Row is selected, the row containing the invalid value is skipped and the job keeps running. If not specified, Fail Job is the default.

The specified Warehouse, Database, and Schema must already exist. The specified Role must have privileges in the specified warehouse, database, and schema.

Example: Configuration

Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Query selected
SELECT Query: SELECT column1, column2, column3
              FROM table_test
              WHERE column4 != 1
Incremental Loading: Checked
Incremental Column(s): column1

Table/View

Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Table/View selected
SELECT Columns: column1, column2, column3
FROM Table/View: table_test
WHERE Condition: column4 != 1
ORDER By Columns:
Incremental Loading: Checked
Incremental Column(s): column1

Select Next. You’ll see a preview of your data.

Preview

Preview might show dummy data if a query takes a long time to finish, such as a heavy query. For example: querying a large table that has an ORDER BY column but no index on it.

When you use a complex query with multiple conditions or table joins, Preview sometimes shows "No records to preview", but this does not affect the job result. Read more information about Previews.

To make changes, such as setting data type mapping, select Advanced Settings; otherwise, select Next.

Advanced Settings

Advanced Settings allow you to customize the Transfer. Edit the following section if needed.

To change the default data type conversion from the source (Snowflake column) when importing data to the destination (Treasure Data), see the appendix on how data type conversion works.

COLUMN OPTIONS

  • Data Type Mapping: Enter the Snowflake column name.
  • Get as Type: Get data from the source as this specified type.
  • Destination Data Type: The expected destination data type.
  • Timestamp Format: Format pattern is used to format the destination type when Get as Type is a date, time, or timestamp, and Destination Data Type is a string. For example: yyyy-MM-dd'T'HH:mm:ssZ
  • Time zone: Used with Timestamp Format. If not specified, GMT is used. Supports zone offset values such as +0800 or -1130.

Select Add.
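
In a CLI configuration (the load.yml file described later on this page), the same settings are expressed with the column_options key. The following is a minimal sketch, assuming a hypothetical DATE column named birth_date that should be stored in Treasure Data as a formatted string; the column name and format pattern are illustrative only.

# Fragment of the in: section of load.yml; birth_date is a hypothetical column name.
column_options:
  birth_date: {value_type: "date", type: "string", timestamp_format: "yyyy-MM-dd", timezone: "+0900"}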

START AFTER (VALUES)

  • Start After (values): When Incremental Loading is selected and you specify this value, only data greater than this value is imported. The size and order of these values must match the Incremental Columns (see the sketch after this list).
  • Rows per Batch: The number of rows to fetch at a time. The default is 10000 rows.
  • Network Timeout: Specifies how long to wait for a response when interacting with the Snowflake service before returning an error.
  • Query Timeout: Specifies how long to wait for a query to complete before returning an error. Zero (0) indicates to wait indefinitely.
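
In a CLI configuration, these settings correspond to the last_record, fetch_rows, and query_timeout keys described in the CLI section below. A minimal sketch of the relevant in: fragment; the values are illustrative only:

# Fragment of the in: section of load.yml; values are illustrative.
incremental: true
incremental_columns: [column1]
last_record: [140]     # "Start After" value; same size and order as incremental_columns
fetch_rows: 10000      # Rows per Batch
query_timeout: 0       # wait indefinitely for the query to complete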

Transfer to - Choose the Target Database and Table

Choose an existing database and table to import to, or create new ones.

  • Method: Append or Replace. Select whether to append records to an existing table or replace your existing table.
  • Timestamp-based Partition Key: Choose a long or timestamp column as the partitioning time. By default, upload_time is used as the time column via the add_time filter.

Scheduling

You can specify a one-time transfer, or you can schedule an automated recurring transfer.

Parameters

  • Once now: Set one-time job.

  • Repeat…

    • Schedule: Options: @hourly, @daily, @monthly, and custom cron.
    • Delay Transfer: Add a delay of execution time.
    • Scheduling TimeZone: Supports extended timezone formats like ‘Asia/Tokyo’.

Details

Provide a name for your Snowflake connector to save it as a new source.

Select Done. Your data connector is saved as a Source.

Sources

On this page, you can edit your existing jobs. You can also view the details of previous transfers using this data connector by selecting the jobs icon.

Use the CLI to Configure the Connector

Before setting up the connector, install the ‘td’ command by installing the Treasure Data Toolbelt.

Prepare load.yml file

The in: section is where you specify what comes into the connector from Snowflake, and the out: section is where you specify what the connector puts out to the database in Treasure Data.

Provide your Snowflake account access information as follows:

in:
  type: snowflake
  account_name: treasuredata
  user: Test_user
  password: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
out:
  mode: append

Configuration keys and descriptions are as follows:

Config key | Type | Required | Description
type | string | yes | connector type (must be "snowflake")
driver_version | string | no | Snowflake JDBC driver version: 3.13.19 (current default used by the connector), 3.14.3 (the version before Snowflake changed the driver logging behavior that can cause job failure), or 3.21.0 (the latest Snowflake JDBC version)
account_name | string | yes | specifies the name of your account (provided by Snowflake). See Snowflake account name.
warehouse | string | yes | specifies the virtual warehouse to use
db | string | yes | Snowflake default database to use
schema | string | yes | specifies the default schema to use for the specified database once connected
role | string | no | the default access control role to use in the Snowflake session initiated by the driver
auth_method | string | yes | authentication method. Currently supports: basic, key_pair (default "basic")
private_key | string | no | required if auth_method is key_pair
passphrase | string | no | the passphrase of the private_key
user | string | yes | Snowflake login name
password | string | no | required if auth_method is basic. Password for the specified user
query | string | no | the SQL query to run
incremental | boolean | no | if true, enables incremental loading. See How incremental loading works.
incremental_columns | strings array | no | column names for incremental loading. If not specified, primary key columns are used
invalid_value_option | string | no | options to handle invalid data in a row. Available options are fail_job, ignore_row, and insert_null
last_record | objects array | no | values of the last record for incremental loading
fetch_rows | integer | no | the number of rows to fetch at a time. Default 10000
network_timeout | integer | no | how long to wait for a response when interacting with the Snowflake service before returning an error. Default not set.
query_timeout | integer | no | how long to wait for a query to complete before returning an error. Default 0
select | string | no | expression of SELECT columns. Required when 'query' is not set.
table | string | no | source table name. Required when 'query' is not set.
where | string | no | WHERE condition to filter the rows
order_by | string | no | expression of ORDER BY to sort rows
column_options | key-value pairs | no | option to customize data type conversion from source (Snowflake) to destination (Treasure Data). See How data type conversion works.
value_type | string | no | used with column_options; attempt to get data from the Snowflake JDBC driver using this type
type | string | no | destination data type. The value obtained as value_type is converted to this type
timestamp_format | string | no | Java timestamp format pattern. Used to format the destination value when value_type is date, time, or timestamp, and type is string
timezone | string | no | RFC 822 timezone, used with timestamp_format. E.g. +0300 or -0530
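
For reference, a load.yml using key pair authentication might look like the following. This is a sketch only: the private key body is abbreviated, and the account, warehouse, and table names are the same placeholders used elsewhere on this page.

in:
  type: snowflake
  auth_method: key_pair
  account_name: treasuredata
  user: Test_user
  private_key: |
    -----BEGIN ENCRYPTED PRIVATE KEY-----
    (key body omitted)
    -----END ENCRYPTED PRIVATE KEY-----
  passphrase: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  select: "column1, column2, column3"
  table: table_test
  where: "column4 != 1"
out:
  mode: append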

An example of load.yml with incremental and column_options

in:
  type: snowflake
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
  incremental: true
  incremental_columns: [column1, column3]
  last_record: [140, 1500.5]
  column_options:
    column5: {value_type: "timestamp", type: "string", timestamp_format: "yyyy-MMM-dd HH:mm:ss Z", timezone: "+0700"}
out:
  mode: append

For more details on available out modes, see modes.

To preview the data, issue the preview command:

$ td connector:preview load.yml
+--------------+-----------------+----------------+
| COLUMN1:long | COLUMN2:string  | COLUMN3:double |
+--------------+-----------------+----------------+
| 100          | "Sample value1" | 1900.1         |
| 120          | "Sample value3" | 1700.3         |
| 140          | "Sample value5" | 1500.5         |
+--------------+-----------------+----------------+

Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. You must specify the database and table where the data will be stored.

It is recommended to specify the --time-column option because Treasure Data’s storage is partitioned by time. If the option is not given, the data connector selects the first long or timestamp column as the partitioning time. The column specified by --time-column must be of long or timestamp type.

If your data doesn’t have a time column you may add it using add_time filter option. For more information, see add_time filter plugin.
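
The following is a minimal sketch of such a configuration, assuming the add_time filter’s to_column and from_value options as described in the add_time filter documentation; the filters: section sits alongside in: and out: in load.yml, and the connection settings are omitted here for brevity.

in:
  type: snowflake
  # ... connection and query settings as shown above ...
filters:
  - type: add_time
    to_column:
      name: time
      type: timestamp
    from_value:
      mode: upload_time
out:
  mode: append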

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

td connector:issue assumes you have already created a database (td_sample_db) and table (td_sample_table). If the database or the table does not exist in TD, td connector:issue will fail. Therefore, you must create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Scheduled Execution

You can schedule periodic data connector execution for recurring Snowflake imports.

Create the Schedule

A new schedule can be created using the td connector:create command. The following are required:

  • the name of the schedule
  • the cron-style schedule
  • the database and table where the data will be stored
  • the Data Connector configuration file
$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option since Treasure Data’s storage is partitioned by time (see also data partitioning).

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The cron parameter also accepts three special options: @hourly, @daily and @monthly.

By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. The --timezone option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles', etc. Timezone abbreviations like PST and CST are not supported and may lead to unexpected schedules.

List the Schedules

You can see the list of scheduled entries by running the command td connector:list.

$ td connector:list

Appendix

Support for SSL

Connection to Snowflake is made via the official Snowflake JDBC driver. The JDBC driver enforces SSL by default and makes it mandatory (i.e., a connection with SSL = false is rejected).

Snowflake Authentication

For connecting to Snowflake, this tool offers two JDBC authentication options:

  • Basic (Username/Password): This standard login method will fail if Multi-Factor Authentication (MFA) is active on the user's Snowflake account. This is due to potential conflicts with the connector's authentication process during job runs.
  • Key Pair Authentication: This secure method uses a cryptographic key pair. It is the recommended approach and works reliably whether MFA is enabled or not.

How Data Type Conversion Works

A data type conversion, also called column type mapping or column_options, is used by the connector to convert data from the source data type (Snowflake) to the destination data type (Treasure Data). The connector gets the value as the Get as Type (value_type) from the source and then converts it to the destination type. The table below shows the default conversions.

Source (Snowflake) | Get as Type (value_type) | Destination (type)
NUMBER (scale > 0) | double | double
DECIMAL (scale > 0) | double | double
NUMERIC (scale > 0) | double | double
NUMERIC (scale = 0) | long | long
INT | long | long
INTEGER | long | long
BIGINT | long | long
SMALLINT | long | long
FLOAT | double | double
FLOAT4 | double | double
FLOAT8 | double | double
DOUBLE | double | double
DOUBLE PRECISION | double | double
REAL | double | double
VARCHAR | string | string
CHARACTER | string | string
CHAR | string | string
STRING | string | string
TEXT | string | string
BOOLEAN | boolean | boolean
DATE | date | string
TIME | time | string
DATETIME | timestamp | timestamp
TIMESTAMP | timestamp | timestamp
TIMESTAMP_LTZ | timestamp | timestamp
TIMESTAMP_NTZ | timestamp | timestamp
TIMESTAMP_TZ | timestamp | timestamp
VARIANT (not officially supported) | string | string
OBJECT (not officially supported) | string | string
ARRAY (not officially supported) | string | string
BINARY | Not supported
VARBINARY | Not supported

Supported data types for value_type

  • boolean
  • date
  • double
  • decimal
  • json
  • long
  • string
  • time
  • timestamp

Supported data types for type

  • boolean
  • long
  • double
  • string
  • json
  • timestamp

value_type values can be used to get data from the following Snowflake data types:

value_type | Snowflake data type
boolean | BOOLEAN, NUMERIC, INT, INTEGER, BIGINT, SMALLINT, FLOAT, DOUBLE, REAL, VARCHAR, CHARACTER, CHAR, STRING, TEXT
date | DATE, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT
double | INT, SMALLINT, INTEGER, BIGINT, REAL, DECIMAL, NUMERIC, BIT, CHAR, VARCHAR, STRING, TEXT
decimal | DECIMAL, INT, SMALLINT, INTEGER, BIGINT, REAL, FLOAT, DOUBLE, BIT, CHAR, VARCHAR, STRING, TEXT
json | VARCHAR, CHAR, STRING, TEXT, VARIANT, OBJECT, ARRAY
long | LONG, INT, SMALLINT, INTEGER, REAL, FLOAT, DOUBLE, DECIMAL, NUMERIC, CHAR, VARCHAR, STRING, TEXT
time | TIME, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT
timestamp | TIMESTAMP, CHAR, VARCHAR, DATE, TIME, STRING, TEXT
string | can be used to get most Snowflake data types

value_type values can be converted to the following type values:

value_type | type
boolean | boolean, double, long, string
date | long, timestamp, string
double | boolean, double, long, string
decimal | boolean, double, long, string
json | json, string
long | boolean, double, long, string
time | long, timestamp, string
timestamp | long, timestamp, string
string | double, long, string, json

Notes:

When Get as Type is date, time, or timestamp and the Destination type is string, additional options are available to format the value:

  • Timestamp Format: accepts standard Java date time format patterns e.g. yyyy-MM-dd HH:mm:ss Z
  • Timezone: accepts RFC822 timezone format e.g. +0800

Get as Type 'decimal' is supported but not used by default. If the source contains a numeric value that doesn't fit the range of the destination numeric type, use Get as Type decimal with a string destination.
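
For example, a column_options entry for a hypothetical high-precision numeric column named amount_col might look like this sketch:

# amount_col is a hypothetical column name used only for illustration.
column_options:
  amount_col: {value_type: "decimal", type: "string"}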

How Incremental Loading Works

Incremental loading uses monotonically increasing unique columns (such as an AUTO_INCREMENT column) to load only records that were inserted (or updated) after the last execution. When incremental: true is set, the connector first loads all records, adding an ORDER BY clause. This mode is useful when you want to fetch only the records that have changed since the previous scheduled run. For example, if the incremental_columns: [updated_at, id] option is set, the query is as follows:

SELECT * FROM (
 ...original query is here...
)
ORDER BY updated_at, id

When bulk data loading finishes successfully, it outputs the last_record: parameter as a config diff so that the next execution uses it. At the next execution, when last_record: is set, this connector generates additional WHERE conditions to load records larger than the last record. For example, if last_record: ["2017-01-01T00:32:12.000000", 5291] is set:

SELECT * FROM (
 ...original query is here...
)
WHERE updated_at > '2017-01-01T00:32:12.000000' OR (updated_at = '2017-01-01T00:32:12.000000' AND id > 5291)
ORDER BY updated_at, id

Then, it updates last_record: so that the next execution uses the updated value. IMPORTANT: If you set the incremental_columns: option, make sure there is an index on those columns to avoid a full table scan. For this example, the following index should be created:

CREATE INDEX embulk_incremental_loading_index ON table (updated_at, id);

The recommended usage is to leave incremental_columns unset and let the connector automatically find an AUTO_INCREMENT primary key.

Only Timestamp, Datetime, and numeric columns are supported as incremental_columns. For a raw query, incremental_columns is required because the connector cannot detect primary keys for a complex query.

If incremental: false is set, the data connector fetches all records of the specified Snowflake Table/View, regardless of when they were last updated. This mode works best when combined with writing data into the destination table using ‘replace’ mode.
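
For example, a full-reload configuration might combine incremental: false with replace mode. A minimal sketch, reusing the placeholder values from the earlier examples:

in:
  type: snowflake
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  select: "*"
  table: table_test
  incremental: false
out:
  mode: replace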