大量データの転送にEmbulkを使ってみたら本当に楽だった

はじめまして。Zucks Affiliateでエンジニアをしている宗岡です。 今回は、リアルタイム性は求めないけど、簡単に大量のデータをどこか別の場所に転送したい。 という要望に答えてくれるEmbulkを紹介したいと思います。

実際に導入に至ったきっかけや、運用上よくある課題なども触れていきたいと思います。 同じ境遇の人が「簡単そうだしEmbulk使ってみようかな」となっていただければ幸いです。

目次

背景

  • 私の所属する事業部でも、BigQueryを使って色々と解析する機会が増えてきました
    • (広告効果向上のためにさまざまなデータを取り扱い、分析等を行っています)
  • ところが、BigQueryに乗っているデータは今のところアクセスログやアプリケーションログのみ
  • これだとデータとして足りず、DBに乗ってるコンバージョンしたユーザのデータもBigQueryに上げられれば、解析がより楽になりそうだ。というのがきっかけでした。

Embulk以外にも出てきた案

  • re:dashを導入し、「Query Results」を使って、BigQueryとDBのデータをJOINして解析
    • これなら特に努力する必要なく、re:dashの導入と、データソースの追加だけで完結です
    • ただ、残念ながら「Query Results」機能はまだオープンソースとしては公開されていないという悲しい現実がありました
    • Pythonをデータソースに追加することで、無理やり別々のデータソースから取得した結果をマージする方法もありますが、中々辛みがあり、なるべく避けたかったです
  • MySQLのデータをBigQueryに上げるという選択肢が挙がり、(弊社的には)割りと一般的な「MySQL > S3 > GCS > BigQuery」という方法もありましたが、下記のような理由から今回はEmbulkを採用してみました
    • 定期的にバッチとしてデータを取り込み続けたい
    • 今後も同じ事をしていくテーブルが増えそう
    • 社内の他事業部でも導入実績があった

などの理由で、今回はEmbulkを使ってみました

実際のEmbulkの導入と使い方

見ていただければわかると思いますが、非常に簡単です。

1. Embulkのインストールとセットアップ

具体的なセットアップ周りでは、embulkの公式をご参照下さい。

インストールはとても簡単です。

$ curl --create-dirs -o bin/embulk -L "http://dl.embulk.org/embulk-latest.jar
$ chmod +x bin/embulk

これだけ完了です。 もしグローバルでEmbulkを使いたければ、パスを通してあげればOKです。

2. 必要なプラグインのインストール

今回はMySQLからinputして、GoogleのBigQueryにoutputしたかったので、 次の2つのプラグインをインストールします。

  • embulk-input-mysql
  • embulk-output-bigquery

まずはGemfileを用意してあげます。

# For Embulk
source 'https://rubygems.org'

# input mysql plugin
gem 'embulk-input-mysql'

# ouput bq plugin
gem 'embulk-output-bigquery'

そして、embulk bundle

$ ./bin/embulk bundle --path=bundle
2017-07-18 02:15:05.887 +0000: Embulk v0.8.27
Fetching gem metadata from https://rubygems.org/...........
Fetching version metadata from https://rubygems.org/..
Resolving dependencies......
Installing public_suffix 2.0.5
Installing addressable 2.5.1
Installing declarative 0.0.9
Installing declarative-option 0.1.0
Installing embulk-input-mysql 0.8.4
Installing multipart-post 2.0.0
Installing faraday 0.12.1
Installing jwt 1.5.6
Installing little-plugger 1.1.4
Installing multi_json 1.12.1
Installing logging 2.2.2
Installing memoist 0.16.0
Installing os 0.9.6
Installing signet 0.7.3
Installing googleauth 0.5.1
Installing httpclient 2.8.3
Installing mime-types-data 3.2016.0521
Installing mime-types 3.1
Installing uber 0.1.0
Installing representable 3.0.4
Installing retriable 3.0.2
Installing google-api-client 0.13.1
Installing thread_safe 0.3.6
Installing tzinfo 1.2.3
Installing time_with_zone 0.3.1
Installing embulk-output-bigquery 0.4.5
Using bundler 1.10.6
Bundle complete! 2 Gemfile dependencies, 27 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.

