ikemonn's blog

技術ネタをちょこちょこと

Session Guarantees for Weakly Consistent Replicated Data を読んだ

Session Guarantees for Weakly Consistent Replicated Data を読んだ時のメモ。

どんなもの?

  • weak consistencyのread-any, write-anyの特性を活かしつつも、ある1つのクライアントからは一貫性があるように見えるようにしたもの
    • モバイル端末のユーザはread/writeで別のサーバに接続しに行く時があるので、従来のweak consistencyではinconsistencyに気づく可能性があった
    • 下記の4つのモデルがポイント
      • Read Your Writes
      • Monotonic Reads
      • Writes Follow Reads
      • Monotonic Writes

先行研究とくらべて何がすごい?

  • atomic transactionとは異なる
    • sessionという概念の導入
      • アプリケーションが実行する、ひとつづきのread/writeを抽象化したもの
    • 複数のサーバに接続先が切り替わっても大丈夫

技術や手法の肝は?

  • Read Your Writes
    • 同一セッション内であれば、以前に行われた全てのwriteの結果をreadできる
      • 「DBを更新したあとすぐに書き込んだ値を読めるか」という、ユーザやアプリケーションを悩ませる問題にモチベートされた
    • 例) Webサービスで、パスワードを変更したあとすぐに新しいパスワードでログインしようとすると、データの更新が伝播されていない場合にログインできない時がある
      • ログインプロセスが新しいパスワードに更新される前のサーバと通信したから
      • Read Your Writesが担保されていると、パスワード更新後に別のサーバにつなぎに行っても新しいパスワードでログインできる
  • Monotonic Read
    • 同一セッション内で、あるプロセスがreadした後に、再度readをすると前回と同じかそれより新しいデータをreadすることができる
    • 例) 分散DBを利用しているメールシステムで、あるメールを読んだ後にreloadして別のサーバに繋ぎにいっても、前回読んだメールは全て読める
  • Write Follow Reads
    • 同一セッション内で、あるreadに後続するwriteは、前回のread時に取得した値かそれより新しい値に対して行われる
    • 例) 掲示板の書き込みで、前回読み込んだ記事への返信を書き込む場合、別のサーバにアクセスしても、その記事が読み込める限り返信が書き込まれている

どうやって有効と検証した?

議論はある?

  • session guaranteeを導入することで、可用性を損なう可能性がある
    • 実用上は問題なさそう

次に読むべき論文は?

Coda: A Highly Available File System for aDistributed Workstation Environment

分散システム 原理とパラダイム の同期について読んだ

Time, Clocks, and the Ordering of Events in a Distributed System を読んでいたが、ぼんやりとしか分からなかったので、まず下記の同期の章を読んだ。

分散システム―原理とパラダイム

分散システム―原理とパラダイム

  • 作者: アンドリュー・S.タネンバウム,マールテン・ファンスティーン,Andrew S. Tanenbaum,Maarten van Steen,水野忠則,東野輝夫,宮西洋太郎,鈴木健二,西山智,佐藤文明
  • 出版社/メーカー: ピアソンエデュケーション
  • 発売日: 2003/10
  • メディア: 単行本
  • クリック: 25回
  • この商品を含むブログ (20件) を見る

物理的クロック

  • クロック
    • 時間追跡管理回路(circuit for keeping track of time)
    • 時計ではなく、タイマの方が適切
    • 水晶で出来ている
      • 水晶は種類、切削方法、圧力の大きさに依存して明確に定義された周波数で発振する
    • 各水晶には2つのレジスタがある
    • 水晶は一回の発振ごとにカウンタを1減らす
      • カウンタが0になると割り込みが発生され、カウンタは保持レジスタから再設定される
      • 各割り込みはclock tickと呼ばれる
    • クロックの同期のズレをclock skewと言う
    • 原子時計
      • 原子の遷移をみる

クロック同期アルゴリズム

  • 全てのアルゴリズムは下記のモデルに基づいている
    • 各マシンは1秒にH回の割り込みを発生するタイマをもっていると仮定する
    • このタイマが動いた時、割り込み処理ハンドラは1をソフトウェアクロックに加える
    • ソフトウェアクロックは同意がなされている過去のある時点以来のtickの数を追跡管理している(C)
    • UTC時間がtのとき、マシンpのクロック地はCp(t)である
    • 完全な世界では全てのpとtに対して、Cp(t) = tである
    • だが実際はずれる

