Skip to content
Last updated

Databricks Import Integration

This data connector enables you to import data from Databricks into Treasure Data.

Prerequisites

  • Basic knowledge of Treasure Data and access to a Treasure Data account
  • Basic knowledge of Databricks and access to a Databricks account
  • Access to a Databricks server

Requirements and Limitations

  • If the Databricks server is in hibernation mode, it can take a few minutes to start, which may cause the connector to timeout.

Import from Databricks via TD Console

Obtaining the Host Name and HTTP Path for the Databricks Connector

  1. Log in to the Databricks website.

  2. Navigate to the SQL tab and then select SQL Warehouses.

  3. Select an instance of an SQL Warehouse.

  4. Select the Connection details tab.

  5. Note the Server hostname and HTTP path for the SQL Warehouse.

Create Authentication

Your first step is to create a new authentication with a set of credentials.

  1. Select Integrations Hub > Catalog.

  1. Search for databricks in the Catalog.
  2. Hover your mouse over the icon and select Create Authentication.

  1. Ensure that the Credentials tab is selected and then enter credential information for the integration.

ParameterDescription
HostnameThis is the hostname of Databricks instance,
HTTP PathThis is the HTTP Path of Databricks instance.
Authentication MethodThis is the authentication method: it can be be Basic or Personal Access Token.
UsernameThis is the username used to log in. It is only valid when the Authentication Method is Basic.
PasswordThis is the password used to log in. It is only valid when the Authentication Method is Basic.
TokenThis is the Databricks personal access token. It is only valid when the Authentication Method is Personal Access Token.
  1. Select Continue.
  2. Enter a name for your authentication, and select Done.

Create a Source

  1. Open TD Console.
  2. Navigate to Integrations Hub > Authentications.
  3. Locate your new authentication and select New Source.

Create a Connection

  1. Type a source name in the Data Transfer Name field.
  2. Select Next.
ParameterDescription
Data Transfer NameThis is the name for you data transfer source.
AuthenticationThis is the name of the authentication to be used.

The Create Source page displays with the Source Table tab selected.

Identify a Source Table

ParameterDescription
CatalogThis is the dataset catalog.
SchemaThis is the dataset schema.
Use custom SELECT query?Select the checkbox to build a custom query, or fill in the fields to make a query.
Skip validate query?Skip query syntax validation
Select ColumnsDefine columns to select from the dataset. This is required only if screen-shotUse custom SELECT queryscreen-shot is not selected.
TableThis is the table to select. Required only if screen-shotUse custom SELECT queryscreen-shot is not selected.
Filter ConditionsIf you need additional specificity on the data retrieved from the table, you can specify it as part of WHERE clause.
Order bySpecify if you need the records ordered by a particular field. Incremental columns should be included in order by conditions.
QueryThis field contains your SQL query. This is required only if screen-shotUse custom SELECT queryscreen-shot is selected.
Incremental LoadingSelect the checkbox to enable incremental loading
Incremental ColumnsDefine incremental columns; these columns should be in the order by filter.
Start after valueThis is the value for the columns used in incremental loading.
Invalid Value Handling ModeSpecify how to handle invalid values. You can select - Fail Job - Ignore Invalid Value - Ignore Row

Select Next.

Creating a Custom Query

  • Simple query:
query: SELECT * FROM simple_data WHERE cut = 'Very Good' ORDER BY `c_custkey`
  • Query with incremental and last_records

The value of the incremental\_columns should follow the position of parameters and have the same name as the column.

query: SELECT * FROM simple_data WHERE myid > :myid AND count > :count ORDER BY `myid`
incremental: true
incremental_columns: ['myid', 'count']
last_record: ['4', '5']

The query must have a WHERE condition with values filled in for last\_record.

Order by is not required but should be specified to get more precise result

An actual query where screen-shotmyidscreen-shot is greater than 4 and screen-shotcountscreen-shot is greater than 5, would look something like this:

SELECT * FROM simple_data WHERE myid > 4 AND count > 5 ORDER BY `myid`

Define Data Settings

ParameterDescription
COLUMN OPTIONSDefine data type for specific columns.
DEFAULT COLUMN_OPTIONSDefine conversion datatype for a specific datatype. For example:  always convert a long to a string

Select Next.

Preview Your Data

Data preview is optional, and you can safely click Next to go to the next page of the dialog if you would like.

  1. Display a preview of your data before running the import by selecting Generate Preview.

    The data shown in the data preview is approximated from your source. It is not the actual data that is imported.

  2. Verify that the data looks approximately like you expect it to.

  3. Select Next.

Data Placement

For data placement, select the target database and table where you want your data placed and indicate how often the import should run.

  1. Select Next. Under Storage, you will create a new or select an existing database and create a new or select an existing table for where you want to place the imported data.

  2. Select a Database > Select an existing or Create New Database.

  3. Optionally, type a database name.

  4. Select a TableSelect an existing or Create New Table.

  5. Optionally, type a table name.

  6. Choose the method for importing the data.

    • Append (default)-Data import results are appended to the table. If the table does not exist, it will be created.
    • Always Replace-Replaces the entire content of an existing table with the result output of the query. If the table does not exist, a new table is created.
    • Replace on New Data-Only replace the entire content of an existing table with the result output when there is new data.
  7. Select the Timestamp-based Partition Key column. If you want to set a different partition key seed than the default key, you can specify the long or timestamp column as the partitioning time. As a default time column, it uses upload_time with the add_time filter.

  8. Select the Timezone for your data storage.

  9. Under Schedule, you can choose when and how often you want to run this query.

