データ分析の最初の手順であるデータ収集には大きく分けてストリーミング処理とバッチ処理という2つの方法があります。前回は、

Embulkでマスターデータをバッチ処理として収集する
Embulkとは
Fluentdはリアルタイムに生成されるデータに対しては有効なツールでした。しかし、
こうしたケースに対して、
こうした課題を解決するために、
Embulkの特徴として、
そして、
-
- guess
- データを一部読み込み、
自動でスキーマを推定し、 設定ファイルを生成します。
-
- preview
- 設定ファイルのスキーマ情報を元に読み込んだ際のプレビューを行います。
- ここで想定と異なるスキーマに成ってしまっていた場合には、
設定ファイルを手動で修正します。
-
- run
- 完成した設定ファイルを元にEmbulkを動かし、
データの転送を実行します。
Embulkをインストールする
では、
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar" $ chmod +x ~/.embulk/bin/embulk $ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
マスターデータのフォーマット
Embulkをインストールして、
今回は、
user_ |
sex | last_ |
closed_ |
age | city | device | country |
---|---|---|---|---|---|---|---|
1190452 | 1 | 2011-07-08 22:13:19 | 0001-01-01 00:00:00 | 21 | miyagi | smart phone | japan |
1581708 | 1 | 2010-04-04 23:33:52 | 0001-01-01 00:00:00 | 40 | tokyo | feature phone | japan |
1629941 | 0 | 2009-11-04 20:41:10 | 0001-01-01 00:00:00 | 35 | tokyo | smart phone | japan |
このデータをローカルのPostgreSQLのtestdb以下で定義しておきます。また合わせてサンプルデータもインポートしておきます。
create table users ( user_id bigint PRIMARY KEY, sex smallint, last_update timestamp NOT NULL, closed_account_time timestamp NOT NULL, age smallint, city varchar(30), device varchar(30), country varchar(30) ); INSERT INTO users (user_id, sex, last_update, closed_account_time, age, city, device, country) VALUES (1190452, 1, '2011-07-08 22:13:19', '0001-01-01 00:00:00', 21, 'miyagi' ,'smart_phone', 'japan'), (1581708, 1, '2010-04-04 23:33:52', '0001-01-01 00:00:00', 40, 'tokyo', 'feature phone', 'japan'), (1629941, 0, '2009-11-04 20:41:10', '0001-01-01 00:00:00', 35, 'tokyo', 'smart phone', 'japan')
また、
create table users ( user_id bigint PRIMARY KEY, sex smallint, last_update DATETIME, closed_account_time DATETIME, age smallint, city varchar(30), device varchar(30), country varchar(30) );
PostgreSQLのマスタデータを分析エンジンにインポートする
Pluginのインストール
マスターデータを格納するローカルのPostgreSQLから分析エンジンにしているMySQLへのインポートを行います。そこで今回は、
$ embulk gem install embulk-input-postgresql $ embulk gem install embulk-output-mysql
Guessコマンドによるスキーマの推定
プラグインのインストールができたら、
また設定のパートとして主にinとexecとoutの3ヵ所があり、
in:
type: postgresql
host: localhost
user: postgres
password: "password"
database: testdb
table: users
select: "*"
out:
type: mysql
host: HOSTNAME
user: mysql
password: "password"
database: testdb
table: users
mode: insert
column_options:
last_update: {type: datetime}
closed_account_time: {type: datetime}
ここでの注意点として、
$ embulk guess seed.yml -o guess.yml 2015-07-29 18:10:15.650 +0900: Embulk v0.6.19 2015-07-29 18:10:17.095 +0900 [INFO] (guess): Loaded plugin embulk-input-postgresql (0.6.0) in: type: postgresql host: localhost user: postgres password: password database: testdb table: users select: '*' column_options: last_update: {type: string, timestamp_format: '%Y/%m/%d %H:%M:%S', timezone: '+0900'} closed_account_time: {type: string, timestamp_format: '%Y/%m/%d %H:%M:%S', timezone: '+0900'} out: type: mysql host: HOSTNAME user: mysql password: password database: testdb table: users mode: insert column_options: last_update: {type: datetime} closed_account_time: {type: datetime} Created 'guess.yml' file.
Previewコマンドによるデータチェック
previewコマンドでは、
$ embulk preview guess.yml 2015-07-29 18:15:50.053 +0900: Embulk v0.6.19 2015-07-29 18:15:51.473 +0900 [INFO] (preview): Loaded plugin embulk-input-postgresql (0.6.0) 2015-07-29 18:15:51.549 +0900 [INFO] (preview): SQL: SET search_path TO "public" 2015-07-29 18:15:51.769 +0900 [INFO] (preview): SQL: SET search_path TO "public" 2015-07-29 18:15:51.770 +0900 [INFO] (preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "users" 2015-07-29 18:15:51.771 +0900 [INFO] (preview): SQL: FETCH FORWARD 10000 FROM cur 2015-07-29 18:15:51.773 +0900 [INFO] (preview): > 0.00 seconds 2015-07-29 18:15:51.795 +0900 [INFO] (preview): SQL: FETCH FORWARD 10000 FROM cur 2015-07-29 18:15:51.796 +0900 [INFO] (preview): > 0.00 seconds +--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+ | user_id:long | sex:long | last_update:timestamp | closed_account_time:timestamp | age:long | city:string | device:string | country:string | +--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+ | 1,190,452 | 1 | 2011-07-08 13:13:19 UTC | 0000-12-29 15:00:00 UTC | 21 | miyagi | smart_phone | japan | | 1,581,708 | 1 | 2010-04-04 14:33:52 UTC | 0000-12-29 15:00:00 UTC | 40 | tokyo | feature phone | japan | | 1,629,941 | 0 | 2009-11-04 11:41:10 UTC | 0000-12-29 15:00:00 UTC | 35 | tokyo | smart phone | japan | +--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+
Runコマンドによるデータ転送
プレビューでスキーマの設定に問題がないことを確認したらRunコマンドを実行します。これにより、
$ embulk run guess.yml 2015-07-29 18:12:46.199 +0900: Embulk v0.6.19 2015-07-29 18:12:48.901 +0900 [INFO] (transaction): Loaded plugin embulk-input-postgresql (0.6.0) 2015-07-29 18:12:48.951 +0900 [INFO] (transaction): Loaded plugin embulk-output-mysql (0.4.1) 2015-07-29 18:12:49.158 +0900 [INFO] (transaction): SQL: SET search_path TO "public" 2015-07-29 18:12:49.339 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} 2015-07-29 18:12:49.653 +0900 [INFO] (transaction): Using insert mode 2015-07-29 18:12:49.668 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS `users_0000000055b8990f21bc7980_bl_tmp000` 2015-07-29 18:12:49.675 +0900 [INFO] (transaction): > 0.01 seconds 2015-07-29 18:12:49.684 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS `users_0000000055b8990f21bc7980_bl_tmp000` (`user_id` BIGINT, `sex` SMALLINT, `last_update` datetime, `closed_account_time` datetime, `age` SMALLINT, `city` VARCHAR(30), `device` VARCHAR(30), `country` VARCHAR(30)) 2015-07-29 18:12:49.714 +0900 [INFO] (transaction): > 0.03 seconds 2015-07-29 18:12:49.756 +0900 [INFO] (transaction): {done: 0 / 1, running: 0} 2015-07-29 18:12:49.795 +0900 [INFO] (task-0000): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} 2015-07-29 18:12:49.807 +0900 [INFO] (task-0000): Prepared SQL: INSERT INTO `users_0000000055b8990f21bc7980_bl_tmp000` (`user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country`) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 2015-07-29 18:12:50.010 +0900 [INFO] (task-0000): SQL: SET search_path TO "public" 2015-07-29 18:12:50.011 +0900 [INFO] (task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "users" 2015-07-29 18:12:50.012 +0900 [INFO] (task-0000): SQL: FETCH FORWARD 10000 FROM cur 2015-07-29 18:12:50.013 +0900 [INFO] (task-0000): > 0.00 seconds 2015-07-29 18:12:50.056 +0900 [INFO] (task-0000): SQL: FETCH FORWARD 10000 FROM cur 2015-07-29 18:12:50.057 +0900 [INFO] (task-0000): > 0.00 seconds 2015-07-29 18:12:50.062 +0900 [INFO] (task-0000): Loading 3 rows 2015-07-29 18:12:50.075 +0900 [INFO] (task-0000): > 0.01 seconds (loaded 3 rows in total) 2015-07-29 18:12:50.076 +0900 [INFO] (transaction): {done: 1 / 1, running: 0} 2015-07-29 18:12:50.076 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=2700000} 2015-07-29 18:12:50.091 +0900 [INFO] (transaction): SQL: INSERT INTO `users` (`user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country`) SELECT `user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country` FROM `users_0000000055b8990f21bc7980_bl_tmp000` 2015-07-29 18:12:50.092 +0900 [INFO] (transaction): > 0.00 seconds (3 rows) 2015-07-29 18:12:50.108 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} 2015-07-29 18:12:50.125 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS `users_0000000055b8990f21bc7980_bl_tmp000` 2015-07-29 18:12:50.137 +0900 [INFO] (transaction): > 0.01 seconds 2015-07-29 18:12:50.137 +0900 [INFO] (main): Committed. 2015-07-29 18:12:50.138 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
インポートが完了したら、
mysql> select * from users; +---------+------+---------------------+---------------------+------+--------+---------------+---------+ | user_id | sex | last_update | closed_account_time | age | city | device | country | +---------+------+---------------------+---------------------+------+--------+---------------+---------+ | 1190452 | 1 | 2011-07-08 22:13:19 | 0001-01-01 00:00:00 | 21 | miyagi | smart_phone | japan | | 1581708 | 1 | 2010-04-04 23:33:52 | 0001-01-01 00:00:00 | 40 | tokyo | feature phone | japan | | 1629941 | 0 | 2009-11-04 20:41:10 | 0001-01-01 00:00:00 | 35 | tokyo | smart phone | japan | +---------+------+---------------------+---------------------+------+--------+---------------+---------+ 3 rows in set (0.01 sec)
おまけ:Embulkをスケジュール実行する
先ほどまでの例では、last_
を利用することで、
では、
embulk-input-postgresqlには、
それでは、last_
カラムを利用して、
まずは、
in:
type: postgresql
host: localhost
user: postgres
password: "password"
database: testdb
table: users
select: "*"
where: "last_update between (current_date - interval '1 day') and (current_date)"
最後に、embulk run load.
をcronで毎日実行するだけで、
データインポートのまとめ
前回は、
これらの2つの方法を用いることで、