これで準備はOKです。

3. 設定ファイルを書く

次はEmbulkの設定ファイルを書いていきます。 設定ファイルは基本的にyaml形式で書きます。

Embulkでは、Liquidというテンプレートエンジンを搭載(v0.7.7以降)しているので、この機能を使いたい場合は、 ファイル名をxxxx.yml.liquidのようにします。

また、今回の例ではDBの接続情報は環境変数(.envファイル)を使ってみる事にしましたが、 実務ではAWSのCodeCommitで管理しており、大分便利ですのでオススメです。

実務でcodecommitを使った例

下記のようにすることで、.envを使うよりもより簡単にクレデンシャルを扱う事が可能になります

  • codecommitのリポジトリを特定ディレクトリ(今回は/credential)にclone
  • Makefileで必要な環境変数はexportし、設定ファイルで読み込めるようにする
  • (codecommitでクレデンシャル情報を管理出来るので、リリース時の.envファイルの扱い等は考える必要なくなります)
# rdsのクレデンシャル情報
include /credentials/rds/ro_connection_info.txt

export MYSQL_HOST      := $(host)
export MYSQL_USER      := $(user)
export MYSQL_PWD       := $(password)

# gcpのクレデンシャル情報
export CREDENTIA_JSON  := /credentials/gcp/credential.json

設定ファイルの書き方

in:
  type: mysql
  host: {{ env.MYSQL_HOST_NAME }}
  user: {{ env.MYSQL_USER }}
  password: {{ env.MYSQL_PASSWORD }}
  database: embulk_test
  table: user
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: ./credentials/bq_project.json
  path_prefix: tmp
  file_ext: .csv.gz
  source_format: CSV
  project: test-bigquery-project-166901
  dataset: test_for_embulk
  auto_create_table: true
  table: users
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}

BQ側のプロジェクトや、データセットは今回の為にテスト的に作ったものを指定してます。

また、input/outputのそれぞれの設定は各プラグインのページを見ると詳細が載ってますので、イロイロ試してみると面白いです。

参考: embulk-input-mysql embulk-output-bigquery

4. まずはpreviewで問題なさそうか確認

previewは、dry-run機能として使えます。 実際にEmbulkを実行する前に、どんなデータが転送されそうかを確認出来ます。

$ ./bin/embulk preview -b . user.yml.liquid
2017-07-19 23:27:13.766 +0000: Embulk v0.8.27

... 中略 ...

2017-07-19 23:27:15.841 +0000 [INFO] (0001:preview): SQL: SELECT id,name,created_at FROM `user`
2017-07-19 23:27:15.854 +0000 [INFO] (0001:preview): > 0.01 seconds
2017-07-19 23:27:15.922 +0000 [INFO] (0001:preview): Fetched 500 rows.
+---------+------------------------+-------------------------+
| id:long |            name:string |    created_at:timestamp |
+---------+------------------------+-------------------------+
|       1 |            Valerie Cox | 2017-07-11 09:31:15 UTC |
|       2 |         Jennifer Perry | 2017-07-15 10:02:30 UTC |
|       3 |           James Garcia | 2017-07-01 05:56:45 UTC |
|       4 |         Haley Calderon | 2017-07-28 21:08:42 UTC |
|       5 |     Cindy Williams DDS | 2017-07-06 17:26:38 UTC |
|       6 |       Christina Mathis | 2017-07-27 22:45:17 UTC |
|       7 |         Timothy Barker | 2017-07-28 12:12:40 UTC |
|       8 |             Thomas Lee | 2017-07-01 01:29:39 UTC |
|       9 |            Jamie Smith | 2017-07-09 03:47:16 UTC |
|      10 |         Wesley Santana | 2017-07-23 18:42:06 UTC |

~長いので省略~

5. 問題なさそうなのでrunして実行

$ ./bin/embulk run -b . user.yml.liquid
2017-07-19 23:27:53.166 +0000: Embulk v0.8.27

... 中略 ...

