はじめに
こんにちは、
ビッグデータという言葉が浸透して、
ビッグなデータを処理するには相応の計算パワーが必要です。分散処理システムを使って対処するのは一般的な方法ですが、
Hadoop MapReduceにおいては、
しかしながら、
何台ものサーバをセットアップし、
そもそも、
我々
「実例で学ぶAWS」
- Javaで実装する
- CloudFrontのログを解析する
- EMRクラスタとEC2インスタンスとが通信する
第3回は、
CloudFront & EMR
今回はAWSが提供するContents Delivery Network
CloudFrontとEMRは相性がよく、
CloudFrontは静的コンテンツの配信だけではなく、
すなわち、
cosmiでも、
EMRを使う、その前に
HadoopはMapReduceのJava実装ですので、
ソースコードをコンパイルするにはJavaのコンパイラとHadoop Commonのライブラリが必要です。ダウンロードして、
執筆時点
ローカル環境だけでの簡単な動作確認なら、
もし自前の分散実行環境を用意するのであれば、
MapReduceとは
Hadoop MapReduceのプログラムを書く前に、
MapReduceは分散コンピューティングのための計算モデルです。入力となるキーと値のペアを、
Map処理とReduce処理はそれぞれキーの種類ぶん複数回実施されます。それぞれの処理は入力に応じた出力さえ算出すればいいので、
なお、
CloudFrontのログを処理するプログラムを書く
まずは、
2011-10-18 07:00:32 HKG1 748 123.456.789.012 GET xxx.cloudfront.net /index.html 200 - Mozilla/5.0... foo=bar&hoge=fuga
2011-10-18 22:04:50 HKG1 14534 234.567.890.123 GET xxx.cloudfront.net /picture.jpg 200 - Mozilla/5.0... foo=bar&hoge=fuga
2011-10-19 01:08:15 HKG1 748 219.118.174.241 GET xxx.cloudfront.net /index.html 404 - Mozilla/5.0... foo=bar&hoge=fuga
このログを読み込み、
目的の処理を、
- Map処理
- ファイル中の行番号→その行のログ内容
(タブ区切り) というキーバリューペアを受け取り、 1カラム目 (日付) を取得して、 日付→アクセス数 (1) のキーバリューペアを書き込みます。 - Reduce処理
- 日付→アクセス数の配列のキーバリューペアを受け取り、
配列の中身をすべて足しこんで日付に対応するアクセス数を算出します。
なお、
必要があればユーザが自作の入力フォーマットを定義することも可能です。
上記の内容に沿って作成したプログラムを以下に示します。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;
public class DailyAccessCounter {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Job job = new Job(conf);
job.setJarByClass(DailyAccessCounter.class);
job.setMapperClass(CloudFrontLogToDateMapper.class);
job.setReducerClass(SumReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
}
}
class CloudFrontLogToDateMapper extends Mapper{
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] log = value.toString().split("\t");
if(log.length != 12)
return;
String dateString = log[0];
context.write(new Text(dateString), new LongWritable(1));
}
}
class SumReducer extends Reducer{
protected void reduce(Text statusCode, Iterable counts, Context context)
throws IOException, InterruptedException {
long sum = 0;
for(LongWritable count:counts){
sum += count.get();
}
context.write(statusCode, new LongWritable(sum));
}
}
このプログラムをDailyAccessCounter.
$ export HADOOP_HOME=/path/to/hadoop $ javac -cp "$HADOOP_HOME/hadoop-core-0.20.203.0.jar:$HADOOP_HOME/lib/*" DailyAccessCounter.java
サンプルデータを入力して、
$ export JAVA_HOME=/path/to/jdk $ export HADOOP_CLASSPATH=. $ $HADOOP_HOME/bin/hadoop DailyAccessCounter sample.input.log output
計算が終わるとoutputディレクトリが作成されます。結果はpart-00000に格納されていますので、
2011-10-18 2
2011-10-19 1
次回は
今回はここまでです。
CloudFrontのログを解析するHadoop MapReduceのプログラムを作り、
次回はこのプログラムをEMRを使って実行します。