Run once

  1. Select Off.
  2. Select Scheduling Timezone.
  3. Select Create & Run Now.

Repeat Regularly

  1. Select On.
  2. Select the Schedule. The UI provides these four options: @hourly@daily and @monthly or custom cron.
  3. You can also select Delay Transfer and add a delay of execution time.
  4. Select Scheduling Timezone.
  5. Select Create & Run Now.

After your transfer has run, you can see the results of your transfer in Data Workbench > Databases.

Import from Databricks via Workflow

You can import data from Databricks by using the  screen-shottd_load>:screen-shot  operator for workflow. If you have already created a SOURCE, you can run it; if you don't want to create a SOURCE, you can import it using a screen-shot.ymlscreen-shot file.

Using a Source

  1. Open TD Console.
  2. Navigate to Integrations Hub > Sources.
  3. Identify your source.
  4. Select the more icon (...) on the far right of your source and then select Copy Unique ID.

The Unique ID will look something like this: screen-shotdatabricks_import_1234••••••screen-shot

  1. Define a workflow task using the screen-shottd_load>:screen-shot operator.
+load:
  td_load>: databricks_import_1234••••••
  database: ${td.dest_db}
  Table: ${td.dest_table}
  1. Run a workflow.

Using a yml file

  1. Identify your screen-shot.ymlscreen-shot file.

If you need to create the screen-shot.ymlscreen-shot file, review Amazon S3 Import Integration Using CLI for reference.

  1. Define a workflow task using the screen-shottd_load>:screen-shot operator.
+load:
  td_load>: config/daily_load.yml
  database: ${td.dest_db}
  table: ${td.dest_table}
  1. Run a workflow

Parameters Reference

NameDescriptionValueDefault ValueRequired
typedatabricksdatabricksTrue
auth_methodThis is the authentication method.basic or access_tokenbasic
host_nameThis is the host name of the Databricks instance.True
http_pathThis is the HTTP path of the Databricks instance.True
user_nameThis is the username for login.True, if auth_method is basic.
passwordThis is the password for login.True, if auth_method is basic.
access_tokenThis is the access_token to be used to fetch data.True, if auth_method is access_token.
catalogThis is the dataset catalog.True
schemaThis is the dataset schema.True
use_custom_queryYou can build a custom query or fill in the fields to create a custom query.true/falsetrue
selectDefine columns to select from dataset. This is Required only if screen-shotUse custom SELECT queryscreen-shotis false.*True, if use_custom_query is false.
tableThis is the table to select. This is Required only if screen-shotUse custom SELECT queryscreen-shot is false.True, if use_custom_query is false.
whereIf you need additional specificity on the data retrieved from the table, you can specify it as part of a WHERE clause.
order_bySpecify if you need the records ordered by a particular field. The incremental columns should be included in order by conditions.
queryBuild your own query.True, if use_custom_query is true.
incrementalEnable incremental loading.true/falsefalse
incremental_columnsDefine data type for specific columns.
last_recordDefine the last record value for incremental loading.
default_column_optionsDefine conversion datatype for a specific datatype.

Sample Workflow Code

Visit Treasure Boxes for sample workflow code.

Import from Databricks via CLI (Toolbelt)

Before setting up the connector, install the most current TD Toolbelt.

Create Seed Configuration File (seed.yml)

in:
  type: databricks
  auth_method: BASIC
  host_name: "dbc-8297d0a7-2709.cloud.databricks.com"
  http_path: "/sql/1.0/warehouses/1b195ce3a8720eb9"
  username: ***
  password: ***
  access_token: ***
  catalog: hive_metastore
  schema: default
  select: "*"
  table: diamonds
  incremental_columns: [ price, x, y, z ]
  order_by: "`price`, `x`, `y`, `z`"
  incremental: true
  last_record: ['4']
  default_column_options: {"double": {"type": "string"}}
out:
  mode: append

Parameters Reference

