
Data Connector execution on Hadoop

The Data Connector can be executed on our Hadoop cluster as a MapReduce job for better performance.

Data Connector jobs run on a single instance by default. By running the Data Connector as a MapReduce job on Hadoop, the workload can be executed on multiple servers in parallel, which is particularly beneficial when processing a large number of files.

The other major benefit of using the MapReduce executor for the Data Connector is that it automatically partitions the imported records based on the value in the time column. This in turn enables fine-grained partial deletion of the imported data and makes queries against it much more efficient (provided that time-index predicate pushdown is used).
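For example, once the imported records are partitioned by the time column, a time-bounded query can skip partitions outside the requested range, and a specific time window can be deleted without reimporting the rest of the table. The commands below are a sketch using the standard td CLI against the sample database and table used later in this article; the time ranges are placeholders.

# Query one day of data; TD_TIME_RANGE allows the engine to prune
# partitions outside the requested range (time-index predicate pushdown).
$ td query -w -d td_sample_db \
    "SELECT COUNT(1) FROM td_sample_table WHERE TD_TIME_RANGE(time, '2016-08-01', '2016-08-02', 'UTC')"

# Delete only the records whose time falls within a given window
# (boundaries are Unix timestamps and must be aligned to the hour).
$ td table:partial_delete td_sample_db td_sample_table \
    --from 1470009600 --to 1470096000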

Prerequisites

  • Basic knowledge of Treasure Data

Step 1: Specify executor type in your configuration file

First, edit your existing load.yml file and add the executor type mapreduce in the exec section. The following example uses the MapReduce executor to ingest data from AWS S3:

exec:
  type: mapreduce
  reducers: 4
in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  ... ...
out:
  mode: append
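
Before submitting the job, you can optionally sanity-check the configuration with the usual preview command. This is just a quick verification sketch and assumes the file is named load.yml, matching the command in Step 2:

$ td connector:preview load.yml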

Step 2: Execute the Data Connector job

Submit the Data Connector job the same way you would any other Data Connector job:

$ td connector:issue load.yml \
     --database td_sample_db --table td_sample_table --time-column created_at
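
The job then behaves like any other Data Connector job: it is assigned a job ID, and its progress and logs can be inspected from the command line or in the Console. A minimal sketch, with a placeholder job ID:

$ td job:show 12345678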

Known Issues

Sporadic HDFS Access failures

Hadoop MapReduce Data Connector execution sometimes incurs sporadic HDFS access failures. These failures are recoverable and are handled by the executor with its built-in retry mechanism. When these failures occur, the following error message appears in the job's log (either on the command line or in the job's detail page in the Console). This error can be safely ignored.

2015-11-12 21:24:40.654 +0000 [WARN] (transaction): Retrying opening state file /mnt/embulk/20151112_212333_682000000/attempt_1445591247779_23073_m_000006_0 (1/5) error: {}
java.io.EOFException: JSON is not included in the attempt state file.
  at org.embulk.executor.mapreduce.AttemptState.readFrom(AttemptState.java:167) ~[embulk-executor-mapreduce-0.2.2.jar:na]
  at org.embulk.executor.mapreduce.EmbulkMapReduce$7.call(EmbulkMapReduce.java:300) ~[embulk-executor-mapreduce-0.2.2.jar:na]
  at org.embulk.executor.mapreduce.EmbulkMapReduce$7.call(EmbulkMapReduce.java:295) ~[embulk-executor-mapreduce-0.2.2.jar:na]
  at org.embulk.spi.util.RetryExecutor.run(RetryExecutor.java:100) [embulk-core-0.7.7.jar:na]
  at org.embulk.spi.util.RetryExecutor.runInterruptible(RetryExecutor.java:77) [embulk-core-0.7.7.jar:na]
  at org.embulk.executor.mapreduce.EmbulkMapReduce.readAttemptStateFile(EmbulkMapReduce.java:295) [embulk-executor-mapreduce-0.2.2.jar:na]
  at org.embulk.executor.mapreduce.MapReduceExecutor.getAttemptReports(MapReduceExecutor.java:520) [embulk-executor-mapreduce-0.2.2.jar:na]

Ruby Data Connector plugins not working

Currently, the Hadoop MapReduce executor supports only Java-based input plugins; Ruby-based plugins are not supported.

As of this writing, the Data Connector Java plugins are:

  • AWS S3, Riak CS
  • MySQL, PostgreSQL
  • AWS Redshift
  • Google Cloud Storage
  • Google BigQuery
  • FTP

The Ruby plugins are:

  • Marketo
  • Salesforce
  • Jira
