ConoHaでお手軽Hadoop&Spark開発
筆者が所属している次世代システム研究室では、
ConoHaで構築するHadoopクラスタのインスタンス構成と料金
試験などのための開発環境として、

Ambari | HDPのHadoop管理システム |
YARN | Hadoopクラスタ上で動作するソフトウェアのスケジュール管理、 |
HBase | GoogleのBigTableをモデルとした列指向の分散データベース |
Hive | Hadoopの上に構築されたデータベース。HiveQLというSQLに似たクエリを使用できる |
Spark | インメモリで分散処理をするソフトウェア。リアルタイムストリーミング処理が行えるSparkStreaming、 |
リージョンは
マシン名 | 役割 | CPU | メモリ | SSD | 月額料金 |
---|---|---|---|---|---|
gateway | インターネットから接続できるSSHサーバ | 2コア | 1GB | 50GB | 900円 |
ci_ | HDPのHadoop管理システムAmbariとデプロイサーバ | 4コア | 4GB | 50GB | 3,420円 |
master_ | Hadoopのマスタノード | 6コア | 8GB | 50GB | 6,670円×3 |
slave_ | Hadoopのスレーブノード | 6コア | 8GB | 550GB | 11,170円×3 |
dev-web1、 | Nginxサーバ | 2コア | 1GB | 50GB | 900円×2 |
dev-ap1、 | HTTPアプリケーションサーバ | 3コア | 2GB | 50GB | 1,750円×2 |
dev-batch1、 | 集計バッチサーバ | 3コア | 2GB | 50GB | 1,750円×2 |
ロードバランサ1台 | Webサーバで使用 | 1,000円 | |||
合計 | 67,640円 |
システム構築作業の際にインスタンスを作って壊すことも何度かはあるため、
HadoopクラスタでのSparkの活用
構築したクラスタで、
1.ダミーデータの作成(1,000万レコード)
データベーステーブルの元となるアクセスログのダミーデータ
#!/usr/bin/perl
for ($i=0; $i < $ARGV[0]; $i++) {
$user_id = "uid_".int(rand ($ARGV[1]));
$page_url = "http://maru/page/".int(rand $ARGV[2]);
print $user_id."\t".$page_url."\n";
}
あるサイトに1万ページのURLがあり、
[hive@slave_node1]$./dummy.pl 10000000 1000000 10000 > dummy10m.tsv
生成したログデータは次のようなものです。
uid_1 http://maru/page/1
uid_1 http://maru/page/2
uid_2 http://maru/page/3
uid_3 http://maru/page/4
uid_3 http://maru/page/5
このデータをリスト2のようなデータ形式にSpark SQLで変換してみます。
uid_1 http://maru/page/1 http://maru/page/2
uid_2 http://maru/page/3
uid_3 http://maru/page/4 http://maru/page/5
2.テーブルにデータ(TSV)を登録
spark-sqlを起動し、
spark-sql> create database noda_conoha_sd4; use noda_conoha_sd4; CREATE TABLE raw_log_data ( uid STRING, url STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE; CREATE TABLE log_data ( uid STRING, url STRING ) STORED AS ORC tblproperties ("orc.compress"="SNAPPY");
TSVファイルをTEXTFILEフォーマットテーブルにロードしたあと、
spark-sql> LOAD DATA LOCAL INPATH "./dummy10m.tsv" into table raw_log_data; INSERT OVERWRITE TABLE log_data select uid, url from raw_log_data; select * from log_data limit 10; uid_387616 http://maru/page/2197 uid_734748 http://maru/page/6714 uid_982660 http://maru/page/6854
3.spark-shellのYARNクライアントモード
spark-shellをYARNクライアントモードで起動し、
[hive@slave_node1]$ spark-shell --master yarn-client scala> Hiveテーブルを扱うため、Hiveのパッケージをインポート import org.apache.spark.sql.hive.HiveContext SparkContextでspark-shellを起動すると、scは自動的に作られる val hiveCtx = new HiveContext(sc) HiveQLでSQLを実行 val rows = hiveCtx.sql("select uid, url from noda_conoha_sd4.log_data") 重複行を削除 val rdd = rows.distinct.rdd map形式に変換 (ユーザID => ページURL) val pairs = rdd.map( u => (u{0}.asInstanceOf[String], u{1}.asInstanceOf[String])) ユーザIDをキーにデータを集約する (ユーザID => ページ URL1, ページURL2) val group = pairs.groupByKey() データを10個取り出す group.take(10)
結果は次のようになります。
Array[(String, Iterable[String])] =
Array((uid_203708,CompactBuffer(http://maru/page/1909,
http://maru/page/4316),(uid_66952,CompactBuffer(http://maru/page/3593,http://maru/page/3497),
……略……
イメージ保存機能を利用したサーバのチューニング
今回は、
Nginxの設定
図2は、

- 1.イメージを保存する
ConoHaのインスタンスは、
起動中はイメージを保存できないため、 いったんWEB1を停止してからイメージを保存します。イメージのネームタグは 「WEB1_ COPY」 とします (図3)。 - 2.イメージから新規インスタンスを構築する
WEB1を再起動し、
試験で使用可能な状態に戻します。次に、 メニューの 「サーバー追加」 からイメージタイプ 「保存イメージ」 を選択し、 ネームタグから 「WEB1_ COPY」 を選択して、 新規インスタンスをWEB3として1台構築します (図4)。 - 3.Nginxをチューニングする
WEB3単体でNginxをチューニングします。httperfなどでHTTPのパフォーマンスを見ながら、
nginx. confの最適値を算出します。 - 4.チューニングしたインスタンスをコピーする
WEB3のチューニングが終わったらWEB3を停止します。WEB3の保存イメージを作成し、
そのイメージを使用してさらにインスタンスWEB4を1台構築します。ロードバランサ経由でのHTTPアクセスの動作を確認するため、 ロードバランサ2を追加します。 ロードバランサ2をWEB3とWEB4に接続し、
ロードバランサ経由でのチューニングと動作確認を行います。結果に問題がなければ、 夜間などWebインスタンスが使用されていないときに、 ロードバランサ1に紐付けているWEB1とWEB2を、 それぞれWEB3とWEB4に変更します。インスタンスを残しておくと料金が発生するため、 WEB1、 WEB2、 ロードバランサ2は削除します [1]。


[noda@gateway]# httperf --server=WEB3のIP --port=80 --uri=/index.html --rate 5000 --num-conn 250000 --num-call 5 --timeout 5
まとめ
今回は、