Replicated Serializable Snapshot Isolation解説

ちょっと諸般の事情で放りだしてあったのですが、まとめておかないと忘れるので、記録的においておきます。あとでたぶん自分でも見直すと思うので。
このエントリーは完全にトランザクションの人向けです。現時点これが本当に必要な人は世界でたぶん50人ぐらいだと思います。全日本的には絶対わかんないとまずいという人はたぶん5人ぐらいです。

ただし、分散DBガチの人はわかっていた方がいいと思うので、おいておきます。

論文はこちら
http://sydney.edu.au/engineering/it/~hyungsoo/vldb2011.pdf

内容はSerializable Snapshot Isolation (以下SSIと略記)の分散環境下への適用に関する論文です。一応実装もあってベンチマーク結果が出ています。SSIについては下記エントリーを参照にしてください。
http://d.hatena.ne.jp/okachimachiorz/20130331/1364709005

まず、SSIの前提のSI自体の分散適用は別のエントリーをみてください。
http://d.hatena.ne.jp/okachimachiorz/20130512/1368344294
要するにFirst-Commit-Winルールの適用でクリアしています。

以上が前提です。以降はこれらがわかっている前提で進めます。論文もほぼ同じ前提を引いています。

このエントリーの目的は、同論文の特にAppendixの解説になります。本文じゃなくてAppendixかよって話もあると思いますが、こっちの方が大事だと思います。本文は予備知識の想定が半端ないので、論文自体がabstractっぽくなってます。(こんなんでよくVLDB通るなというか、いや内容的には通って当然なんですけどね・・・)ということでAppendixが重要ですが、初見だとなんかの判じ物にしか見えないというか、最初見たときにもちょっとギョッとなったので、ここで自分向けの解説書いておきます。

分散DBはおおよそこの辺まで来ている、ということでいいかと思います。この論文がほぼ2年前なので、最新の状況はもう少し先を行っていると思います。

・・まずざっく概略を記述すると、現時点でConsistency(ここでは1-Copy-Serializableを仮置きでおきます)の達成手段としてもっとも有望なSSIの分散replication下での実装の方法を述べています。最大のポイントはやはりDangerous Structure(論文ではDescending Structureという言い方になっています)の検出になります。

分散環境下でのDescending Structureの検出、すなわち、GlobalなDependency Checkの方法としてGlobal Dependency Check Protocol(以下GDCP)の実装がポイントになります。

誰でも考えるように単純にAnti-dependencyのあるTxの情報を持ち回ってチェックすればいいのですが、特にReadSetの持ち回りは対象がまるまるテーブル一つになったりすると、相当やばいことになります。すなわち非現実です。よって、どうするのか?ということが最大の問題になります。結論から言うと、コンフリクトのあるWriteSetだけを持ち回るという方法を選択しています。

したがって、暗黙のうちに、細かいWriteがバラバラ来るという処理が多いというドメインを前提にしています。全データの一斉更新とかはさすがに無理ですね。これをトランザクションにして分散処理すると、間違いなくストップ・ザ・ワールドになります。(そもそも、全データの一斉更新のトランザクション処理は、単ノードだろう分散だろうとストップ・ザ・ワールドの「バッチ処理」になります。)

とはいえ、readとwriteが混在したOLTP的な処理を分散環境下で行い、かつ一貫性を担保する手法として、きわめて有用な手法のひとつと言えると思います。

さて・・・

まず、従来のSSIのDangerous Structure(以下Descending Structure)、の検出を、Latest Snapshotを基準に引き直します。基本的にSSIの証明と同じです。ただしlsv (Latest Snapshot Version)を利用して、よりエレガントになっています。結果としては、実装しやすい簡易な形になっていると思います。証明は以下になりますが、結果は割と単純なので、おまじないのように覚えておくといいかもしれません。

Definition
[Descending Structure]
三つのTxを想定する。それぞれTp・Tf・Ttとする(peak, follow, trailからとっている。ややこしいことにこの証明でのTpはTx peakの略で、論文の後半で出てくるTpはTx pendingの略です。混同しないように。)
また各Txで読み込むSnapshotはTx開始時点で最新のものとし、lsv(Tx)とする。

・Anti Dependencyのグラフを想定する
・すなわちTp→rw→Tf  Tf→rw→Tt
このとき
・lsv(Tf)≦lsv(Tp) かつ lsv(Tt)≦lsv(Tp)とする
このグラフをDescending Structureとする

Theorem
・Readはすべて先行するSnapshotからとする
・WriteはFirst-committer-winルールを適用する
このとき
・Descending Structureがない場合
そのTxの集合はSerializableである。

