Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

User-defined partitioning (UDP) provides hash partitioning for a table on one or more columns in addition to the time column. A query that filters on the set of columns used as user-defined partitioning keys can be more efficient , because Presto can skip scanning partitions that have matching values on that set of columns.

...

Table of Contents
maxLevel13

Limitations

The benefits of UDP can be limited when used with more complex queries. The query optimizer might not always apply UDP in cases where it can be beneficial.

The

...

Maximum Number of Partitioning

...

Keys is

...

Less Than or

...

Equal to

...

Three Keys

If the limit is exceeded, Presto causes the following error message:

'bucketed_on' must be less than 4 columns

...

Imports

...

Other than SQL

The import method provided by Treasure Data for the following does not support UDP tables:

  • Streaming Import
  • Data Connector
  • BulkImport 

If you try to use any of these import methods, you will get an error. As a workaround, you can use a workflow to copy data from a table that’s that is receiving streaming import imports to the UDP table.

Use Cases for UDP

...

  • a customer's ID, first name+last name+birth date, gender, or other profile values or flags

  • a product's SKU number, bar code, manufacturer, or other exact-match attributes

  • an address's country code; city, state, or province; or postal code

...

  • "Needle-in-a-Haystack" lookup on the partition key

  • Aggregations on the partition key

  • Very large joins on partition keys used in tables on both sides of the join

Basic UDP Usage

CREATE TABLE Syntax for UDP

To create a UDP table:

  1. use Use CREATE TABLE with the attributes bucketed_on to identify the bucketing keys , and bucket_count for the number of buckets.

  2. Optionally, define the max_file_size and max_time_range values.

Code Block
linenumberstrue
CREATE
  TABLE
    table_name WITH(
      bucketed_on = array['col1' [,
      'col2']... ] [,
      bucket_count = n] [,
      max_file_size = 'nnnMB'] [,
      max_time_range = 'nnd']
    ) [AS SELECT 
      . . . ]
;

For bucket_count the default value is 512. This should work for most use cases.

For example:

Code Block
linenumberstrue
CREATE
  TABLE mytable_p(
    time bigint,
    col1 VARCHAR,
    col2 VARCHAR,
    col3 bigint
  ) WITH (
    bucketed_on = array['col1' [,
    'col2']... ]
  )
;If not specified:

...

Supported TD data types for UDP partition keys include int, long, and string. These correspond to Presto data types as described in in About TD Primitive Data Types.

Choose a set of one or more columns used widely to select data for analysis-- that is, one frequently used to look up results, drill down to details, or aggregate data. For example, depending on the most frequently used types, you might choose:

  • customer_id

  • Country + State/Province + City

  • Postal_code

  • Customer-first name + last name + date of birth 

Choose a column or set of columns that have high cardinality (relative to the number of buckets), and which are frequently used with equality predicates.  For For example:

  • Unique values, for example, an email address or account number

  • Non-unique but high-cardinality columns with relatively even distribution, for example, date of birth

Checking

...

For and Addressing Data Skew

The performance is inconsistent if the number of rows in each bucket are is not roughly equal -- for . For example, if you partition on the US zip /postal code, urban postal codes will have more customers than rural ones

To help determine bucket count and partition size, you can run a SQL query that identifies distinct key column combinations and counts their occurrences. For example:

Code Block
linenumberstrue
SELECT 
  (
    concat(key_column1 [,
      '|',
      key_column2])
  ),
  COUNT(*)
FROM
  tbl. . .
GROUP BY
  1
;

...

For consistent results, choose a combination of columns where the distribution is roughly equal. 

Note: If If you do decide to use partitioning keys that do not produce an even distribution, see "Tip: Improving Performance with Skewed Data.

 


Using INSERT

...

and INSERT OVERWRITE

...

to Partitioned Tables

INSERT and INSERT OVERWRITE with partitioned tables work the same as with other tables. You can create an empty UDP table and then insert data into it the usual way. The resulting data will be is partitioned. 

Partitioning an Existing Table

Tables must have partitioning specified when first created. For an existing table, you must create a copy of the table with UDP options configured and copy the rows over. To do this use a CTAS from the source table.

For example:

Code Block
linenumberstrue
drop table if EXISTS customer_p
;

create table customer_p with (
  bucketed_on = array['customer_id'],
  bucket_count = 512
)  as select  *  from  customer

...

  • Creating a partitioned version of a very large table is likely to take hours or days. Consult with TD support to make sure you can run complete this operation to completion.

  • If the source table is continuing to receive updates, you will have to must update it further with SQL. For example: 

    Code Block
    languagesql
    linenumberstrue
    INSERT
      INTO
        customer_p SELECT 
           *
        FROM
          customer WHERE... 


...

Create a partitioned copy of the customer table named customer_p, to speed up lookups by customer_id;

...

;

Code Block
languagesql
linenumberstrue
drop table if EXISTS customer_p
;

create table customer_p with (
  bucketed_on = array['customer_id'],
  bucket_count = 512
)   as select  *  from  customer

Create and populate a partitioned table customers_p to speed up lookups on "city+state" columns:

Code Block
languagesql
linenumberstrue
-- create partitioned table, bucketing on combination of city + state columns
-- create table customer_p with (bucketed_on = array['city','state'] , bucket_count=512, max_file_size = '256MB', max_time_range='30d');
create table customer_p with (
  bucketed_on = array['city',
  'state'],
  bucket_count = 512
)
;

-- update for beta
-- Insert new records into the partitioned table 
INSERT
  INTO
    customer_p
  VALUES(
    . . . . .
  )
;

INSERT
  overwrite INTO
    customer_p
  VALUES(
    . . . .
  )
;