Cristioanのアルゴリズム

  • 正確な時間を持つtime serverがいるとする
  • 短い周期で各マシンが現在の時間を問い合わせるために時間サーバにメッセージを送る
  • 各マシンが時間を受け取ると、調整した後に自己のクロックをその時間にあわせる
  • 各マシンが返ってきた時間をそのまま自己クロックに適用させると問題が起こる
    • 時間サーバへの要求送信と、返答の到着との時間間隔を正確に記録する

Berckeleyのアルゴリズム

  • Cristioanのアルゴリズムにおいて、time serverは受動的
  • Berckeleyのアルゴリズムでは時間デーモンが、各マシンにそれぞれが保持している時間について時々問い合わせる
  • その返答に基づいてサーバは平均の時間を計算し、全ての他のマシンにその時間を進める/戻すように通知する

論理クロック

  • クロックの同期は可能であるが、絶対的である必要はない
  • 通常問題になるのは、全てプロセスが現在の時間について正確に同意しているということではなく、イベントの発生順序について同意しているということ

Lamportのタイムスタンプ

  • 事前発生
    • a -> b は「aはb以前に発生する」
      • aがまず発生し、次にイベントbが発生していることに全てプロセスが同意していることを示す
    • 事前発生関係は次の2つの状況において直接的に観察できる
      • もしaおよびbが同じプロセスにおけるイベントで、aがb以前に発生するならa->b
      • もしaが1つのプロセスによって送信されたメッセージのイベントであり、bが別のプロセスによって受信されたそのメッセージのイベントなら、a->bは真
  • それぞれのイベントaに対して、全てのプロセスが合意する時間時C(a)を割り当てられることが重要
    • a->bならばC(a) > C(b)
  • クロック時間Cは常に増加しなければならない
    • 減算してはいけない

f:id:ikemonn:20170704214130p:plain

    • 各プロセスが異なる速度で動作しているとする
    • 避けるべき例
      • A: プロセス0(以降p0)がp1にメッセージを送ると、p1は10かかったと判断する
      • B: p1がp2にメッセージを送ると16かかったと判断する
      • C: p2がp1にメッセージを送ると-4となり明らかに不正である
    • 対応方法
      • C: p2では60に出発しているので60以降でないといけない、60未満であれば出発時の時間+1になるように自己クロックを進める
  • 必ずイベント間には1のtickがある
  • この方法を利用すると次の条件にしたがって、分散システムにおけるすべてのイベントに時間を割り当てられる
    • 同じプロセスに置いて、aがb以前に発生するなら C(a) < C(b)
    • もしaおよびbがそれぞれメッセージ送信および受信を表すなら C(a) < C(b)
    • すべての区別できるイベントaおよびbに対してC(a) ≠ C(b)

ベクトルタイムスタンプ

  • Lamportタイムスタンプでは、2つのイベントの順序については言及できるが関係については言及できない
  • C(a) < C(b)でも、必ずしも本当にaがbより以前に発生したことを意味しない
  • たとえば掲示版を考える
    • 投稿と反応の2種類があると仮定する
    • もしメッセージBがメッセージAのあとで配信されたとしても、Bは投稿Aへの反応とは限らない
      • 因果関係を把握していない
  • 因果関係はベクトルタイムスタンプで把握できる
    • イベントaに割り当てられたベクトルタイムスタンプVT(a)はイベントbにたいして、VT(a) < VT(b)なら、aはbに因果的に先行する
  • 各プロセスPiに次の2つの特性をもつ、1つのベクトルViを維持させることによって構成される
    • Vi[i]はPiにおいて今まで発生したイベントの数である
    • もしVi[j] = kならPjにおいて、k個のイベントが発生したとPiは判断する

Cassandra - A Decentralized Structured Storage System を読んだ

Cassandra - A Decentralized Structured Storage Systemを読んだ時のメモ。

どんなもの?

  • 分散ストレージシステム
  • 大規模データを多数の一般的なサーバに分散させることで可用性を高めて、SPOFを無くす
  • ソフトウェア側で可用性とスケーラビリティをコントロールしている
  • read efficiencyを犠牲にすることなくwrite throughputを高めている
  • FacebookのInbox Searchのストレージのニーズを満たすために作られた