Proof(以下Appendix A)
まずLemma

・Readはすべて先行するSnapshotからとする
・WriteはFirst-committer-winルールを適用する

とすると以下が成立する

・Ti→wr→Tj ならば commit(Ti)≦lsv(Tj)
・Ti→ww→Tj ならば commit(Ti)≦lsv(Tj)
・Ti→rw→Tj ならば begin(Ti)<commit(Tj)

順に説明
・Ti→wr→Tj ならば commit(Ti)≦lsv(Tj)
これはSnapshotからのReadを前提にするので自明です。後続するTxは常に最新のSnapshotを読むという前提になっている。

・Ti→ww→Tj ならば commit(Ti)≦lsv(Tj)
これはFirst-committer-winルールから導きます。まず書き込みが成立しているので、Ci<Cjになります。その結果、Ci≦snapshot j<Cjになります。Ci≦snapshot j≦lsv(Tj)<Cj よって、commit(Ti)≦lsv(Tj)

・Ti→rw→Tj ならば begin(Ti)<commit(Tj)
Ti→rw→Tjなので、TiではXkなるなんらかのTi開始以前に作られたversionを読んでいる。TjではW(Xj)となり、なんらかのversionを生成する。TiではTjでのversionは見えてしまうと、Ti→rw→Tjにはならないので、cjよりも以前のversionをTiで読むことになる。したがって、begin(Ti)<commit(Tj)

以上で準備を完了してProofにはいる
方針として対偶を利用する。
すなわち「Serializableでなければ、必ずDescending Structureが存在する」ことを証明する。

・Serializableでない場合は、Txの依存関係でなんらかの循環が発生する(循環がない場合はtopological sortでserialize可能なので矛盾)
・循環するTxの中でもっとも最新のlsvを読んでいるTxを取り出す。これをTpとする
・その後にシーケンスにつながるTf、Ttを取り出す

さて、ここで最新のlsvをTpが読んでいるので、当然、lsv(Tf)≦lsv(Tp) かつlsv(Tt)≦lsv(Tp)になる。んで、ここで、Tp→Tf Tf→Ttがrwであることを示せばDescending Structureの存在を示したことになる。どうするか、っていうと、Tp→Tf Tf→Ttがwwまたはwrではない、ということを示す。

まず、Tp→Tfについて

もし、wwまたはwrであれば、Lemmaより commit(Tp)≦lsv(Tf)
ここで、当然に、lsv(Tp) <commit(Tp)なので、lsv(Tp) <commit(Tp) ≦lsv(Tf)になり、よって、lsv(Tp) <lsv(Tf) よって矛盾。よって、ww-wrではありえない

したがって、Tp→Tfはrwのdependencyになる。

ここでLemmaを利用して、Tp→Tfがrwであれば、begin(Tp)<commit(Tf)になる。
よって、lsv(Tp)< begin(Tp)<commit(Tf)が成立。

さて次は、Tf→Ttがrwであることを示す。
同様に、もし、wwまたはwrであれば、Lemmaより
commit(Tf)≦lsv(Tt)

ここでlsv(Tp)< begin(Tp)<commit(Tf)なので、lsv(Tp)< begin(Tp)<commit(Tf) ≦lsv(Tt)
よって、lsv(Tp)<lsv(Tt)になる。これは矛盾。よってww-wrではありえない。

したがって、Tf→Ttはrwのdependencyになる。

以上より、Tp→Tf Tf→Ttはrwであり、Descending Structureが存在する。
よって、Serializableでなければ、必ずDescending Structureが存在する
よって、Descending Structureが存在しなければ、Serializableである。
以上、極めてクリアです。

なので、分散の系の中で、あるTxがcommitできるかどうか、SIでのread-fromと、first-commit-winルールを適用している限りにおいては、Descending Structureの存在をチェックすればよい、ということになります。

さて次に、肝心のDescending Structureの存在チェックの実装になります。

が、その前に、どういう環境なのか、何をしようとしているのかざっくり書いておきます。

前提としては、複数のプロセス(ノード)があり、クライアントはあるプロセスにアクセスし、そのプロセスのデータの読む込み・書き込みを自由に行う環境を想定します。その結果、各プロセスでTxが発生します。

各プロセスは、それぞれの下位にDB層をもちローカルデータの保存/読み出しを行います。また、データはすべてのプロセスで共有化されようにします。すなわち、あるプロセスで書かれた内容は、他のほかのプロセスに伝播する仕組みになります。当然、その結果として各データの不整合が発生する可能性があるので、それを裁量する必要があり、それをserializableな形に処理することが目標になります。(いわゆる、Read-One-Write-AllのROWAを想定しています。)