-- accelerates queries that test equality on all bucketing columns 
SELECT 
   *
FROM
  customer_p
WHERE
  city = 'San Francisco'
  AND state = 'California'
;

SELECT 
   *
FROM
  customer_p
WHERE
  city IN(
    'San Francisco',
    'San Jose'
  )
  AND state = 'California'
  AND monthly_income > 10000
;

-- NOT accelerated: filter predicate does not use all hash columns
SELECT 
   *
FROM
  customer_p
WHERE
  state = 'California'
;

SELECT 
   *
FROM
  customer_p
WHERE
  city IN(
    'San Francisco',
    'San Jose'
  )
; 

 


UDP Advanced Use Case Details

...


Table of Content Zone
maxLevel3
minLevel2


Choosing Bucket Count, Partition Size in

storage

Storage, and Time Ranges for Partitions

Bucket counts must be in powers of two. Higher A higher bucket count means dividing data among many smaller partitions, which can be less efficient to scan. TD suggests starting with 512 for most cases.  If If you aren't sure of the best bucket count, it is safer to err on the low side. 

We recommend partitioning UDP tables on one-day or multiple-day time ranges, instead of the one-hour partitions most commonly used in TD. Otherwise you can wind up with , you might incur higher costs and slower data access because too many small partitions have to be fetched from storage.

Aggregations on the

hash key

Hash Key

Using a GROUP BY key as the bucketing key, major improvements in performance and reduction in cluster load on aggregation queries were seen. For example, you can see the UDP version of this query on a 1TB table:

  • used 10 Presto workers instead of 19

  • ran in 45 seconds instead of 2 minutes 31 seconds

processing >3x as many rows per second. ( The total data processed in GB was greater because the UDP version of the table occupied more storage.)

 


UDP version:

Code Block
languagesql
linenumberstrue
presto: udp_tpch_sf1000 > SELECT 
  COUNT(*)
FROM
  (
    SELECT 
      max_by(
        l_discount,
        time
      ),
      max_by(
        l_partkey,
        time
      )
    FROM
      lineitem
    GROUP BY
      l_orderkey
  )
;

_col0------------
1500000000(
  1 row
) Query 20171227_014452_14154_sjh9g,
FINISHED,
10 nodes Splits: 517 total,
517 done(
  100.00 %
) 0: 45 [6B ROWS,
25.5GB] [134M ROWS / s,
585MB / s]

non-UDP:

Code Block
languagesql
linenumberstrue
presto: udp_tpch_sf1000 > SELECT 
  COUNT(*)
FROM
  (
    SELECT 
      max_by(
        l_discount,
        time
      ),
      max_by(
        l_partkey,
        time
      )
    FROM
      tpch_sf1000.lineitem
    GROUP BY
      l_orderkey
  )
;

_col0------------
1500000000(
  1 row
) Query 20171227_014549_14273_sjh9g,
FINISHED,
19 nodes Splits: 175 total,
175 done(
  100.00 %
) 2: 31 [6B ROWS,
18.3GB] [39.7M ROWS / s,
124MB / s]

Needle-in-a-Haystack Lookup on the Hash Key

The largest improvements – 5x, 10x, or more – will – will be on lookup or filter operations where the partition key column columns are tested for equality. Only partitions in the bucket from hashing the partition keys are scanned.

For example, consider two tables:

  • table customers is bucketed on customer_id

  • table contacts is bucketed on country_code and area_code

 These queries will improve:

Code Block
languagesql
linenumberstrue
SELECT... FROM customers WHERE customer_id = 10001;

Here with UDP Presto scans only one bucket (the one that 10001 hashes to) if customer_id is the only bucketing key.

Code Block
languagesql
linenumberstrue
SELECT... FROM contacts WHERE country_code='1' and area_code = '650' and phone like'555-____'; 

Here with UDP Presto scans only the bucket that matches the hash of country_code 1 + area_code 650.

 These queries will not improve:

Code Block
languagesql
linenumberstrue
SELECT... FROM customers WHERE customer_id >= 10001; 

Here UDP will not improve performance, because the predicate doesn't use '='.

Code Block
languagesql
linenumberstrue
SELECT... FROM contacts WHERE area_code = '650' ; 

Here UDP will not improve performance, because the predicate does not include both bucketing keys.

Very Large Join Operations 

Very large join operations can sometimes run out of memory. Such joins can benefit from UDP. Distributed and colocated joins will use less memory, CPU, and shuffle less data among Presto workers. This may enable you to finish queries that would otherwise run out of resources. To leverage these benefits, you must:

  1. Make sure the two tables to be joined are partitioned on the same keys

  2. Use equi-joins equijoin across all the partitioning keys

  3. Set the following options on your join using a magic comment:

Code Block
languagesql
linenumberstrue
-- set session join_distritbution_type = 'PARTITIONED'
-- set session colocated_join = 'true'

Improving Performance on Skewed Data

When processing a UDP query, Presto ordinarily creates one split of filtering work per bucket (typically 512 splits, for 512 buckets). But if data is not evenly distributed, filtering on skewed bucket could make performance worse -- one Presto worker node will handle the filtering of that skewed set of partitions, and the whole query lags.

To enable higher scan parallelism you can use:

Code Block
linenumberstrue
-- set session distributed_bucket='true|false'

When set to true, multiple splits are used to scan the files in a bucket in parallel, increasing performance. The tradeoff is that colocated join is always disabled when distributed_bucket is true. As a result, some operations such as GROUP BY will require shuffling and more memory during execution.

This query hint is most effective with needle-in-a-haystack queries.  Even Even if these queries perform well with the query hint, Customers should test performance with and without the query hint in other use cases on those tables to find the best performance tradeoffs. 

...