2017-07-19 23:28:12.897 +0000 [INFO] (0016:task-0000): Fetched 1,000 rows.
2017-07-19 23:28:14.252 +0000 [INFO] (0016:task-0000): Fetched 2,000 rows.
2017-07-19 23:28:15.966 +0000 [INFO] (0016:task-0000): Fetched 4,000 rows.
2017-07-19 23:28:18.242 +0000 [INFO] (0016:task-0000): Fetched 8,000 rows.
2017-07-19 23:28:19.496 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

... 中略 ...

2017-07-19 23:28:55.069 +0000 [INFO] (main): Committed.
2017-07-19 23:28:55.069 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}

終わりました。 今回対象件数は1万件程でしたが、 約1分 で完了です。 とっても楽ちんですね。

BigQuery上でもこのようにちゃんと1万件分のデータがアップロードされてるのが確認出来ます。

スクリーンショット 2017-07-20 08.30.37.png

Embulkの運用上、よくぶつかる課題

Embulkを使うと簡単に転送出来る事が分かった所で、運用上ハマりどこになりそうな所を上げておきます。 ハマったとしても、大体やりたい事はプラグインを見れば解決出来る設定方法が書いてあるので参考にしてみると良いと思います。

今回であれば下記2つのプラグインを参照してます。

embulk-input-mysql embulk-output-bigquery

1. 重複に気付け無い

Embulkを使う機会は、大量のデータを定期的にインサートしたい場合だと思いますので、 cron等で動かす事になるかと思います。

その場合、cronでも動かしてるものを、誤って手動でインサートした場合、そのまま重複した状態でデータがインサートされてしまいます。

対応 - prevent_duplicate_insertをtrueに設定する

これが困ってしまうという場合は、BigQueryの場合は、prevent_duplicate_insertをtrueにしておく事で、重複を弾けます。

実際に、この設定を追加してやってみましょう。

$ git diff user.yml.liquid
diff --git a/user.yml.liquid b/user.yml.liquid
index 37a7074..e1980ff 100644
--- a/user.yml.liquid
+++ b/user.yml.liquid
@@ -16,6 +16,7 @@ out:
   project: test-bigquery-project-166901
   dataset: test_for_embulk
   auto_create_table: true
+  prevent_duplicate_insert: true
   table: user
   formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
   encoders:

prevent_duplicate_insertをtrueにしてみます。

$ ./bin/embulk run -b . user.yml.liquid
2017-07-19 23:44:22.042 +0000: Embulk v0.8.27
...中略 ...
Error: org.jruby.exceptions.RaiseException: (Error) failed to load tmp.30148.5930.csv.gz to test-bigquery-project-166901:test_for_embulk.LOAD_TEMP_0b5d4c5e_e440_407e_8a2d_92caeb232bd5_user, response:{:status_code=>409, :message=>"duplicate: Already Exists: Job test-bigquery-project-166901:embulk_load_job_dcec11ac837806a8524fcecf0b5b12a5", :error_class=>Google::Apis::ClientError}

Already Exists: Job test-bigquery-project-166901:embulk_load_job_dcec11ac837806a8524fcecf0b5b12a5"という事でちゃんと重複データを入れようとするとエラーで落としてくれました。

2. 転送元のスキーマを変更するとエラーで落ちる

転送元をデータベースにしている場合などは、スキーマの変更は珍しくないと思います。 この場合、デフォルト設定のままだとカラム追加時にEmbulkはエラーで落ちます。

理由は2つあります。

  • 設定ファイルのinputの設定に、テーブル名だけを指定している事
    • テーブル名だけの指定だと、全てのカラムを転送しようとするので、今回追加するageカラムも追加しようとします
  • BigQuery上に作ったスキーマと異なるデータを転送しようとしている事

今対象としてるサンプルで使ったテーブルはuserというシンプルなテーブルです。

mysql> desc user;
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| name       | varchar(32)      | NO   |     | NULL    |                |
| created_at | timestamp        | NO   |     | NULL    |                |
+------------+------------------+------+-----+---------+----------------+
3 rows in set (0.00 sec)