また、各Txの全順序は保証されているものとしています。この仮定は微妙ではありますが、SSIを前提にしている以上、TO(Timestamp Ordering)の確保は基本になるので、仮定としては、それほどでたらめな話ではありません。

課題は、あるローカルのデータセットに書き込み要求があったときに、その書き込みがコミットできるどうか? できたとして、それを他のプロセスではどう扱うべきか、ということに帰着します。

課題の解決は、大きく二つのラウンドで行われます。

最初のラウンドは、各プロセスが、自分で保持しているTxにおいて、自分のローカルのWriteSetへの書き込みが可能かどうか?を各ノードに問い合わせを行い、可能(Serializable)であればcommitするというものになります。

次のラウンドは、commitした結果を各ノードに通知して、各ノードで更新処理を行うと同時にコンフリクトのあるリモートTxの処理を各ノードで処理させるというものになります。

よって2 roundの処理になっています。この手の分散処理は大抵は2 roundの処理になることが多く、RSSIでも同じになっています。トラディショナルな手法では2PL+2PCが主流であるのに対して、RSSIはロックフリーで、2PCも行いません。まったく異なったアプローチで分散コミットを行います。

コミットレベルではeagerとlazyの中間ぐらいに見えますが、厳密にいうとlazyでしょうね。とりあえずcommitの前に全ノードに聞いているので、その時点はeagerっぽいのですが、commit後のデータ更新伝播なので、その意味ではlazyでしょう。

データ一貫性を担保するのであれば、eagerが望ましいのですが、いかんせんかなり非現実的な話になります。事前の問い合わせでconsistencyがとれていれば、ローカルコミットは可能であることと、そもそも該当Txがpending状態であることがリモートサイドでも認識できているので、lazyでも不都合はないでしょう。まぁ、確かにそーゆー解決法は現実的ではあります。

では、Algorithm1 で疑似コードが提示されていますので、そのあたりから・・・
正直まじめにわかりづらいです。大雑把につかむとそれなりにシンプルにはなっていますが・・・。

[1]まずは分散環境でのDescending Structureの存在チェックのために、持ち回るデータセットを定義します。

構成要素は以下。
Wi Ri := (あるローカルでのTxである)TiでのRead setとWrite set
Tc :=コミットされたTxのログ
Tp :=ペンディングになっているTxのログ
Tl,c :=Tcのうちでローカルなもの
Tl,p :=Tpのうちでローカルなもの

lsv := latest snapshot versionのTimestamp
Tiでアクセスされるもっとも値の新しいsnaphotのversionを示す。したがって、lsv(Ti)<=begin(Ti) (等号はlsvを生成したTjのコミットのタイミングとTiの開始のタイミングが一致して、かつTjのwsとTiのrsで共通部分がある場合のみ成立)

Df,c := Ti(よってRi)からrw-dependencyをもつTc(Writeをもつ)の、TxのIDとlsvのペアの集合
Df,p := Ti(よってRi)からrw-dependencyをもつTp(Writeをもつ)の、TxのIDとlsvのペアの集合
Dt,c := Ti(よってWi)へのrw-dependencyをもつTc(Readをもつ)の、TxのIDとlsvのペア。後述するように該当Tcについては最新のlsvのみが重要なのでこれは単一ペアでよい。
Dt,p:= Ti(よってWi)へのrw-dependencyをもつTp(Readをもつ)の、TxのIDとlsvのペアの集合

以上より{Wi Df,c Df,p Dt,c Dt,p 終端記号}のデータセットをつくる。

[2]最初のブロードキャストの準備
前提)基本的な流れは、自分のローカルのWriteSetを飛ばして、ほかのノードでのWritesetを受け取って、自分のところのReadSetとチェックをする、という形になる。

