Session Guarantees for Weakly Consistent Replicated Data を読んだ
Session Guarantees for Weakly Consistent Replicated Data を読んだ時のメモ。
どんなもの?
- weak consistencyのread-any, write-anyの特性を活かしつつも、ある1つのクライアントからは一貫性があるように見えるようにしたもの
- モバイル端末のユーザはread/writeで別のサーバに接続しに行く時があるので、従来のweak consistencyではinconsistencyに気づく可能性があった
- 下記の4つのモデルがポイント
- Read Your Writes
- Monotonic Reads
- Writes Follow Reads
- Monotonic Writes
先行研究とくらべて何がすごい?
- atomic transactionとは異なる
- sessionという概念の導入
- アプリケーションが実行する、ひとつづきのread/writeを抽象化したもの
- 複数のサーバに接続先が切り替わっても大丈夫
- sessionという概念の導入
技術や手法の肝は?
- Read Your Writes
- 同一セッション内であれば、以前に行われた全てのwriteの結果をreadできる
- 「DBを更新したあとすぐに書き込んだ値を読めるか」という、ユーザやアプリケーションを悩ませる問題にモチベートされた
- 例) Webサービスで、パスワードを変更したあとすぐに新しいパスワードでログインしようとすると、データの更新が伝播されていない場合にログインできない時がある
- ログインプロセスが新しいパスワードに更新される前のサーバと通信したから
- Read Your Writesが担保されていると、パスワード更新後に別のサーバにつなぎに行っても新しいパスワードでログインできる
- 同一セッション内であれば、以前に行われた全てのwriteの結果をreadできる
- Monotonic Read
- 同一セッション内で、あるプロセスがreadした後に、再度readをすると前回と同じかそれより新しいデータをreadすることができる
- 例) 分散DBを利用しているメールシステムで、あるメールを読んだ後にreloadして別のサーバに繋ぎにいっても、前回読んだメールは全て読める
- Write Follow Reads
- 同一セッション内で、あるreadに後続するwriteは、前回のread時に取得した値かそれより新しい値に対して行われる
- 例) 掲示板の書き込みで、前回読み込んだ記事への返信を書き込む場合、別のサーバにアクセスしても、その記事が読み込める限り返信が書き込まれている
どうやって有効と検証した?
- 分散ファイルシステムのCodaに実装して検証した
議論はある?
- session guaranteeを導入することで、可用性を損なう可能性がある
- 実用上は問題なさそう
次に読むべき論文は?
Coda: A Highly Available File System for aDistributed Workstation Environment
分散システム 原理とパラダイム の同期について読んだ
Time, Clocks, and the Ordering of Events in a Distributed System を読んでいたが、ぼんやりとしか分からなかったので、まず下記の同期の章を読んだ。
- 作者: アンドリュー・S.タネンバウム,マールテン・ファンスティーン,Andrew S. Tanenbaum,Maarten van Steen,水野忠則,東野輝夫,宮西洋太郎,鈴木健二,西山智,佐藤文明
- 出版社/メーカー: ピアソンエデュケーション
- 発売日: 2003/10
- メディア: 単行本
- クリック: 25回
- この商品を含むブログ (20件) を見る
物理的クロック
- クロック
クロック同期アルゴリズム
- 全てのアルゴリズムは下記のモデルに基づいている
- 各マシンは1秒にH回の割り込みを発生するタイマをもっていると仮定する
- このタイマが動いた時、割り込み処理ハンドラは1をソフトウェアクロックに加える
- ソフトウェアクロックは同意がなされている過去のある時点以来のtickの数を追跡管理している(C)
- UTC時間がtのとき、マシンpのクロック地はCp(t)である
- 完全な世界では全てのpとtに対して、Cp(t) = tである
- だが実際はずれる
Cristioanのアルゴリズム
- 正確な時間を持つtime serverがいるとする
- 短い周期で各マシンが現在の時間を問い合わせるために時間サーバにメッセージを送る
- 各マシンが時間を受け取ると、調整した後に自己のクロックをその時間にあわせる
- 各マシンが返ってきた時間をそのまま自己クロックに適用させると問題が起こる
- 時間サーバへの要求送信と、返答の到着との時間間隔を正確に記録する
Berckeleyのアルゴリズム
- Cristioanのアルゴリズムにおいて、time serverは受動的
- Berckeleyのアルゴリズムでは時間デーモンが、各マシンにそれぞれが保持している時間について時々問い合わせる
- その返答に基づいてサーバは平均の時間を計算し、全ての他のマシンにその時間を進める/戻すように通知する
論理クロック
- クロックの同期は可能であるが、絶対的である必要はない
- 通常問題になるのは、全てプロセスが現在の時間について正確に同意しているということではなく、イベントの発生順序について同意しているということ
Lamportのタイムスタンプ
- 事前発生
- a -> b は「aはb以前に発生する」
- aがまず発生し、次にイベントbが発生していることに全てプロセスが同意していることを示す
- 事前発生関係は次の2つの状況において直接的に観察できる
- もしaおよびbが同じプロセスにおけるイベントで、aがb以前に発生するならa->b
- もしaが1つのプロセスによって送信されたメッセージのイベントであり、bが別のプロセスによって受信されたそのメッセージのイベントなら、a->bは真
- a -> b は「aはb以前に発生する」
- それぞれのイベントaに対して、全てのプロセスが合意する時間時C(a)を割り当てられることが重要
- a->bならばC(a) > C(b)
- クロック時間Cは常に増加しなければならない
- 減算してはいけない
- 例
- 各プロセスが異なる速度で動作しているとする
- 避けるべき例
- A: プロセス0(以降p0)がp1にメッセージを送ると、p1は10かかったと判断する
- B: p1がp2にメッセージを送ると16かかったと判断する
- C: p2がp1にメッセージを送ると-4となり明らかに不正である
- 対応方法
- C: p2では60に出発しているので60以降でないといけない、60未満であれば出発時の時間+1になるように自己クロックを進める
- 必ずイベント間には1のtickがある
- この方法を利用すると次の条件にしたがって、分散システムにおけるすべてのイベントに時間を割り当てられる
- 同じプロセスに置いて、aがb以前に発生するなら C(a) < C(b)
- もしaおよびbがそれぞれメッセージ送信および受信を表すなら C(a) < C(b)
- すべての区別できるイベントaおよびbに対してC(a) ≠ C(b)
ベクトルタイムスタンプ
- Lamportタイムスタンプでは、2つのイベントの順序については言及できるが関係については言及できない
- C(a) < C(b)でも、必ずしも本当にaがbより以前に発生したことを意味しない
- たとえば掲示版を考える
- 投稿と反応の2種類があると仮定する
- もしメッセージBがメッセージAのあとで配信されたとしても、Bは投稿Aへの反応とは限らない
- 因果関係を把握していない
- 因果関係はベクトルタイムスタンプで把握できる
- イベントaに割り当てられたベクトルタイムスタンプVT(a)はイベントbにたいして、VT(a) < VT(b)なら、aはbに因果的に先行する
- 各プロセスPiに次の2つの特性をもつ、1つのベクトルViを維持させることによって構成される
- Vi[i]はPiにおいて今まで発生したイベントの数である
- もしVi[j] = kならPjにおいて、k個のイベントが発生したとPiは判断する
Cassandra - A Decentralized Structured Storage System を読んだ
Cassandra - A Decentralized Structured Storage Systemを読んだ時のメモ。
どんなもの?
- 分散ストレージシステム
- 大規模データを多数の一般的なサーバに分散させることで可用性を高めて、SPOFを無くす
- ソフトウェア側で可用性とスケーラビリティをコントロールしている
- read efficiencyを犠牲にすることなくwrite throughputを高めている
- FacebookのInbox Searchのストレージのニーズを満たすために作られた
先行研究とくらべて何がすごい?
- Scale Incrementaly
- 低latencyでかつ、writeのスループットが高い
技術や手法の肝は?
- DataModel
- Table
- 1つのkeyでindexされた多次元マップ
- valueは高度に構造化されたobject
- row keyはsize limitのないstring(16~36byte)
- 1つのrow key下での全ての操作はどれだけ多くのカラムが読み書きされていようがレプリカ単位でatomic
- カラムはカラムファミリと呼ばれる単位でグルーピングされる
- カラムを時間もしくは名前でソートできる
- Table
-
- insert(table,key,rowMutation)
- get(table,key,columnName)
- delete(table,key,columnName)
System Architecture
Replication
Membership
- anti-entropy Gossip base protocolにもとづいている
Failure Detection
- Φ Accrual Failure Detectorを修正したものを使っている
- ノードの生死をboolean値で送るのではなく、suspection levelを送る
- Φ=1の時に我々が間違っている尤度は約10%
- ノードの生死をboolean値で送るのではなく、suspection levelを送る
- Φ Accrual Failure Detectorを修正したものを使っている
Bootstrapping
Scaling the Cluster
- 新しいノードは負荷の高いノードの負荷を軽減するトークンが割り当てられる
- 他のノードが担当していたレンジを新しいノードが分割して担当するということ
- 新しいノードは負荷の高いノードの負荷を軽減するトークンが割り当てられる
Local Persisitence
- 典型的なwrite
- 典型的なread
- disk上のファイルを探す前にin memoryのデータから探す
- 新しいものから古いものへと順に検索される
- disk上の複数のファイルから探すことになったときのために、bloom filterがデータとともに保存されており、in memoryにも存在する
- disk上の全てのカラムをスキャンすることを避けるために、カラムインデックスを保持する
どうやって有効と検証した?
- Facebook Inbox Searchで利用
議論はある?
- 今後は下記のサポートを行っていきたいと考えている
- compression
- atomicity across key
- secondary index
次に読むべき論文は?
A highly available file system for a distributed workstation environment
MapReduce: Simplified Data Processing on Large Clusters を読んだ
MapReduce: Simplified Data Processing on Large Clustersを読んだ時のメモ。
どんなもの?
- 巨大なデータセットを処理するプログラミングモデル
- Map
- key/valueのinputを中間的なkey/valueペアにする
- Reduce
- 全ての中間的なvalueを中間的なkeyでまとめる
- Map
- 多数の一般的なマシン上で並列実行される
- run-timeがinput-dataのパーティショニングや処理のスケジューリング、エラーハンドル等をしてくれるので、プログラマは意識しなくてもOK
先行研究とくらべて何がすごい?
並列分散システムを使ったことのないプログラマでも簡単に使える
- 並列化、耐障害性、localy optimization、負荷分散を透過的に行う
MapReduceの処理で様々なデータを生成出来る
何千台ものマシンにスケールしても動くように作られている
技術や手法の肝は?
- Lispや関数型言語のmap/reduceに着想を得た
- map
- inputを受け取って中間的なkey/valueのペアを作る
- key I のすべての値をgroup化してreduceに渡す
- reduce
- 中間的なkey Iとその値を受け取ってマージする
- 一般的にreduce後には0もしくは1つの値が作られる
処理概要
- input fileを16~64MBに分割する & プログラムのコピーをクラスタに配布する
- masterがworkerにtaskをassignしていく
- M個のmapタスクとR個のreduceタスクが生成される
- masterがidle中のworkerにtaskをassignしていく
- map taskのworkerは担当のinput fileを読み込みに行く
- key/valueのデータをparseしてmap関数にかけていく
- map関数によって生成されたkey/valueのデータはmemoryにのる
- memory上のデータがパーティショニング関数によってR regionに分割され、local diskに書き込まれる
- ファイルが書かれたlocal diskの場所をmasterに通知し、masterはreduce workerに場所を伝える
- 中間データの位置を通知されたreduce workerはRPCを用いてmap workerのlocal diskからデータを読み取りに行く
- データを読み終わったらkeyをグルーピングするためにソートする
- ソートされたデータを舐めていき、reduce関数にかけていく
- reduce関数の結果はout putファイルにappendされる
mapreduceの処理が終わるとmasterがユーザプログラムに通知する
MasterDataに含まれる情報
- 各map taskとreduce taskの状態情報を保持している(idle, in-progress, complete)
- worker machineの情報
- mapによって生成された中間ファイルの場所とサイズ
Fault Tolerance
- worker failure
- masterはworkerにpingを定期的に送る
- 返ってこない場合はfailedとmarkする
- そのworkerにassignされていたtaskは、idle statusに戻って他のworkerにassignされる
- task failure
- map taskは再実行される
- mapの結果ファイルがどの場所にあるかわからないため
- reduce taskは再実行されない
- 結果はグローバルなファイルシステムに書き込まれるため
- map taskは再実行される
- master failure
- master dataにcheckpointを書き出しておく
- master taskが死ぬと最後に書き込まれたcheckpointから処理が再実行される
- masterが死んだタイミングでMapReduceのtaskを全部abortしてretry
- worker failure
- Locality
- ネットワーク帯域がボトルネックになっていた
- map taskを対象のデータを持っているmachine上のworkerで行わせた
- 無ければ同じネットワーク内のmachineで実行
- ネットワーク帯域がボトルネックになっていた
Backup
- ハズレのmachineにあたると処理が遅くなり、全体の処理完了までの時間もそのmachineの処理に引きずられて遅くなる
- MapReduceのtaskが終わりに近づくと、masterがin-progressなtaskのbackup実行をスケジュールする
- in-progressの処理か、backup処理の一方が終わればそのtaskは完了
- backup処理を追加することで数%の処理量が増えるように調整し、その結果backupありの方が44%速くなった
学び
- プログラミングモデルを限定することで、並列分散処理が簡単になった
- ネットワーク帯域がボトルネックになった
- 最適化のいくつかも通信量を減らすためのもの
- 重複実行は遅いマシンや故障したマシンがいたときに全体の処理を早くするのとデータロスを防ぐのに役に立った
どうやって有効と検証した?
- machine
- 2GHzのプロセッサ
- 4GBのメモリ
- 160GBのdisk
- 1800台のサーバ
- Grep
- 10^10個の100byteのrecordをscan
- 80secくらいでmap taskが終わり、全体で150secほどかかる
- start upのoverheadがある
- 全てのworkerにプログラムを伝搬させる
- 1000個のinput fileのopen
- locality optimazation情報の取得
Sort
- 10^10個の100byteのrecordをsort
- a) Normal mapは200secで終わる
- Shuffle時に2つの山があるのは、batch処理を行っている時
- 300secくらいで初めの処理が終わり、600secで次が終わる
- writeが完了するまでは850secくらいで、起動時のオーバヘッドを含めると891sec
- Shuffle時に2つの山があるのは、batch処理を行っている時
- b) Backupをなくしたとき
- Backupを無くすとロングテールのグラフになる
- 最後の5つのreduce taskが遅く、300secくらいかかった
- 1283secほどかかった
- Backupを無くすとロングテールのグラフになる
- c) Machine Failureしたとき
- 200/1746 workerをkillした
- re-executionが実行されて、通常時よりも5%ほど時間がかかった
MapReduceにしてよかったこと
次に読むべき論文は?
The Dataflow Model を読んだ
どんなもの?
- unbounded で順不同なデータを処理する上で、正確性、latency、costをいい感じにするアプローチについての考察
先行研究とくらべて何がすごい?
- 既存の考え方では、unboundedなデータに終わりがあると考えているのがおかしい
- データの総量がどれくらいで、全てのデータがいつ揃うかということは考えない
- ただ新しいデータが来て、古いデータは撤回されるということだけを考える
技術や手法の肝は?
- 下記が肝
- windowing model
- triggering model
- incrementral processing model
- scalable implementation
- Windowing
- Time domain
- Event Time
- イベントが実際に起こった時間
- Processing Time
- pipelineでそのイベントが観測された時間
- Event Time
- Window Mergingの流れ
Triggers & Incremental Processing
- Windowing
- event timeのどの時点でデータがグループ化されるのかを決定する
- Triggering
- processing timeのどの時点でグループ化の結果がemitされたかを決定する
- Windowing
どうやって有効と検証した?
- FlumeJavaとMillWheelを用いて、Googleの実際のプロダクトに実装した
- Youtube(Session window), Recommendation(Processing Time Triggers)等
議論はある?
次に読むべき論文は?
The Chubby lock service for loosely-coupled distributed systems を読んだ
The Chubby lock service for loosely-coupled distributed systemsを読んだ。
ロックについての基礎知識が無くて全然わからなかった。
どんなもの?
- 分散システム用のlock service
先行研究とくらべて何がすごい?
- 分散合意問題を非同期コミュニケーションを用いて解決した(Paxos)
- 小さなサイズのメタデータを全replicaにsyncさせて可用性を担保している
技術や手法の肝は?
- System構成
- Chubby cellは5つのレプリカで構成されている
- Paxosでmasterを選出している
- レプリカが死ぬと数時間後に free poolから新たなレプリカが選出される
- Event
- Chubby Clientはeventをsubscribeできる
- fileの変更
- child nodeの追加/削除
- masterのfailover
- lockの異常
- Chubby Clientはeventをsubscribeできる
議論はある?
次に読むべき論文は?
趣向を変えて
The Google File System を読んだ
The Google File System を読んだ。
どんなもの?
- Googleのサービスの特性に最適化させた、大規模分散システム用のスケーラブルなファイルシステム
- 廉価で故障しやすいHW上で稼働することを前提とし、耐障害性がある
- aggregateのパフォーマンスが高い
- BigTableにも使われてる
先行研究とくらべて何がすごい?
- 何千台もの一般的なサーバで動く
- 故障するのが前提で耐障害性とリカバリを担保
- 一般的なファイルシステムに比べて、大きなファイルを扱うことを前提としている
- KB単位ではなくGB単位
- 新しいデータはappendしてoverwriteしない
- レスポンスタイムよりも帯域幅に重きを置いている
- サービスの特性上bulk処理が大半を占めるから
技術や手法の肝は?
- Architecture
- single masterとmulit chunkserverで構成されている
- fileは固定サイズ(64MB)のchunkに分割される
- 利点
- ファイルサイズが大きいので、小さいファイルに比べて何度もやりとりをする必要がなく通信のoverheadが少ない
- 通信の回数が少なくなるのでmasterサーバへの負荷が少なくなる
- masterサーバに保存されるmetaデータのサイズが小さくなる
- 欠点
- 同じファイルへのアクセスが多いとhotspotになる
- Googleのサービスの特性上、複数のchunkファイルをシーケンシャルに読むのであまり問題にならない
- 同じファイルへのアクセスが多いとhotspotになる
- 利点
- chunkserverはlocalにchunkをlinuxのファイル形式で保存する
- ファイル操作はchunck handle(uniqueなidのようなもの)とrangeを使っておこなわれる
- master server
- メタデータ
- 3種類ある
- fileとchunkのnamespace (operation logとin memory)
- fileからchunkへのmapping (operation logとin memory)
- . chunkのreplicaがどのchunkserverになるか (in memoryのみ)
- 3種類ある
- chunk server
- デフォルトで3台のchuckサーバにデータがreplicateされる(primaryとsecondary)
Interaction
- clientがmasterに問い合わせて、対象chunk&現在leaseを持っているserverと他のreplicaの情報を得る
- masterはprimaryとsecondaryの情報をclientに返す(clientはこれをcacheする)
- clientがデータを任意の順番で全てのreplicaにpushする
- 全てのreplicaにデータが行き届いたら、clientがprimaryにwrite requestを送る
- primaryがsecondaryにwrite requestを送る
- 全てのsecondaryがprimaryに書き込み完了を通知すると処理が終わる
primaryがclientに通知(エラーがあったらclient側でretryする)
-
- アプリケーションによってファイルが消されても、masterはdelete処理をlogに書き込むだけで実体はすぐには消さない
- renameされて、deletetion timestampが振られる
- masterのファイルシステムnamespaceのregular scanの際に、3日(変更可能)が過ぎたファイルは削除される
- アプリケーションによってファイルが消されても、masterはdelete処理をlogに書き込むだけで実体はすぐには消さない
どうやって有効と検証した?
- A: R&Dで数百人のエンジニアが利用しているCluster
B: 本番データで利用しているCluster
readはwriteよりも速い
- 本番環境でもwriteよりもreadの処理が多いことを想定している
- BがAよりもreadの効率が悪い
- master opsは200~500 Ops/sを安定して出せている
議論はある?
- さまざまな問題の中でも一番大きかったのはdiskとLinux関係の問題
- 複数versionのIDE protocolをサポートしているようにように思えたdiskが、実は最新バージョンしかサポートしてなかった
- だいたい動くけれど、たまに動かなくてデータが壊れるのでchecksumをいれるようにした
- 複数versionのIDE protocolをサポートしているようにように思えたdiskが、実は最新バージョンしかサポートしてなかった
次に読むべき論文は?
↑も面白そうだけれど、Chubbyを読む