先行研究とくらべて何がすごい?

技術や手法の肝は?

  • DataModel
    • Table
      • 1つのkeyでindexされた多次元マップ
      • valueは高度に構造化されたobject
      • row keyはsize limitのないstring(16~36byte)
      • 1つのrow key下での全ての操作はどれだけ多くのカラムが読み書きされていようがレプリカ単位でatomic
      • カラムはカラムファミリと呼ばれる単位でグルーピングされる
      • カラムを時間もしくは名前でソートできる
  • API

    • insert(table,key,rowMutation)
    • get(table,key,columnName)
    • delete(table,key,columnName)
  • System Architecture

    • Partitioning
      • scale incrementaly(線形にスケールすること?)が設計のキー
        • クラスタ内のNodeをまたいで動的にデータをパーティショニングする必要がある
      • consistent hashingを利用
        • Dynamoとは異なり、リング状の負荷情報を解析して負荷を軽減させるために、負荷の軽いノードを移動させる
          • 設計と実装がシンプルになった
  • Replication

    • Coordinator Nodeにk個のkeyが割り当てられ、それをN-1個のホストにレプリケーションする
      • Rack Unaware, Rack Aware, Datacenter Awareなどのpolicyがある
    • zookeeperを用いてリーダーの選出を行う
    • リーダがどのレンジをレプリケートするかを伝える
    • どのノードがどのレンジを受け持つかのメタデータは、各ノードのlocalとzookeeper内にキャッシュされる
    • 全てのノードは他のノードのことを知っている
  • Membership

    • anti-entropy Gossip base protocolにもとづいている
  • Failure Detection

    • Φ Accrual Failure Detectorを修正したものを使っている
      • ノードの生死をboolean値で送るのではなく、suspection levelを送る
        • Φ=1の時に我々が間違っている尤度は約10%
  • Bootstrapping

    • ノードが起動するときにリング上の自分の位置に関するランダムトークンを選択する
      • このmappingは耐障害性のためにlocalとzookeeperにも保存される
    • トークン情報はcluster中に伝搬されていく
      • 我々が全ノードとそれらのリング上での位置を知る方法
      • どのノードでもキーのリクエストを正しいノードにルーティングできるようにする
  • Scaling the Cluster

    • 新しいノードは負荷の高いノードの負荷を軽減するトークンが割り当てられる
      • 他のノードが担当していたレンジを新しいノードが分割して担当するということ
  • Local Persisitence

    • 典型的なwrite
      • 永続性とリカバリのためにcommit logに書き込みが成功した後、in memoryのデータを更新する
      • commit logの書き込みはシーケンシャルなので、diskのthroughputを最大化させるために専用のdiskを使っている
      • in memoryのデータが一定の閾値を超えた時に、diskにダンプされる
      • ファイルがdiskに沢山存在するように成るので、バックグラウンドでマージ処理が走る
    • 典型的なread
      • disk上のファイルを探す前にin memoryのデータから探す
      • 新しいものから古いものへと順に検索される
      • disk上の複数のファイルから探すことになったときのために、bloom filterがデータとともに保存されており、in memoryにも存在する
      • disk上の全てのカラムをスキャンすることを避けるために、カラムインデックスを保持する

どうやって有効と検証した?

議論はある?

  • 今後は下記のサポートを行っていきたいと考えている
    • compression
    • atomicity across key
    • secondary index

次に読むべき論文は?

A highly available file system for a distributed workstation environment

MapReduce: Simplified Data Processing on Large Clusters を読んだ

MapReduce: Simplified Data Processing on Large Clustersを読んだ時のメモ。

どんなもの?

  • 巨大なデータセットを処理するプログラミングモデル
    • Map
      • key/valueのinputを中間的なkey/valueペアにする
    • Reduce
      • 全ての中間的なvalueを中間的なkeyでまとめる
  • 多数の一般的なマシン上で並列実行される
  • run-timeがinput-dataのパーティショニングや処理のスケジューリング、エラーハンドル等をしてくれるので、プログラマは意識しなくてもOK

