ikemonn's blog

技術ネタをちょこちょこと

MapReduce: Simplified Data Processing on Large Clusters を読んだ

MapReduce: Simplified Data Processing on Large Clustersを読んだ時のメモ。

どんなもの?

  • 巨大なデータセットを処理するプログラミングモデル
    • Map
      • key/valueのinputを中間的なkey/valueペアにする
    • Reduce
      • 全ての中間的なvalueを中間的なkeyでまとめる
  • 多数の一般的なマシン上で並列実行される
  • run-timeがinput-dataのパーティショニングや処理のスケジューリング、エラーハンドル等をしてくれるので、プログラマは意識しなくてもOK

先行研究とくらべて何がすごい?

  • 並列分散システムを使ったことのないプログラマでも簡単に使える

    • 並列化、耐障害性、localy optimization、負荷分散を透過的に行う
  • MapReduceの処理で様々なデータを生成出来る

  • 何千台ものマシンにスケールしても動くように作られている

技術や手法の肝は?

  • Lisp関数型言語のmap/reduceに着想を得た
  • map
    • inputを受け取って中間的なkey/valueのペアを作る
    • key I のすべての値をgroup化してreduceに渡す
  • reduce
    • 中間的なkey Iとその値を受け取ってマージする
    • 一般的にreduce後には0もしくは1つの値が作られる

f:id:ikemonn:20170702162202p:plain

処理概要

  1. input fileを16~64MBに分割する & プログラムのコピーをクラスタに配布する
  2. masterがworkerにtaskをassignしていく
    • M個のmapタスクとR個のreduceタスクが生成される
    • masterがidle中のworkerにtaskをassignしていく
  3. map taskのworkerは担当のinput fileを読み込みに行く
    • key/valueのデータをparseしてmap関数にかけていく
    • map関数によって生成されたkey/valueのデータはmemoryにのる
  4. memory上のデータがパーティショニング関数によってR regionに分割され、local diskに書き込まれる
    • ファイルが書かれたlocal diskの場所をmasterに通知し、masterはreduce workerに場所を伝える
  5. 中間データの位置を通知されたreduce workerはRPCを用いてmap workerのlocal diskからデータを読み取りに行く
    • データを読み終わったらkeyをグルーピングするためにソートする
  6. ソートされたデータを舐めていき、reduce関数にかけていく
    • reduce関数の結果はout putファイルにappendされる
  7. mapreduceの処理が終わるとmasterがユーザプログラムに通知する

  8. MasterDataに含まれる情報

    • 各map taskとreduce taskの状態情報を保持している(idle, in-progress, complete)
    • worker machineの情報
    • mapによって生成された中間ファイルの場所とサイズ
  9. Fault Tolerance

    • worker failure
      • masterはworkerにpingを定期的に送る
      • 返ってこない場合はfailedとmarkする
      • そのworkerにassignされていたtaskは、idle statusに戻って他のworkerにassignされる
    • task failure
      • map taskは再実行される
        • mapの結果ファイルがどの場所にあるかわからないため
      • reduce taskは再実行されない
    • master failure
      • master dataにcheckpointを書き出しておく
      • master taskが死ぬと最後に書き込まれたcheckpointから処理が再実行される
        • masterが死んだタイミングでMapReduceのtaskを全部abortしてretry
  10. Locality
    • ネットワーク帯域がボトルネックになっていた
      • map taskを対象のデータを持っているmachine上のworkerで行わせた
      • 無ければ同じネットワーク内のmachineで実行
  11. Backup

    • ハズレのmachineにあたると処理が遅くなり、全体の処理完了までの時間もそのmachineの処理に引きずられて遅くなる
    • MapReduceのtaskが終わりに近づくと、masterがin-progressなtaskのbackup実行をスケジュールする
      • in-progressの処理か、backup処理の一方が終わればそのtaskは完了
    • backup処理を追加することで数%の処理量が増えるように調整し、その結果backupありの方が44%速くなった
  12. 学び

    • プログラミングモデルを限定することで、並列分散処理が簡単になった
    • ネットワーク帯域がボトルネックになった
      • 最適化のいくつかも通信量を減らすためのもの
    • 重複実行は遅いマシンや故障したマシンがいたときに全体の処理を早くするのとデータロスを防ぐのに役に立った

どうやって有効と検証した?

  • machine
    • 2GHzのプロセッサ
    • 4GBのメモリ
    • 160GBのdisk
    • 1800台のサーバ

f:id:ikemonn:20170702162358p:plain

  • Grep
    • 10^10個の100byteのrecordをscan
    • 80secくらいでmap taskが終わり、全体で150secほどかかる
    • start upのoverheadがある
      • 全てのworkerにプログラムを伝搬させる
      • 1000個のinput fileのopen
      • locality optimazation情報の取得

f:id:ikemonn:20170702162258p:plain

  • Sort

    • 10^10個の100byteのrecordをsort
    • a) Normal mapは200secで終わる
      • Shuffle時に2つの山があるのは、batch処理を行っている時
        • 300secくらいで初めの処理が終わり、600secで次が終わる
      • writeが完了するまでは850secくらいで、起動時のオーバヘッドを含めると891sec
    • b) Backupをなくしたとき
      • Backupを無くすとロングテールのグラフになる
        • 最後の5つのreduce taskが遅く、300secくらいかかった
        • 1283secほどかかった
    • c) Machine Failureしたとき
      • 200/1746 workerをkillした
      • re-executionが実行されて、通常時よりも5%ほど時間がかかった
  • MapReduceにしてよかったこと

    • コード量が減った
      • C++のコードが3800->700行に
    • パフォーマンスが良くなった
    • Googleの検索indexを作る処理が簡単に実行できるようになった
      • machineの故障やネットワーク異常があってもMapReduceが検知して再実行してくれる
    • マシンを追加してスケールさせることでパフォーマンスも上がっていくようになった

次に読むべき論文は?

WEBSEARCH FOR A PLANET:THE GOOGLECLUSTER ARCHITECTURE