# Snowflake Import Integration

Snowflake offers a cloud-based data platform that allows customers to unlock the value of their data with near-unlimited scale, concurrency, and performance. Treasure Data's Snowflake integration allows you to import data stored in Snowflake tables into Treasure Data. You can also learn more about [Treasure Data's Export Integration](/int/snowflake-export-integration).

## Prerequisites

- Basic knowledge of Treasure Data
- An existing Snowflake data warehouse account with appropriate permissions to query tables and views

## Use the TD Console to Create Your Connection

### Create a New Connection

Go to Integrations Hub > Catalog and search for Snowflake. Select Snowflake.

![](/assets/snowflake.a0943a74401fc7e2c6b0f77d6bafad995d3944a84645373ef985054726db9381.f59f9572.png)

### Create a New Snowflake Connector

The following dialog opens.

![](/assets/snowflakenewauth.d6513201a167216ea22999e661dc2549bde18ef90b069847ac43f70dd233a892.f59f9572.png)

Select an Authentication Method:

- **Basic**: Provide the required credentials: **User**, **Password**, and **Account** to authenticate Treasure Data to Snowflake.
  - **User**: Snowflake login username.
  - **Password**: Snowflake login password.
- **Key Pair**: Provide the **Private Key** and, if the key is encrypted, its **Passphrase**.
  - **Private Key**: Your generated private key. See [configuring-key-pair-authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth.md#configuring-key-pair-authentication).
  - **Passphrase**: The private key passphrase. Leave it empty if the private key is unencrypted.
  - **User**: Snowflake login username.
- **Account**: The account name provided by Snowflake. See [how to find your account name in Snowflake](https://docs.snowflake.net/manuals/user-guide/connecting.md#your-snowflake-account-name).
- **OPTIONS**: JDBC connection options (if any).

![](/assets/snwoflakeimport.50d165348afa1b1bc8fb6f3f6b7136f6cfb9be61e55896dcd7afd6a9bc3641a1.f59f9572.png)

Select **Continue**. Specify a name for your data connector, and indicate whether you want to share the authentication with others. If you share it, other team members can use your authentication to create a connector source. Select **Done**.

### Transfer Snowflake Data to Treasure Data

Create a **New Source** on the Authentications page.

![](/assets/snowflake2.fabf01ee8894539e254b5b810f52c7e68d1c736a03d9bfb4db791cd07482b6bd.f59f9572.png)

Complete the details.

### Fetch from

You must register the information that you want to ingest. The parameters are as follows:

- **Driver Version**: Specifies the Snowflake JDBC driver to use. The options are:
  - 3.13.19 (the version used by the connector by default)
  - 3.14.3
  - 3.21.0 (the latest version)
- **Role Name**: Specifies the default access control role to use in the Snowflake session. Leave empty to use the default [Role](https://docs.snowflake.net/manuals/user-guide/admin-user-management.md#user-roles).
- **Warehouse**: (Required) Specifies the virtual warehouse to use.
- **Database**: (Required) Snowflake database.
- **Schema**: (Required) Specifies the default schema for the specified database when connected.
- **Source Type**: Select **Table/View** or **Query**.
  - **Query**:
    - **SELECT Query**: The raw SQL query to run. Only **SELECT** queries are allowed; INSERT, UPDATE, and other statements that modify data are not allowed.
  - **Table/View** (default):
    - **SELECT Columns**: (Required) Accepts comma-separated column names, or `*` to select all columns from the Table/View.
    - **From Table/View**: (Required) Source table or view name.
    - **WHERE Condition**: Specify a WHERE condition to filter the rows.
    - **ORDER By Columns**: Expression of ORDER BY used to sort rows. Not allowed when **Incremental Loading** is selected.
- **Incremental Loading**: Enables incremental loading. See [How Incremental Loading works](/int/snowflake-import-integration#h2__1652280794).
- **Incremental Column(s)**: Column names for incremental loading. Only **Timestamp** and **Numeric** columns are allowed. If not specified, the primary key column (if it exists) is used.
- **Invalid Value Handling Mode**: When your table contains invalid data in a row, there are options to **Fail Job**, **Ignore Invalid Value**, or **Ignore Row**. When **Fail Job** is selected, the current job stops with an ERROR status. When **Ignore Invalid Value** is selected, a null value is saved if the TD table's column is not a string type, or the connector tries to save the value as a string if the column is a string column. When **Ignore Row** is selected, the row containing the invalid value is skipped and the job keeps running. If not specified, **Fail Job** is used by default.

The specified **Warehouse**, **Database**, and **Schema** must already exist. The specified **Role** must have privileges in the specified warehouse, database, and schema.

![](/assets/snowflakesourcetable.7414cccafa1b66eebc93a431f22f4760fa52b480a6708ac920ce51f1c1c092d0.f59f9572.png)

#### Example: Query Configuration

```
Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Query selected
SELECT Query: SELECT column1, column2, column3 FROM table_test WHERE column4 != 1
Incremental Loading: Checked
Incremental Column(s): column1
```

#### Example: Table/View Configuration

```
Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Table/View selected
SELECT Columns: column1, column2, column3
FROM Table/View: table_test
WHERE Condition: column4 != 1
ORDER By Columns:
Incremental Loading: Checked
Incremental Column(s): column1
```

Select **Next**. You'll see a [preview](https://docs.treasuredata.com/smart/project-product-documentation/about-data-preview) of your data.

### Preview

Preview might show dummy data if the query takes a long time to finish, for example, a heavy query that orders a large table by a column that is not indexed. When you use a complex query with multiple conditions or table joins, Preview sometimes shows "**No records to preview**", but this does not affect the job result. Read [more information about Previews](https://docs.treasuredata.com/smart/project-product-documentation/about-data-preview).

To make changes, such as setting data type mapping, select **Advanced Settings**; otherwise, select **Next**.

![](/assets/image-20191004-232422.59adea24d3e0809d471fb5ab60962b6fa0715912f16172b0dd83852532caa5fd.f59f9572.png)

### Advanced Settings

Advanced Settings allow you to customize the transfer. Edit the following sections if needed. To change the default data type conversion from the **source** (Snowflake column) when importing data to the **destination** (Treasure Data), see the appendix on [how data type conversion works](/int/snowflake-import-integration#h2__1833546230).

#### COLUMN OPTIONS

- **Data Type Mapping**: Enter the Snowflake column name.
- **Get as Type**: Get data from the **source** as this specified type.
- **Destination Data Type**: The expected **destination** data type.
- **Timestamp Format**: The format pattern used to format the **destination** value when **Get as Type** is date, time, or timestamp and **Destination Data Type** is string. For example: *yyyy-MM-dd'T'HH:mm:ssZ*
- **Time zone**: Used with **Timestamp Format**. If not specified, GMT is used. Supports zone offset values, such as +0800 or -1130.

Select **Add**.

#### START AFTER (VALUES)

- **Start After (values)**: When Incremental Loading is selected and you specify this value, only data greater than this value is imported. The number and order of values must match **Incremental Column(s)**.
- **Rows per Batch**: The number of rows to fetch at a time. The default is 10000 rows.
- **Network Timeout**: Specifies how long to wait for a response when interacting with the Snowflake service before returning an error.
- **Query Timeout**: Specifies how long to wait for a query to complete before returning an error. Zero (0) means wait indefinitely.

![](/assets/snowflakein.331fed439f00533ea2c5c9a5508d7b53b9040f2dce4c92f6254e523484d5e5fe.f59f9572.png)

### Transfer to

Choose the target database and table. Select an existing database and table to import to, or create new ones.

- **Method**: Append or Replace. Select whether to **append** records to an existing table or **replace** your existing table.
- **Timestamp-based Partition Key**: Choose a long or timestamp column as the partitioning time. If none is chosen, upload_time is used as the default time column by applying the add_time filter.

![](/assets/snowflakedataplacement.b8fc7a7c8415fb99f9be502538ce6603fff26c2624af5ac2a17374a193e2d37c.f59f9572.png)

## Scheduling

You can specify a one-time transfer, or you can schedule an automated recurring transfer.

Parameters:

- **Once now**: Set a one-time job.
- **Repeat…**
  - **Schedule**: Options: *@hourly*, *@daily*, *@monthly*, and custom *cron*.
  - **Delay Transfer**: Add a delay to the execution time.
  - **Scheduling TimeZone**: Supports extended timezone formats like 'Asia/Tokyo'.

![](/assets/schedule.0da876ce7e1db3c17434bca3b1b4f37ba34cdd0003cd73ed8a40779e0b9c1d08.f59f9572.png)

### Details

Provide a name for your Snowflake connector as a new source.

![](/assets/image-20191004-232506.aa0f7e420ce2eb51d7e1e07498ddf5431cb829d27dc73602923e5671a80c4619.f59f9572.png)

Select **Done**. Your data connector is saved as a **Source**.

### Sources

On this page, you can edit your existing jobs. You can also view the details of previous transfers using this data connector by selecting the jobs icon.

![](/assets/image-20191004-232517.6ee152dbdb65976bccf553601e3b2da0932482bd12f91cf5e1c43182231b1337.f59f9572.png)

## Use the CLI to Configure the Connector

Before setting up the connector, install the 'td' command by installing the [Treasure Data Toolbelt](https://docs.treasuredata.com/smart/project-product-documentation/installing-and-updating-td-toolbelt-and-treasure-agent).

### Prepare the load.yml File

The `in:` section is where you specify what comes into the connector from Snowflake, and the `out:` section is where you specify what the connector puts out to the database in Treasure Data.
Provide your Snowflake account access information as follows:

```yaml
in:
  type: snowflake
  account_name: treasuredata
  user: Test_user
  password: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3 FROM table_test WHERE column4 != 1
out:
  mode: append
```

Configuration keys and descriptions are as follows:

| Config key | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | yes | connector type (must be "snowflake") |
| driver_version | string | no | 3.13.19 (the version used by the connector by default), 3.14.3 (the version before Snowflake changed the driver logging behavior, which can cause job failures), 3.21.0 (the latest Snowflake JDBC version) |
| account_name | string | yes | the name of your account (provided by Snowflake). See Snowflake [account name](https://docs.snowflake.net/manuals/user-guide/connecting.md#your-snowflake-account-name). |
| warehouse | string | yes | the virtual warehouse to use |
| db | string | yes | the Snowflake default database to use |
| schema | string | yes | the default schema to use for the specified database once connected |
| role | string | no | the default access control role to use in the Snowflake session initiated by the driver |
| auth_method | string | yes | authentication method. Currently supports basic and key_pair (default "basic") |
| private_key | string | no | required if **auth_method** is **key_pair** |
| passphrase | string | no | the passphrase of the private_key |
| user | string | yes | Snowflake login name |
| password | string | no | required if **auth_method** is **basic**; password for the specified user |
| query | string | no | the SQL query to run |
| incremental | boolean | no | if true, enables incremental loading. See [How incremental loading works](/int/snowflake-import-integration#h2__1652280794). |
| incremental_columns | strings array | no | column names for incremental loading. If not specified, primary key columns are used |
| invalid_value_option | string | no | how to handle invalid data in a row. Available options are **fail_job**, **ignore_row**, and **insert_null** |
| last_record | objects array | no | values of the last record for incremental loading |
| fetch_rows | integer | no | the number of rows to fetch at a time. Default 10000 |
| network_timeout | integer | no | how long to wait for a response when interacting with the Snowflake service before returning an error. Not set by default |
| query_timeout | integer | no | how long to wait for a query to complete before returning an error. Default 0 (wait indefinitely) |
| select | string | no | expression of the columns to select. Required when 'query' is not set |
| table | string | no | source table or view name. Required when 'query' is not set |
| where | string | no | condition to filter the rows |
| order_by | string | no | expression of ORDER BY to sort rows |
| column_options | key-value pairs | no | option to customize data type conversion from the source (Snowflake) to the destination (Treasure Data). See [How data type conversion works](/int/snowflake-import-integration#h2__1833546230). |
| value_type | string | no | used with **column_options**; attempt to get data from the Snowflake JDBC driver as this type |
| type | string | no | destination data type. The value obtained as **value_type** is converted to this type |
| timestamp_format | string | no | Java timestamp format pattern. Used to format the destination value when **value_type** is date, time, or timestamp and **type** is string |
| timezone | string | no | RFC 822 timezone, used with timestamp_format, e.g. +0300 or -0530 |
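If you want to select from a table or view rather than supply a raw query, use the `select`, `table`, `where`, and `order_by` keys instead of `query`. The following is a minimal sketch under that assumption; the warehouse, database, table, and column names are placeholders carried over from the examples above.

```yaml
in:
  type: snowflake
  account_name: treasuredata            # placeholder account name
  user: Test_user
  password: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  select: "column1, column2, column3"   # or "*" to select all columns
  table: table_test                     # source table or view to read from
  where: "column4 != 1"                 # optional row filter
  order_by: "column1"                   # optional sort expression
  invalid_value_option: ignore_row      # fail_job (default), insert_null, or ignore_row
out:
  mode: append
```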
An example of *load.yml* with incremental loading and column_options:

```yaml
in:
  type: snowflake
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3 FROM table_test WHERE column4 != 1
  incremental: true
  incremental_columns: [column1, column3]
  last_record: [140, 1500.5]
  column_options:
    column5: {value_type: "timestamp", type: "string", timestamp_format: "yyyy-MMM-dd HH:mm:ss Z", timezone: "+0700"}
out:
  mode: append
```

For more details on available *out* modes, see [modes](/int/snowflake-import-integration#h2_1159725868).

To preview the data, issue the preview command:

```
$ td connector:preview load.yml
+--------------+-----------------+----------------+
| COLUMN1:long | COLUMN2:string  | COLUMN3:double |
+--------------+-----------------+----------------+
| 100          | "Sample value1" | 1900.1         |
| 120          | "Sample value3" | 1700.3         |
| 140          | "Sample value5" | 1500.5         |
+--------------+-----------------+----------------+
```

### Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. You must specify the database and table where the data is stored.

It is recommended to specify the --time-column option because Treasure Data's storage is partitioned by time. If the option is not given, the data connector selects the first *long* or *timestamp* column as the partitioning time. The type of the column specified by *--time-column* must be either *long* or *timestamp*.

If your data doesn't have a time column, you can add one using the *add_time* filter option. For more information, see [add_time filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function).

```
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at
```

td connector:issue assumes you have already created the *database (td_sample_db)* and *table (td_sample_table)*. If the database or the table does not exist in TD, td connector:issue fails. Therefore, create the database and table manually beforehand, or use the *--auto-create-table* option with the *td connector:issue* command to create them automatically.

```
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table
```

If you have a field called *time*, you don't have to specify the *--time-column* option.

```
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
```

### Scheduled Execution

You can schedule periodic data connector execution for recurring Snowflake imports.

#### Create the Schedule

A new schedule can be created using the *td connector:create* command. The following are required:

- the name of the schedule
- the cron-style schedule
- the database and table where the data will be stored
- the Data Connector configuration file

```bash
$ td connector:create daily_import "10 0 * * *" td_sample_db td_sample_table load.yml
```

It's also recommended to specify the *--time-column* option since Treasure Data's storage is partitioned by time (see also [data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)).
```
$ td connector:create daily_import "10 0 * * *" td_sample_db td_sample_table load.yml --time-column created_at
```

The `cron` parameter also accepts three special options: `@hourly`, `@daily`, and `@monthly`. By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles', etc. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.

### List the Schedules

You can see the list of scheduled entries by running the command *td connector:list*.

```
$ td connector:list
```

## Appendix

### Support for SSL

Connection to Snowflake is made via the [official JDBC driver](https://docs.snowflake.net/manuals/user-guide/jdbc-download.md). The JDBC driver uses SSL by default and makes it mandatory (i.e., a connection with SSL = false is rejected).

### Snowflake Authentication

For connecting to Snowflake, the connector offers two JDBC authentication options:

- **Basic (Username/Password)**: This standard login method **will fail if Multi-Factor Authentication (MFA) is active** on the user's Snowflake account. This is due to potential conflicts with the connector's authentication process during job runs.
- **Key Pair Authentication**: This secure method uses a cryptographic key pair. It is the recommended approach and works reliably whether MFA is enabled or not. A configuration sketch follows this list.
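The following is a minimal *load.yml* sketch using key-pair authentication. It assumes you have already generated and registered a key pair in Snowflake; the account, user, key contents, and passphrase shown are placeholders.

```yaml
in:
  type: snowflake
  auth_method: key_pair
  account_name: treasuredata          # placeholder account name
  user: snowflake_user                # placeholder login name
  private_key: |                      # your generated private key (placeholder contents)
    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE...
    -----END ENCRYPTED PRIVATE KEY-----
  passphrase: xxxxx                   # omit if the private key is unencrypted
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3 FROM table_test
out:
  mode: append
```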
### How Data Type Conversion Works

Data type conversion, also called **column type mapping** or **column_options**, is used by the connector to convert data from the **Source** data type (Snowflake) to the **Destination (type)** data type (Treasure Data). The connector reads data from the **Source** as the **Get as Type (value_type)** and then converts it to the **Destination** type. The following table shows the default conversions.

| **Source (Snowflake)** | **Get as Type (value_type)** | **Destination (type)** |
| --- | --- | --- |
| NUMBER (scale > 0) | double | double |
| DECIMAL (scale > 0) | double | double |
| NUMERIC (scale > 0) | double | double |
| NUMERIC (scale = 0) | long | long |
| INT | long | long |
| INTEGER | long | long |
| BIGINT | long | long |
| SMALLINT | long | long |
| FLOAT | double | double |
| FLOAT4 | double | double |
| FLOAT8 | double | double |
| DOUBLE | double | double |
| DOUBLE PRECISION | double | double |
| REAL | double | double |
| VARCHAR | string | string |
| CHARACTER | string | string |
| CHAR | string | string |
| STRING | string | string |
| TEXT | string | string |
| BOOLEAN | boolean | boolean |
| DATE | date | string |
| TIME | time | string |
| DATETIME | timestamp | timestamp |
| TIMESTAMP | timestamp | timestamp |
| TIMESTAMP_LTZ | timestamp | timestamp |
| TIMESTAMP_NTZ | timestamp | timestamp |
| TIMESTAMP_TZ | timestamp | timestamp |
| VARIANT (not officially supported) | string | string |
| OBJECT (not officially supported) | string | string |
| ARRAY (not officially supported) | string | string |
| BINARY | | Not supported |
| VARBINARY | | Not supported |

Supported data types for **value_type**:

- boolean
- date
- double
- decimal
- json
- long
- string
- time
- timestamp

Supported data types for **type**:

- boolean
- long
- double
- string
- json
- timestamp

Each **value_type** can be used to get data from the following Snowflake data types:

| **value_type** | **Snowflake Data type** |
| --- | --- |
| boolean | BOOLEAN, NUMERIC, INT, INTEGER, BIGINT, SMALLINT, FLOAT, DOUBLE, REAL, VARCHAR, CHARACTER, CHAR, STRING, TEXT |
| date | DATE, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT |
| double | INT, SMALLINT, INTEGER, BIGINT, REAL, DECIMAL, NUMERIC, BIT, CHAR, VARCHAR, STRING, TEXT |
| decimal | DECIMAL, INT, SMALLINT, INTEGER, BIGINT, REAL, FLOAT, DOUBLE, BIT, CHAR, VARCHAR, STRING, TEXT |
| json | VARCHAR, CHAR, STRING, TEXT, VARIANT, OBJECT, ARRAY |
| long | LONG, INT, SMALLINT, INTEGER, REAL, FLOAT, DOUBLE, DECIMAL, NUMERIC, CHAR, VARCHAR, STRING, TEXT |
| time | TIME, CHAR, VARCHAR, STRING, TIMESTAMP, TEXT |
| timestamp | TIMESTAMP, CHAR, VARCHAR, DATE, TIME, STRING, TEXT |
| string | can be used to get most Snowflake data types |

Each **value_type** can be converted to the following **type** values:

| **value_type** | **type** |
| --- | --- |
| boolean | boolean, double, long, string |
| date | long, timestamp, string |
| double | boolean, double, long, string |
| decimal | boolean, double, long, string |
| json | json, string |
| long | boolean, double, long, string |
| time | long, timestamp, string |
| timestamp | long, timestamp, string |
| string | double, long, string, json |

***Notes:***

When **Get as Type** is date, time, or timestamp and the **Destination** is a string, additional options are available to format the value:

- Timestamp Format: accepts standard Java date-time format patterns, e.g. *yyyy-MM-dd HH:mm:ss Z*
- Timezone: accepts the RFC 822 timezone format, e.g. *+0800*

The **Get as Type** 'decimal' type is supported but not used by default. If the **Source** contains a numeric value that doesn't fit the range of the **Destination** numeric type, use **Get as Type** decimal with a string **Destination**.
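For instance, a *load.yml* sketch that reads a high-precision numeric column as decimal and stores it as a string could look like the following. This is only an illustration; `amount_column` and the connection values are placeholders.

```yaml
in:
  type: snowflake
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT amount_column FROM table_test
  column_options:
    # read the value as decimal to preserve precision, then store it as a string
    amount_column: {value_type: "decimal", type: "string"}
out:
  mode: append
```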
### How Incremental Loading Works

Incremental loading uses monotonically increasing unique columns (such as an AUTO_INCREMENT column) to load only the records that were inserted (or updated) since the last execution.

First, if `incremental: true` is set, this connector loads all records, with an additional ORDER BY. This mode is useful when you want to fetch just the records that have changed since the previous scheduled run. For example, if the **incremental_columns**: [updated_at, id] option is set, the query is as follows:

```
SELECT * FROM (
  ...original query is here...
)
ORDER BY updated_at, id
```

When bulk data loading finishes successfully, it outputs the last_record: parameter as a config diff so that the next execution uses it.

At the next execution, when **last_record**: is also set, this connector generates additional WHERE conditions to load only records greater than the last record. For example, if **last_record**: ["2017-01-01T00:32:12.000000", 5291] is set:

```
SELECT * FROM (
  ...original query is here...
)
WHERE updated_at > '2017-01-01T00:32:12.000000'
  OR (updated_at = '2017-01-01T00:32:12.000000' AND id > 5291)
ORDER BY updated_at, id
```

Then, it updates **last_record**: so that the next execution uses the updated values.

IMPORTANT: If you set the **incremental_columns**: option, make sure that there is an index on those columns to avoid a full table scan. For this example, the following index should be created:

```
CREATE INDEX embulk_incremental_loading_index ON table (updated_at, id);
```

The recommended usage is to leave **incremental_columns** unset and let the connector automatically find an AUTO_INCREMENT primary key. Only Timestamp, Datetime, and numerical columns are supported as incremental_columns. For a raw query, **incremental_columns** is required because the connector cannot detect the primary keys of a complex query.

If `incremental: false` is set, the data connector fetches all the records of the specified Snowflake Table/View, regardless of when they were last updated. This mode is best combined with writing data into a destination table using the 'replace' mode.
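As an illustration, a full-reload configuration that pairs `incremental: false` with the replace output mode might look like this minimal sketch; the connection values and table name are placeholders.

```yaml
in:
  type: snowflake
  account_name: treasuredata   # placeholder account name
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  select: "*"
  table: table_test
  incremental: false           # fetch every record on each run
out:
  mode: replace                # overwrite the destination table on each run
```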