先行研究とくらべて何がすごい?

  • 並列分散システムを使ったことのないプログラマでも簡単に使える

    • 並列化、耐障害性、localy optimization、負荷分散を透過的に行う
  • MapReduceの処理で様々なデータを生成出来る

  • 何千台ものマシンにスケールしても動くように作られている

技術や手法の肝は?

  • Lisp関数型言語のmap/reduceに着想を得た
  • map
    • inputを受け取って中間的なkey/valueのペアを作る
    • key I のすべての値をgroup化してreduceに渡す
  • reduce
    • 中間的なkey Iとその値を受け取ってマージする
    • 一般的にreduce後には0もしくは1つの値が作られる

f:id:ikemonn:20170702162202p:plain

処理概要

  1. input fileを16~64MBに分割する & プログラムのコピーをクラスタに配布する
  2. masterがworkerにtaskをassignしていく
    • M個のmapタスクとR個のreduceタスクが生成される
    • masterがidle中のworkerにtaskをassignしていく
  3. map taskのworkerは担当のinput fileを読み込みに行く
    • key/valueのデータをparseしてmap関数にかけていく
    • map関数によって生成されたkey/valueのデータはmemoryにのる
  4. memory上のデータがパーティショニング関数によってR regionに分割され、local diskに書き込まれる
    • ファイルが書かれたlocal diskの場所をmasterに通知し、masterはreduce workerに場所を伝える
  5. 中間データの位置を通知されたreduce workerはRPCを用いてmap workerのlocal diskからデータを読み取りに行く
    • データを読み終わったらkeyをグルーピングするためにソートする
  6. ソートされたデータを舐めていき、reduce関数にかけていく
    • reduce関数の結果はout putファイルにappendされる
  7. mapreduceの処理が終わるとmasterがユーザプログラムに通知する

  8. MasterDataに含まれる情報

    • 各map taskとreduce taskの状態情報を保持している(idle, in-progress, complete)
    • worker machineの情報
    • mapによって生成された中間ファイルの場所とサイズ
  9. Fault Tolerance

    • worker failure
      • masterはworkerにpingを定期的に送る
      • 返ってこない場合はfailedとmarkする
      • そのworkerにassignされていたtaskは、idle statusに戻って他のworkerにassignされる
    • task failure
      • map taskは再実行される
        • mapの結果ファイルがどの場所にあるかわからないため
      • reduce taskは再実行されない
    • master failure
      • master dataにcheckpointを書き出しておく
      • master taskが死ぬと最後に書き込まれたcheckpointから処理が再実行される
        • masterが死んだタイミングでMapReduceのtaskを全部abortしてretry
  10. Locality
    • ネットワーク帯域がボトルネックになっていた
      • map taskを対象のデータを持っているmachine上のworkerで行わせた
      • 無ければ同じネットワーク内のmachineで実行
  11. Backup

    • ハズレのmachineにあたると処理が遅くなり、全体の処理完了までの時間もそのmachineの処理に引きずられて遅くなる
    • MapReduceのtaskが終わりに近づくと、masterがin-progressなtaskのbackup実行をスケジュールする
      • in-progressの処理か、backup処理の一方が終わればそのtaskは完了
    • backup処理を追加することで数%の処理量が増えるように調整し、その結果backupありの方が44%速くなった
  12. 学び

    • プログラミングモデルを限定することで、並列分散処理が簡単になった
    • ネットワーク帯域がボトルネックになった
      • 最適化のいくつかも通信量を減らすためのもの
    • 重複実行は遅いマシンや故障したマシンがいたときに全体の処理を早くするのとデータロスを防ぐのに役に立った

どうやって有効と検証した?

  • machine
    • 2GHzのプロセッサ
    • 4GBのメモリ
    • 160GBのdisk
    • 1800台のサーバ

f:id:ikemonn:20170702162358p:plain

  • Grep
    • 10^10個の100byteのrecordをscan
    • 80secくらいでmap taskが終わり、全体で150secほどかかる
    • start upのoverheadがある
      • 全てのworkerにプログラムを伝搬させる
      • 1000個のinput fileのopen
      • locality optimazation情報の取得