まずWiとRiの獲得。これはローカルから取得
・Df,cの作成。コミットされたTxのなかから「Tiからのanti-dependency」のあるものを探す。んで、ヒットしたらそのlsvをとってきてセット。
・Df,pの作成。ペンディングになっているTxのなかから「Tiからのanti-dependency」のあるものを探す。んで、ヒットしたらそのlsvをとってきてセット。
・Dt,cの作成。コミットされたTxのなかから「Tiへのanti-dependency」のあるものを探す。んで、ヒットしたらその最大のlsvをとってきて、TxのIDと当該lsvセット。またこのlsvをlsvmとしてセット。なお、Dt,cはこのTiについては最新のsnapshotを利用しているエッジがクリティカル(それより前は問題ではない。Descending structureの要件が、Tp→rw→Tf  Tf→rw→Ttの時のlsv(Tf)≦lsv(Tp) かつ lsv(Tt)≦lsv(Tp)なので、Tp/Tfについては(ここではDt,c)については最大のもののみが問題になる。すでにコミットされているので、最大のものだけを基準にするということになる。)なので、最大のものをそのままペアとして処理する。
・Dt,pの作成。ペンディングになっているTxのなかから「Tiへのanti-dependency」のあるものを探す。さらに、そのTxのlsvがlsvmよりも最新である場合に、当該TxのIDとlsvをとってきてセット。これはまだコミットされていないので、まずい可能性があるものをすべて列挙しておく必要がある。最大のlsvになる可能性があるものはすべて押さえておくということになる。

[3]最初のチェックとブロードキャスト
まず、この状態で、Df,cとDt,cで共通しているものがあれば、abort処理する。そうでないものはペンディングにして、Tl,pにアペンドする。その上で、{Wi Df,c Df,p Dt,c Dt,p 終端記号}で構成される構造をブロード・キャストする。
なお、終端記号は該当するWiについて、commit可能かどうかの判断が終わっているかどうかを表す。論文では未定の場合は⊥、終わっている場合はDECIDEという記法を用いている。最初にキャストする時は全部⊥になる。

[4]次に、このグラフ構造を受け取った場合の処理
処理のパターンとしては2種類ある。一つは終端記号が未決のもの(⊥)と、終端記号が決定済みのもの(DECIDE)になっているもの。後者のケースは後述。まず最初に受け取るものは通常は未決状態のもになる。なお、終端記号はデータセット自身にセットされるものと、Dt,cにセットされるものの二つがある。前者は処理が可能かどうかを示し、後者はTxが循環しているかどうかを示す。(後者の終端記号は、⊥,⊥になっている)

[4-1]データセットの終端記号が未決(⊥)の場合
この場合、まずWiを検査して、自分のローカルTxかどうか確認する。

[リモートの場合]
まず、Dt,cが終端記号(⊥,⊥)でない場合は、Tpのリストにアペンドしておく。その上でリモートの場合は、ローカルのコミット済みTxでリモートのTxのDtになるものがないかどうかチェックして、ある場合でかつlsvがより大きい場合は、Dtにappendする。Wiがローカルではないので、ローカルサイドではread側のTxがローカルにあるかどうか判断するということになる。(これ重要!)
なお、それがDfである場合は循環になる。(2Txで循環するケース)ので、不可という判断をする。
同時に、構造を確認して、循環の場合は、Dt,cに終端記号をセットする。それから次にブロードキャストする。

つぎに、同じリモートでも終端記号(⊥,⊥)がある場合は、すでに循環になっていて、かつ自分のローカルでは関係がないので、そのままブロードキャストする。

[ローカルの場合]
これはメッセージが全ノードを回って戻ってきたことを意味する。Dt,cに終端記号(⊥,⊥)がある場合は、循環しているので、ローカルTxをabortする。その上で、ペンディングTxをTl,pから該当分を除去する。
さらに、グラフ構造に終端記号をDECIDEにして、ブロードキャストする。二周目を開始する。

なお、このときに循環がなければ、Tp上のデータセットについてはDECIDEになるので、後続のgdcpDeliverプロセスで、コミットの優先権(Tpキューの最初に該当するローカルTxが存在)があればコミット処理される。つまり、結果として、一周目の終了でローカル・コミットになる。優先権がない場合は先行するTxが更新可能状態になったその時点でコミットされる。この場合は一周目の終了がしたからと言ってコミット可能というわけではないことに注意。・・・なのでeagerとはいいづらい部分もあるというのが感想で、ただし、理論上はデータセットのルーティングの追い越しがない場合は、必ず二周完了時点には完了するわけで、ということで、平均すると1.5周ぐらいでコミットになる・・・

最後に、Tpを処理するプロセス(gdcpDeliver())を起動する。

[4-2]つぎに、データセットの終端記号が決定済み(DECIDE)になっている場合
[ローカルの場合]
WiがTl,pに含まれている場合は、ローカルなので、Tl,pの該当するTiをDECIDEとマークし、データセットの更新処理をする。(データセットの更新であり、ローカル・データの更新ではない)
[リモートの場合]
単純に該当するTiをTpに加えて、次にブロードキャストする。

最後に、Tpを処理するプロセス(gdcpDeliver())を起動する。

