はじめに
今回は、ImpalaのSQL処理の高速化において重要な役割を占めるクエリ処理について説明します。
Impalaのクエリ処理の特徴
Impalaは、MapReduceやSparkをはじめとする既存の手続き型のデータ処理エンジンを使用せず、アドホックなSQLクエリの処理の高効率化に焦点を置いた設計と実装が特徴です。たとえば、結合方法を見てみると、MapやReduceもしくはMapReduceジョブなどのブロッキングオペレータ(第16回)を組み合わせていく処理エンジンにおいては、Impalaにおけるパイプライン結合処理などを実現することは必ずしも容易ではありません(第8回「Impala/Prestoにおける結合処理」)。
また、MapReduceやSparkでは中間データをディスクに書き込むことにより高い耐障害性を実現しますが、Impalaでは耐障害性を多少犠牲にしてメモリ上で処理を完結することにより高速化を実現します(第13回「高い耐障害性を実現する設計方針」)(※1)。
前回の説明にあったように、Impalaではコーディネータがクラスタの各ノードに処理を割り振ります。そのとき、事前に取得された統計情報を元にメモリの使用量なども推測し、当該処理の並列化方法を決定します。今回は、この並列化方法についてお話します。
まずHash Joinの処理を例にImpalaでの実行計画について説明し、次にノード間処理の並列化およびノード内処理の並列化について説明します。
実行計画とplan fragment
今回は、下記の「3つの表を等結合し、共通する値がいくつあるかを集計する」というクエリを例に話を進めていきます。
select count(colA) from tabA join tabB on colA=colB join tabC on colB=colC;
Impalaでは、クエリを受け取ると、RDBMSなどの一般的なSQL処理系と同様に(第4回)、クエリの実行計画を作成します。その際、Impalaのクエリコンパイラは、compute statsによって取得した統計情報を元に、コーディネータがメモリの使用量などを推測し、クラスタ内の各ノードに処理を均等に分配するように実行計画を作成します。上記のクエリを実行してImpalaのprofileコマンド[2]で出力した実行計画の例を以下に示します。
図1 実行計画の例
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
---------------------------------------------------------------------------------------------------------------------
09:AGGREGATE 1 181.94ms 181.94ms 1 1 20.00 KB -1.00 B FINALIZE
08:EXCHANGE 1 73.592us 73.592us 3 1 0 -1.00 B UNPARTITIONED
05:AGGREGATE 3 270.619ms 292.402ms 3 1 8.64 MB 10.00 MB
04:HASH JOIN 3 35.811ms 77.643ms 36.63K 33.94M 50.05 MB 3.23 MB INNER JOIN, BROADCAST
|--07:EXCHANGE 3 33.631ms 36.165ms 769.23K 769.23K 0 0 BROADCAST
| 02:SCAN HDFS 1 357.375ms 357.375ms 769.23K 769.23K 8.66 MB 32.00 MB default.tabc
03:HASH JOIN 3 2s072ms 3s076ms 476.19K 33.33M 203.02 MB 5.99 MB INNER JOIN, BROADCAST
|--06:EXCHANGE 3 69.922ms 74.444ms 1.43M 1.43M 0 0 BROADCAST
| 01:SCAN HDFS 1 586.556ms 586.556ms 1.43M 1.43M 12.66 MB 48.00 MB default.tabb
00:SCAN HDFS 3 531.481ms 761.759ms 33.33M 33.33M 32.66 MB 176.00 MB default.taba
図1のOperator列の各行をそれぞれ見てみましょう。
- AGGREGATION
- 集約関数(上記のcountなど)の集約処理を表します。今回は列値によるgroup byを実施していないため結果が1行にまとまっていますが、集約を実施する場合Impalaはメモリ上にgroup byのキーでハッシュ表を作成することにより処理を実行します
- EXCHANGE
- あるノードにおいて、ほかのノードから再分配されたレコードを受け取る処理を表します。その際、データはほかのノードのメモリからネットワークなどを介して、当該ノードのメモリへと転送されます。
- HASH JOIN
- メモリ上にハッシュ表を作成して等結合を行う処理を表します。Impalaでは、ハッシュ表を内部表(今回のtabBとtabC)として、外部表(今回のtabA)の値をProbeし、結合処理を行います(第5回の「Impala/Presto」、第8回の「Impala/Prestoにおける結合処理」)。
- SCAN HDFS
- データの読み出し処理を表します。データの読み出し元は、HDFSが一般的ですが、HBaseやAmazon S3などのほかのストレージを用いることも可能です。
これらをふまえて実行計画を見てみると、SCANにより読み出されたtabB、tabCのレコードからハッシュ表を作成し(ビルドし)、それらに対してtabAのレコードを引き当てる(プローブする)という流れを読み取ることができるかと思います。図2では、tabB、tabCに共通する値(緑の行)等価なtabAの行が、JOINの結果、選択されていく状況を示しています。
図2 Hash Joinのイメージ図
Impalaの場合、コーディネータが作成した実行計画をさらにplan fragmentと呼ばれる単位に分割し、各impaladに分配して実行します。そのとき、統計情報やcatalogdから共有されたブロックの配置状況、およびstatestoreから得られたクラスタ上のimpaladの状況を参照することにより、HDFSに対する入出力のローカリティを考慮してplan fragmentの実行ノードを選択します[3]。
以下に、Impalaのprofileコマンドで出力したplan fragmentの例を示します。
図3 plan fragmentの例
F00:PLAN FRAGMENT [RANDOM]
DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=08, UNPARTITIONED]
05:AGGREGATE
| output: count(colA)
| hosts=3 per-host-mem=10.00MB
| tuple-ids=3 row-size=8B cardinality=1
|
04:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: colB = colC
| hosts=3 per-host-mem=3.23MB
| tuple-ids=0,1,2 row-size=12B cardinality=33940820
|
|--07:EXCHANGE [BROADCAST]
| hosts=1 per-host-mem=0B
| tuple-ids=2 row-size=4B cardinality=769231
|
03:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: colA = colB
| hosts=3 per-host-mem=5.99MB
| tuple-ids=0,1 row-size=8B cardinality=33333334
|
|--06:EXCHANGE [BROADCAST]
| hosts=1 per-host-mem=0B
| tuple-ids=1 row-size=4B cardinality=1428572
|
00:SCAN HDFS [default.taba, RANDOM]
partitions=1/1 files=1 size=282.57MB
table stats: 33333334 rows total
column stats: all
hosts=3 per-host-mem=176.00MB
tuple-ids=0 row-size=4B cardinality=33333334
上記はHash JoinからAggregationまでを行うplan fragmentですが、例ではtabAの担当範囲を変えて、同じplan fragmentを3つのノードで実行しています。
図1の2つ目の列は#Hostsとなっていて、対応する処理がいくつのノードで実施されたかを示しています。
Hash Joinの並列化
第8回の「Impala/Prestoにおける結合処理」で述べられているように、ImpalaのHash Joinはパイプライン処理を行います。先のF00のplan fragmentにおいて、00:SCAN HDFSから得られたデータが05:AGGREGATEまで連続的につながっていることがパイプライン処理を表しています。
06:EXCHANGE(tabB)、07:EXCHANGE(tabC)で受けとっているのはハッシュ表となる側(ビルド側)のデータで、こちらが内部表です。
MapReduceで複数表の結合を行う場合、1つの結合処理が完了しから次の結合処理を行いますが、Impalaでは上記のようにデータを連続的に結合していく(すなわち、複数の結合を同時に実行する)戦略を取ります。
上記のハッシュ結合において、各ノードがすべてに関連レコードに対するハッシュ表を持っていれば00:SCAN HDFS(tabA)を複数のノードで分担して実施することで、05:AGGREGATEで各ノードの結合結果の行数を得るところまでノード間で互いに並列に実施できます。また、図3にあるように、TabAの行がハッシュ表にあるかを確認するのみなので、tabCのHash JoinはtabBのHash Join全体の完了を待たず並列に行うことができます。上記のplan fragmentのAggregationまでの処理を各ノードで完了した後、最終的にコーディネータに各ノードの集計結果が集められ、コーディネータが全体の集計を行い、クライアントにクエリ結果として返します。
なお、この方法はブロードキャスト結合(第7回)と呼ばれる方法であり、各ノードにおいて結合するすべてのハッシュ表を持つ必要があるため、メモリを大量に使用します。オプションとして、パーティション方式(分割結合)で行うことも可能です(実行計画のHASH JOINのdetail列にPARTITIONEDと表示されます)。
ノード内処理の並列化
これまで説明してきたノード間の並列化方法は、ノード内の処理の並列化においても用いることが可能です。これにより、各ノードが有する複数のCPUコアを有効に活用できる場合があります。
たとえば、図2におけるtabAのプローブ処理を見てみると、あるtabAレコードの当該処理と別のtabAレコードの当該処理は互いに独立であるため、それぞれを別々のスレッドを用いて同時に実行することが可能です。このように、各ノードに割り振られたplan fragmentを複数のスレッドで実行することにより、ノード内におけるplan fragmentの実行を高速化することができます。
LLVM JIT Compile
ここまでは、SQL処理の単位をクラスタからノード、プロセッサ(コア)へと分配させることにより処理を並列化する方法について見てきました。さらに、Impalaにおいては、LLVMを用いて実行時コード生成(JIT Compile)を行うことにより、CPU命令のパイプライン実行を行います。すなわち、プログラムのコンパイル時にはどのようなデータに対してどのようなクエリが実行されるかはわかりませんが、実行時にコード生成をすることにより、実際に処理するデータとクエリを元にCPU命令をスケジュールしてパイプライン化します。
また、その際、コンパイラの最適化によって不要なCPUのインストラクションをとりのぞくことができれば、処理を大幅に軽減することが可能です。このような最適化は、大量のデータを用いるクエリや複雑なクエリほど効果が大きいと考えられ、TPC-H Q1ベンチマークの実行結果ではLLVMの最適化の有無で5.7倍の差があったとされています[4]。
おわりに
いかがでしたでしょうか。今回は、Impalaのクエリ実行時の並列化の仕組みについて説明しました。ImpalaがSQLを実行する際に内部で行われる並列処理の方法を理解する助けになれば幸いです。
次回は、ImpalaのI/O処理における高速化の仕組みを紹介します。