f:id:ikemonn:20170702162258p:plain

  • Sort

    • 10^10個の100byteのrecordをsort
    • a) Normal mapは200secで終わる
      • Shuffle時に2つの山があるのは、batch処理を行っている時
        • 300secくらいで初めの処理が終わり、600secで次が終わる
      • writeが完了するまでは850secくらいで、起動時のオーバヘッドを含めると891sec
    • b) Backupをなくしたとき
      • Backupを無くすとロングテールのグラフになる
        • 最後の5つのreduce taskが遅く、300secくらいかかった
        • 1283secほどかかった
    • c) Machine Failureしたとき
      • 200/1746 workerをkillした
      • re-executionが実行されて、通常時よりも5%ほど時間がかかった
  • MapReduceにしてよかったこと

    • コード量が減った
      • C++のコードが3800->700行に
    • パフォーマンスが良くなった
    • Googleの検索indexを作る処理が簡単に実行できるようになった
      • machineの故障やネットワーク異常があってもMapReduceが検知して再実行してくれる
    • マシンを追加してスケールさせることでパフォーマンスも上がっていくようになった

次に読むべき論文は?

WEBSEARCH FOR A PLANET:THE GOOGLECLUSTER ARCHITECTURE

The Dataflow Model を読んだ

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processingを読んだ

どんなもの?

  • unbounded で順不同なデータを処理する上で、正確性、latency、costをいい感じにするアプローチについての考察

先行研究とくらべて何がすごい?

  • 既存の考え方では、unboundedなデータに終わりがあると考えているのがおかしい
    • データの総量がどれくらいで、全てのデータがいつ揃うかということは考えない
    • ただ新しいデータが来て、古いデータは撤回されるということだけを考える

技術や手法の肝は?

  • 下記が肝
    • windowing model
    • triggering model
    • incrementral processing model
    • scalable implementation

f:id:ikemonn:20170701004232p:plain

  • Windowing
    • データセットを処理の固まりに分けていくこと
    • 時間ベースで分割する
    • 3種類のwindow
      • fixed
        • 固定長のwindow(時間、日)
        • windowはオーバラップしない
      • sliding
        • 固定長のwindow
        • 時間でslideしていく
        • windowはオーバラップする
        • fixed windowはsliding windowの特殊な場合
      • session
        • activityのある期間をcaptureしたもの
        • timeoutまでの期間を定義する

f:id:ikemonn:20170701004251p:plain

  • Time domain
    • Event Time
      • イベントが実際に起こった時間
    • Processing Time
      • pipelineでそのイベントが観測された時間
  • Window Mergingの流れ

f:id:ikemonn:20170701004316p:plain

  • Triggers & Incremental Processing

    • Windowing
      • event timeのどの時点でデータがグループ化されるのかを決定する
    • Triggering
      • processing timeのどの時点でグループ化の結果がemitされたかを決定する

どうやって有効と検証した?

  • FlumeJavaとMillWheelを用いて、Googleの実際のプロダクトに実装した
    • Youtube(Session window), Recommendation(Processing Time Triggers)等

議論はある?

次に読むべき論文は?

MapReduce: Simplified Data Processing on Large Clusters

The Chubby lock service for loosely-coupled distributed systems を読んだ

The Chubby lock service for loosely-coupled distributed systemsを読んだ。

ロックについての基礎知識が無くて全然わからなかった。

どんなもの?

  • 分散システム用のlock service
    • coarse-grained lock
    • GFS, BigTable, MapReduceが共有リソースへのアクセスの同期にChubbyを使っている
    • パフォーマンスよりも可用性と信頼性に重きをおく
    • 多数(数万単位)の小さなmachineから成る分散システム対象

先行研究とくらべて何がすごい?

  • 分散合意問題を非同期コミュニケーションを用いて解決した(Paxos)
  • 小さなサイズのメタデータを全replicaにsyncさせて可用性を担保している

技術や手法の肝は?

  • System構成
    • Chubby cellは5つのレプリカで構成されている
    • Paxosでmasterを選出している
    • レプリカが死ぬと数時間後に free poolから新たなレプリカが選出される
  • Event
    • Chubby Clientはeventをsubscribeできる
      • fileの変更
      • child nodeの追加/削除
      • masterのfailover
      • lockの異常