[5]結果の処理(gdcpDeliver())
Tpを処理するプロセスになる。このプロセスでローカルへの書き込みを実行する。
前提として、Txの全順序が保証されている。すなわちTpのキューは、TOでソートされている。first-commit-winルールでも必要なので、これは前提としては“できるかどうかは別として”妥当。
Tpのキューを最初から取り出して、そのマークがDECIDEである場合は、順に処理をしてDECIDEでないところまで進める。よって、例えば、最初のTxがDECIDEでない場合は、何もしない。
処理を行う場合、Descending Structureがない場合はローカルの場合はcommitを行い、リモートの場合はupdateを行う(別にcommitでもよい)ことになる。

この辺は、すごく良くできていて、たぶん理論的に抽象度をもっと上げるときれいな数式で表現できるはずなのですが、ここでは実装優先で理論をつくっているので、ちょっとごちゃごちゃしてます。

ここまで流れの解説がAppendix Bにあるのですが、(かなり)わかりづらいので、わかりやすくした(つもりの)ものを下につけておきます。ちょっとアレしていますが、冷静にゆっくり読めばわかるはずです。はずだ。はずに違いない。・・実際、わかると「なるほどー」ということになります。相互に直接連絡することなく、それぞれのプロセスで整合性をもってコミットできるので、これはこれで手品みたいな感じですが。

このアルゴリズム自体は、割と実装寄りのアルゴリズムになっている感じで、やるべきことは割とシンプルになっています。最大のトリックはTpキューの全順序による結果反映で、今までは割と実現が難しいところでしたが、例のSpanner以降は実際に実装が可能になってきています。その意味ではかなり現実的な解だと思われます。

以下、補足事項
・データ転送の手順について
基本的に、Ring topologyを利用していて、使われている手法はLCRです。詳細は以下
Throughput optimal total order broadcast for cluster environments
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.167.868
ACMからDLした方がいいかと思います。

このあたりは、何度も議論されているところなので、論点としてはあるとは思いますが、今回は省略。

GC
不要なTxのlogのGCですが、基準としては、そのTxのcommit以前に開始されたPending Txがなくなった時、ということになります。これはTpのもっとも最初のTxのbegin timestampになるので、各ノードがその値を通知し、その中でもっとも古いものを選択する方式を採用しています。その値以前のTxのログを除去するということになります。

・障害対策
まず対応は単純なstop-failureについて。あんまり冴えた方法はなくて、そもそもこれローカルにDBがあるので、そこに書いておいて、復帰時にマスターレプリカから更新して、Ring topologyに復帰して、継続処理開始になる。

なお、Byzantine障害については
Practical Byzantine Fault Tolerance and Proactive Recovery
Zyzzyva: Speculative Byzantine Fault Tolerance
を参考にして適当に手をうつ、とか書いてあります。まじですか。この逃げ方は業界のデファクトですか的なアレですか。

・全体的なパフォーマンス
比較すべき対象は、rwを片っ端からabortして、retryさせる手法になりますが、そのベンチマークの結果も出ています。この場合、偽陽性が陽性の3倍出て、RSSIのオーバーヘッドが2倍程度なので、RSSIの勝ちとか出てますね。正直、オーバーヘッドの縮小は、環境やら実装で相当削れるアルゴリズムなので、片っ端からabortする手法よりは、より効率がよい、というのは、その通りではないかなと思います。

・課題
論文にあるとおり、持ち回るデータセットを少なくしているとはいえ、全体の環境としては、低遅延が望ましいのは間違いないでしょう。したがって、DC間のようなレイテンシーが高いものはさすがにちょっと無理があります。せめて、round数を減らす方向でいかないとまずい(のでMDCCとかあっちの方向にいくという展開かと)と思われます。

・とはいえ
とはいえ、低遅延環境が保証される複数ノードクラスターの分散Tx処理で、1-copy serializableを達成する仕組みとしては、最有力候補の一つであることは間違いないでしょう。DC内部での複数ラックにまたがるようなサイズでの分散Txはもう技術的に可能ですね、ということだと思います。

今後の話題としては、DC間の分散TXになると思います。個人的には、そのレンジになると、2 round処理ではコミットが間に合わないので、1 round+αぐらいで決着をつける、方向になるのでは?とか思っています。

・・・というわけですよ。これでSSIについてはほぼ完成系じゃないかと思います。あとは持ち回りの工夫だけなので、これを超える仕組みはまったく違う理論でくみ上げないと駄目だと思います。そんな感じ。gdcpについてはもっとリファインできる気がするので、誰かやってFekete大先生に送りつけるといいかもしれません。