NameDescriptionValueDefault ValueRequired
typedatabricksdatabricksTrue
auth_methodThis is the authentication method.basic/access_tokenbasic
host_nameThis is the host name of the Databricks instance.True
http_pathThis is the HTTP path of the Databricks instance.True
user_nameThis is the username to log in.True, if auth_method is basic.
passwordThis is the password to log in.True, if auth_method is basic.
access_tokenThis is the access_token to fetch data.True, if auth_method is access_token.
catalogThis is the dataset catalog.True
schemaThis is the the dataset schema.True
use_custom_queryBuild a custom query or fill in the fields to make a query .true/falsetrue
skip_validate_querySkip query validation. If we skip query validation, preview mode always run with dummy datatrue/falsefalse
selectDefine columns to select from the dataset. Required only ifscreen-shotUse custom SELECT queryscreen-shotis false.*True, if use_custom_query is false.
tableThis is the table to select. Required only if screen-shotUse custom SELECT queryscreen-shotis false.True, if use_custom_query is false.
whereIf you need additional specificity on the data retrieved from the table, you can specify it as part of a WHERE clause.
order_bySpecify if you need the records ordered by a particular field. The incremental columns should be included in order by conditions.
queryBuild your own query.True, if use_custom_query is true.
incrementalEnable incremental loading.true/falsefalse
incremental_columnsDefine the data type for specific columns.
last_recordDefine the last record value for incremental loading.
default_column_optionsDefine convert datatype for a specific datatype.

Generate load.yml

The screen-shotconnector:guessscreen-shot command automatically reads the source files and uses logic to guess the file format, the fields, and columns.

$ td connector:guess seed.yml -o load.yml

You can open the screen-shotload.ymlscreen-shot to review the definitions including file formats, encodings, column names, and types.

  • Example
in:
  type: databricks
  auth_method: BASIC
  host_name: "dbc-8297d0a7-2709.cloud.databricks.com"
  http_path: "/sql/1.0/warehouses/1b195ce3a8720eb9"
  username: ***
  password: ***
  access_token: ***
  catalog: hive_metastore
  schema: default
  select: "*"
  table: diamonds
  incremental_columns: [ price, x, y, z ]
  order_by: "`price`, `x`, `y`, `z`"
  query: SELECT * FROM simple_data WHERE myid > :myid
  incremental: true
  last_record: ['4']  
  default_column_options: {"double": {"type": "string"}}
out:
  mode: append

To preview the data, use the screen-shottd connector:previewscreen-shot command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

The guess command requires more than 3 rows and 2 columns in the source data file because the command assesses the column definition using sample rows from the source data.

If the system detects your column name or column type unexpectedly, modify the screen-shotload.ymlscreen-shot file and preview again.

Execute Load Job

Executing a job might take a couple of hours depending on the size of the data. Be sure to specify the Treasure Data database and table where the data should be stored.

Treasure Data also recommends specifying the screen-shot*--time-columnscreen-shot option because Treasure Data’s storage is partitioned by time (see data partitioning). If this option is not provided, the data connector chooses the first long or timestamp column as the partitioning time. The type of the column specified by screen-shot--time-column*screen-shot must be either of type long or timestamp.

If your data doesn’t have a time column, you can add a time column by using the add_time filter option. For more details see add_time filter plugin.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

This example command for screen-shotconnector:issuescreen-shot assumes that you have already created a screen-shotdatabase(td_sample_db)screen-shotand a screen-shottable(td_sample_table)screen-shot. If the database or the table does not exist in TD, this command fails. Create the database and table manually, or use the  screen-shot--auto-create-tablescreen-shot option with the  screen-shottd connector:issuescreen-shot command to auto-create the database and table.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table
 --time-column created_at --auto-create-table

The data connector does not sort records on the server side. To use time-based partitioning effectively, sort the records in your files beforehand.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Import Modes

You can specify the file import mode in the out section of the screen-shotload.ymlscreen-shot file. The screen-shotout:screen-shot section controls how data is imported into a Treasure Data table. For example, you may choose to append data or replace data in an existing table in Treasure Data.

ModeDescriptionExamples
AppendRecords are appended to the target table.in:   ... out:   mode: append
Always ReplaceThis replaces data in the target table. Any manual schema changes made to the target table remain intact.in:   ... out:   mode: replace
Replace on new dataThis replaces data in the target table only when there is new data to import.in:   ... out:   mode: replace_on_new_data

Scheduling Executions

You can schedule periodic data connector execution for incremental file import. Configure the scheduler carefully to ensure high availability.

For the scheduled import, you can import all files that match the specified prefix and one of these fields by condition:

  • If screen-shotuse_modified_timescreen-shot is disabled, the last path is saved for the next execution. On the second and subsequent runs, the connector only imports files that come after the last path in alphabetical order.
  • Otherwise, the time that the job is executed is saved for the next execution. On the second and subsequent runs, the connector only imports files that were modified after that execution time in alphabetical order.

All dates and as well as date/time formats that are specified  without time zones are always treated as default UTC.

Create a Schedule Using the TD Toolbelt

A new schedule can be created using the screen-shottd connector:createscreen-shotcommand.

$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml

Treasure Data also recommends that you specify the --time-column option, because Treasure Data’s storage is partitioned by time (see also data partitioning).

td connector:create daily_import "10 0 * * *" \
td_sample_db td_sample_table load.yml \
--time-column created_at

The screen-shotcronscreen-shot parameter also accepts three special options: @hourly, @daily, and @monthly.

By default, the schedule is set up in the UTC timezone. You can set the schedule in a timezone using the screen-shot-tscreen-shot or screen-shot--timezone option.screen-shot The screen-shot--timezonescreen-shot option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles', etc. Timezone abbreviations like PST or CST are not supported and might lead to unexpected schedules.