議論はある?

  • ネットワークが混雑している時にTCPベースのKeepAliveだとsession lostが多くなる
    • UDPを使うようにした

次に読むべき論文は?

趣向を変えて

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

The Google File System を読んだ

The Google File System を読んだ。

どんなもの?

  • Googleのサービスの特性に最適化させた、大規模分散システム用のスケーラブルなファイルシステム
    • 廉価で故障しやすいHW上で稼働することを前提とし、耐障害性がある
    • aggregateのパフォーマンスが高い
    • BigTableにも使われてる

先行研究とくらべて何がすごい?

  • 何千台もの一般的なサーバで動く
    • 故障するのが前提で耐障害性とリカバリを担保
  • 一般的なファイルシステムに比べて、大きなファイルを扱うことを前提としている
    • KB単位ではなくGB単位
  • 新しいデータはappendしてoverwriteしない
  • レスポンスタイムよりも帯域幅に重きを置いている
    • サービスの特性上bulk処理が大半を占めるから

技術や手法の肝は?

f:id:ikemonn:20170627234137p:plain

  • Architecture
    • single masterとmulit chunkserverで構成されている
    • fileは固定サイズ(64MB)のchunkに分割される
      • 利点
        • ファイルサイズが大きいので、小さいファイルに比べて何度もやりとりをする必要がなく通信のoverheadが少ない
        • 通信の回数が少なくなるのでmasterサーバへの負荷が少なくなる
        • masterサーバに保存されるmetaデータのサイズが小さくなる
      • 欠点
        • 同じファイルへのアクセスが多いとhotspotになる
          • Googleのサービスの特性上、複数のchunkファイルをシーケンシャルに読むのであまり問題にならない
    • chunkserverはlocalにchunkをlinuxのファイル形式で保存する
      • ファイル操作はchunck handle(uniqueなidのようなもの)とrangeを使っておこなわれる
    • master server
    • メタデータ
      • 3種類ある
        1. fileとchunkのnamespace (operation logとin memory)
        2. fileからchunkへのmapping (operation logとin memory)
        3. . chunkのreplicaがどのchunkserverになるか (in memoryのみ)
    • chunk server
      • デフォルトで3台のchuckサーバにデータがreplicateされる(primaryとsecondary)

f:id:ikemonn:20170627234152p:plain

Interaction

  1. clientがmasterに問い合わせて、対象chunk&現在leaseを持っているserverと他のreplicaの情報を得る
  2. masterはprimaryとsecondaryの情報をclientに返す(clientはこれをcacheする)
  3. clientがデータを任意の順番で全てのreplicaにpushする
  4. 全てのreplicaにデータが行き届いたら、clientがprimaryにwrite requestを送る
  5. primaryがsecondaryにwrite requestを送る
  6. 全てのsecondaryがprimaryに書き込み完了を通知すると処理が終わる
  7. primaryがclientに通知(エラーがあったらclient側でretryする)

  8. GC

    • アプリケーションによってファイルが消されても、masterはdelete処理をlogに書き込むだけで実体はすぐには消さない
      • renameされて、deletetion timestampが振られる
    • masterのファイルシステムnamespaceのregular scanの際に、3日(変更可能)が過ぎたファイルは削除される

どうやって有効と検証した?

f:id:ikemonn:20170627234211p:plainf:id:ikemonn:20170627234215p:plain

  • A: R&Dで数百人のエンジニアが利用しているCluster
  • B: 本番データで利用しているCluster

  • readはwriteよりも速い

    • 本番環境でもwriteよりもreadの処理が多いことを想定している
    • BがAよりもreadの効率が悪い
  • master opsは200~500 Ops/sを安定して出せている
    • masterの処理がボトルネックにならない
    • 初期のGFSではmasterの処理がボトルネックになっていた(sequential scan)が、master data構造をnamespaceでbinary searchするのに効率的な構造にすると速くなった

議論はある?

  • さまざまな問題の中でも一番大きかったのはdiskとLinux関係の問題
    • 複数versionのIDE protocolをサポートしているようにように思えたdiskが、実は最新バージョンしかサポートしてなかった
      • だいたい動くけれど、たまに動かなくてデータが壊れるのでchecksumをいれるようにした

次に読むべき論文は?

↑も面白そうだけれど、Chubbyを読む