こちらのテーブルに、ageカラムを追加してみます。

mysql> ALTER TABLE user ADD column age INT UNSIGNED NOT NULL AFTER name
    -> ;
Query OK, 0 rows affected (0.19 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> desc user;
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| name       | varchar(32)      | NO   |     | NULL    |                |
| age        | int(10) unsigned | NO   |     | NULL    |                |
| created_at | timestamp        | NO   |     | NULL    |                |
+------------+------------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)

この状態でEmbulkを走らせてみます。

$ ./bin/embulk run -b . user.yml.liquid
2017-07-20 00:01:08.421 +0000: Embulk v0.8.27

... 中略 ...

Error: org.jruby.exceptions.RaiseException: (Error) failed during waiting a Copy job, get_job(test-bigquery-project-166901, embulk_copy_job_dedd3a2b-d82d-4bc8-af97-e68aff6540fa), errors:[{:reason=>"invalid", :message=>"Provided Schema does not match Table test-bigquery-project-166901:test_for_embulk.user. Cannot add fields (field: age)."}]

BigQuery上のテーブルスキーマと異なるデータ(今回だと新しく追加したageカラム)も転送しようとして落ちてしまいます。

対策 - 転送するカラムを指定しておく

これを防ぐには、予め転送するカラムを指定しておく事で解決できます。 詳しくは、mySQL input pluginの設定に記載してありますが、今回の場合は、テーブル名だけでなく、カラムも指定してあげるようにしておけばOKです。

$ git diff user.yml.liquid
diff --git a/user.yml.liquid b/user.yml.liquid
index d6a52e9..37a7074 100644
--- a/user.yml.liquid
+++ b/user.yml.liquid
@@ -5,6 +5,7 @@ in:
   password: {{ env.MYSQL_PASSWORD }}
   database: embulk_test
   table: user
+  select: id,name,created_at
 out:
   type: bigquery
   auth_method: json_key

この状態でもう一度Embulkを実行してみましょう。

$ ./bin/embulk run -b . user.yml.liquid
2017-07-20 23:07:31.758 +0000: Embulk v0.8.27

...中略...

2017-07-20 23:07:51.614 +0000 [INFO] (0016:task-0000): Fetched 1,000 rows.
2017-07-20 23:07:52.890 +0000 [INFO] (0016:task-0000): Fetched 2,000 rows.
2017-07-20 23:07:54.619 +0000 [INFO] (0016:task-0000): Fetched 4,000 rows.
2017-07-20 23:07:56.872 +0000 [INFO] (0016:task-0000): Fetched 8,000 rows.
2017-07-20 23:07:58.057 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

...中略...

2017-07-20 23:08:33.688 +0000 [INFO] (0001:transaction): embulk-output-bigquery: 
Copy job response... job_id:[embulk_copy_job_38ef8d00-a904-456d-824f-3cd36698c16f] response.statistics:{:creation_time=>1500592103079, :start_time=>1500592103219, :end_time=>1500592105762}
2017-07-20 23:08:33.688 +0000 [INFO] (0001:transaction): embulk-output-bigquery: Delete table... test-bigquery-project-166901:test_for_embulk.LOAD_TEMP_282263eb_ec81_4868_9115_bee413408092_user
2017-07-20 23:08:34.322 +0000 [INFO] (0001:transaction): embulk-output-bigquery: delete tmp.32592.5930.csv.gz
2017-07-20 23:08:34.333 +0000 [INFO] (main): Committed.
2017-07-20 23:08:34.333 +0000 [INFO] (main): Next config diff: {"in":{},"out":{}}

問題なく転送出来ている事が分かります。 このように、予めスキーマが変わりそうな場合は、 selectで対象カラムを指定し、想定内のカラムのみが転送される状態にしておくのが望ましいと思います。

また、SQLの取得には下記の様に2種類ありますので、直接Queryを書いてしまうというのもありです。

  • query にクエリを直接書く
  • query は書かずに、table/select / where / order_by を記載

3. 設定ファイルが増えてくる(対象テーブルが増えてくる)と重複が多くなる

