MapReduce: Simplified Data Processing on Large Clusters を読んだ
MapReduce: Simplified Data Processing on Large Clustersを読んだ時のメモ。
どんなもの?
- 巨大なデータセットを処理するプログラミングモデル
- Map
- key/valueのinputを中間的なkey/valueペアにする
- Reduce
- 全ての中間的なvalueを中間的なkeyでまとめる
- Map
- 多数の一般的なマシン上で並列実行される
- run-timeがinput-dataのパーティショニングや処理のスケジューリング、エラーハンドル等をしてくれるので、プログラマは意識しなくてもOK
先行研究とくらべて何がすごい?
並列分散システムを使ったことのないプログラマでも簡単に使える
- 並列化、耐障害性、localy optimization、負荷分散を透過的に行う
MapReduceの処理で様々なデータを生成出来る
何千台ものマシンにスケールしても動くように作られている
技術や手法の肝は?
- Lispや関数型言語のmap/reduceに着想を得た
- map
- inputを受け取って中間的なkey/valueのペアを作る
- key I のすべての値をgroup化してreduceに渡す
- reduce
- 中間的なkey Iとその値を受け取ってマージする
- 一般的にreduce後には0もしくは1つの値が作られる
処理概要
- input fileを16~64MBに分割する & プログラムのコピーをクラスタに配布する
- masterがworkerにtaskをassignしていく
- M個のmapタスクとR個のreduceタスクが生成される
- masterがidle中のworkerにtaskをassignしていく
- map taskのworkerは担当のinput fileを読み込みに行く
- key/valueのデータをparseしてmap関数にかけていく
- map関数によって生成されたkey/valueのデータはmemoryにのる
- memory上のデータがパーティショニング関数によってR regionに分割され、local diskに書き込まれる
- ファイルが書かれたlocal diskの場所をmasterに通知し、masterはreduce workerに場所を伝える
- 中間データの位置を通知されたreduce workerはRPCを用いてmap workerのlocal diskからデータを読み取りに行く
- データを読み終わったらkeyをグルーピングするためにソートする
- ソートされたデータを舐めていき、reduce関数にかけていく
- reduce関数の結果はout putファイルにappendされる
mapreduceの処理が終わるとmasterがユーザプログラムに通知する
MasterDataに含まれる情報
- 各map taskとreduce taskの状態情報を保持している(idle, in-progress, complete)
- worker machineの情報
- mapによって生成された中間ファイルの場所とサイズ
Fault Tolerance
- worker failure
- masterはworkerにpingを定期的に送る
- 返ってこない場合はfailedとmarkする
- そのworkerにassignされていたtaskは、idle statusに戻って他のworkerにassignされる
- task failure
- map taskは再実行される
- mapの結果ファイルがどの場所にあるかわからないため
- reduce taskは再実行されない
- 結果はグローバルなファイルシステムに書き込まれるため
- map taskは再実行される
- master failure
- master dataにcheckpointを書き出しておく
- master taskが死ぬと最後に書き込まれたcheckpointから処理が再実行される
- masterが死んだタイミングでMapReduceのtaskを全部abortしてretry
- worker failure
- Locality
- ネットワーク帯域がボトルネックになっていた
- map taskを対象のデータを持っているmachine上のworkerで行わせた
- 無ければ同じネットワーク内のmachineで実行
- ネットワーク帯域がボトルネックになっていた
Backup
- ハズレのmachineにあたると処理が遅くなり、全体の処理完了までの時間もそのmachineの処理に引きずられて遅くなる
- MapReduceのtaskが終わりに近づくと、masterがin-progressなtaskのbackup実行をスケジュールする
- in-progressの処理か、backup処理の一方が終わればそのtaskは完了
- backup処理を追加することで数%の処理量が増えるように調整し、その結果backupありの方が44%速くなった
学び
- プログラミングモデルを限定することで、並列分散処理が簡単になった
- ネットワーク帯域がボトルネックになった
- 最適化のいくつかも通信量を減らすためのもの
- 重複実行は遅いマシンや故障したマシンがいたときに全体の処理を早くするのとデータロスを防ぐのに役に立った
どうやって有効と検証した?
- machine
- 2GHzのプロセッサ
- 4GBのメモリ
- 160GBのdisk
- 1800台のサーバ
- Grep
- 10^10個の100byteのrecordをscan
- 80secくらいでmap taskが終わり、全体で150secほどかかる
- start upのoverheadがある
- 全てのworkerにプログラムを伝搬させる
- 1000個のinput fileのopen
- locality optimazation情報の取得
Sort
- 10^10個の100byteのrecordをsort
- a) Normal mapは200secで終わる
- Shuffle時に2つの山があるのは、batch処理を行っている時
- 300secくらいで初めの処理が終わり、600secで次が終わる
- writeが完了するまでは850secくらいで、起動時のオーバヘッドを含めると891sec
- Shuffle時に2つの山があるのは、batch処理を行っている時
- b) Backupをなくしたとき
- Backupを無くすとロングテールのグラフになる
- 最後の5つのreduce taskが遅く、300secくらいかかった
- 1283secほどかかった
- Backupを無くすとロングテールのグラフになる
- c) Machine Failureしたとき
- 200/1746 workerをkillした
- re-executionが実行されて、通常時よりも5%ほど時間がかかった
MapReduceにしてよかったこと