設定ファイルの中でも、実際変わるのはテーブル名・カラム名・Project名とかだけだと思います。 その他の接続情報などはほとんど同じです。 これらを、転送テーブルを増やす度に、都度設定ファイルに書くのは面倒です。

対応 - liquidテンプレートのinclude機能を使う

実務では、liquidテンプレートの機能を使い、下記のような構成で運用しています。

- templates
    - common
        - _conf_mysql_include.yml.liquid
        - _conf_bigquery_include.yml.liquid
    - user.yml.liquid

で、実際にuser.yml.liquidは、下記のように、 commonに書いたmysqlとBigQueryの設定ファイルをincludeして使っています。 (includeされるファイルは、ファイル名の先頭に_(アンダースコア)を付けてあげます)

in:
  {% include 'common/conf_mysql_include' %}
  table: user
out:
  {% include 'common/conf_bigquery_include' %}
  table: user

これでテーブル毎に異なる設定をする場合も、 共通項はcommoに切り出せているので重複を防ぐ事が可能となります。

4. エラーメッセージが分かりづらいので整理しておく

このように、非常に簡単に、あらゆる大規模なデータソースを、短時間で転送出来るという便利な代物なEmbulkですが、若干エラーメッセージが分かり辛いので、よく遭遇するエラーメッセージを紹介しておきます。

4-1. Setting null to a task field is not allowed

これは、特定のフィールドにnull値は入れられないよ。

という意味ですが、よくあるのは、.envを使って環境変数で設定ファイルを書いている場合、 .envに書いた環境変数をexportしてない可能性が高いです。 (ここで再現した例も、環境変数のexportをしてないのが原因です)

$ ./bin/embulk run -b . user.yml.liquid
2017-07-20 23:19:19.560 +0000: Embulk v0.8.27
2017-07-20 23:19:23.055 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.8.4)

... 中略 ...

Error: org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Setting null to a task field is not allowed. Use Optional<T> (com.google.common.base.Optional) to represent null.

4-2. Can not deserialize instance of java.lang.String out of START_OBJECT

これは、liquidテンプレートを使っているのに、ファイル名にliquidがついてない時にでます。

今回あえて、これまでのuser.yml.liquiduser.yamlに変えてみます。

$ ./bin/embulk run -b . user.yml
2017-07-20 23:13:47.582 +0000: Embulk v0.8.27
2017-07-20 23:13:50.058 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.8.4)
2017-07-20 23:14:01.579 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.5)
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
 at [Source: N/A; line: -1, column: -1]

... 中略 ...
    
    Error: org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token
 at [Source: N/A; line: -1, column: -1]

4-3 Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token

これは、配列で指定する必要がある所を、文字列で指定している為に出るエラーです。 例えば、incremental_columnsは配列で指定する必要がありますが、 わざと配列指定じゃなくしてみます。 (正しくは、incremental_columns: [id]とする必要があります)

$ git diff user.yml.liquid
diff --git a/user.yml.liquid b/user.yml.liquid
index 37a7074..a30d64d 100644
--- a/user.yml.liquid
+++ b/user.yml.liquid
@@ -6,6 +6,9 @@ in:
   database: embulk_test
   table: user
   select: id,name,created_at
+  incremental: true
+  incremental_columns: id
 out:
   type: bigquery
   auth_method: json_key

これで実行してみます。

$ ./bin/embulk run -b . user.yml.liquid
2017-07-20 23:39:54.033 +0000: Embulk v0.8.27
2017-07-20 23:39:57.515 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-mysql (0.8.4)
2017-07-20 23:40:08.258 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.5)
... 中略 ...
Error: org.embulk.config.ConfigException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token
 at [Source: N/A; line: -1, column: -1]

まとめ

今回は、大量データを簡単に別のDWHに転送する方法としてEmbulkを紹介しました。 また、実際に運用上に困ってくる事も話してみました。

冒頭に書いた様に、リアルタイム性は求めないものの、簡単にデータ転送したいな。 という欲求をお持ちの方は、一度試してみてはいかがでしょうか。 御覧頂いた通り、非常に簡単に始められますので非常にオススメです。