Pacific

December 01, 2009

高度に進化した分散データストアは RDBMS と見分けがつかない? (shibuya.pm #12 スライド)

 昨日開催された shibuya.pm #12 - NoSQL特集で使用したスライドを slideshare にアップロードしました。

 開発しているシャーディングミドルウェアである Incline と Pacific については YAPC::Asia 2009 を始めいろいろな所で話をする機会をいただいてきたので、今回は、なぜ RDBMS ベースのアプローチを採用したのかという背景を中心に説明させていただきました。概念的な話が多くて分かりにくかったと思います(すみません)が、細かな点についてはパフォーマンスとスケーラビリティのためのデータベースアーキテクチャ (BPStudy#25発表資料)を参照いただければと思います。

 また、中で出てきた「実体化ビュー」については、Materialized view - Wikipedia, the free encyclopediaが良くまとまっているかと思います。Incline は一言でいうと、RDBで構成されるshard群の上で read-only かつ eventually consistent な materialized view を実現するためのツールです。

September 11, 2009

A Clever way to scale-out a web application (YAPC::Asia 2009 Presentation)

For couple of months I have been writing middlewares for database shards, and today I made a presentation covering them.  It includes the following.

  • Incline - a trigger and queue based distributed materialized view manager
  • Pacific - a set of perl scripts to manage MySQL shards, a MySQL shard can be split into two in less than 10 seconds of write blocking (and no read blocks)
  • DBIx::ShardManager - a client API for accessing database shards using Incline and Pacific

With these middlewares I think it is no more difficult to write web applications that runs on database shards.  In fact IMHO it is as easy as writing a webapp that runs on a standalone database.

The presentation slides are available from slideshare.  If you have any question or suggestions, please leave a comment.  Thank you.

July 14, 2009

Intruducing Incline - a synchronization tool for RDB shards

For the last weeks, I have been writing a tool called "Incline," a program that automatically maintains consistency between sharded MySQL databases.  The aim of the software is to free application developers from hand-writing code for keeping consistency between RDB nodes, so that they can concentrate on writing the application logic.

Background

Denormalization is unavoidable in a sharded RDB environment.  For example, when a message is sent from a user to another user, the information should be stored on a database node where the sender of the message belongs to, and on another node where the receiver does.  In most cases, denormalization logic is hand-written by web application developers, and since it has been a burden for creating large-scale web services.  Incline takes off the load from developers.  By reading the definition files, Incline keeps the tables on a sharded MySQL environment in sync, by providing a trigger-generator and a replication program that, synchronously or asynchronously reflects the changes of source tables into materialized-view-like tables.

Installing Incline

Incline is written in C++ and uses autotools for building the tools.  However, since I have not yet added automatic lookup for libmysqlclient, you need to specify their location manually to build Incline.  My build procedure is as follows.

% svn co http://kazuho.31tools.com/svn/incline/trunk incline
% cd incline
% autoreconf -i
% ./configure CFLAGS=-I/usr/local/mysql/include/mysql CXXFLAGS=-I/usr/local/mysql/include/mysql LDFLAGS='-L/usr/local/mysql/lib/mysql -lmysqlclient'
% make
% make install

Defining Replication Rules

Replication rules of Incline are written using JSON files.  Consider creating a twitter-like microblog on a shared environment.  It would be consisted of four tables, each of them distributed through RDB shards by the user_id column.  The "tweet" table and "following" table are updated by user actions, while "followed_by" table and "timeline" table are denormalized tables that need to be kept synchronized to the former two.

CREATE TABLE tweet (
  tweet_id INT UNSIGNED NOT NULL AUTO_INCREMENT,
  user_id INT UNSIGNED NOT NULL,
  body VARCHAR(255) NOT NULL,
  ctime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (tweet_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE following (
  user_id INT UNSIGNED NOT NULL,
  following_id INT UNSIGNED NOT NULL,
  PRIMARY KEY (user_id,following_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE followed_by (
  user_id INT UNSIGNED NOT NULL,
  follower_id INT UNSIGNED NOT NULL,
  PRIMARY KEY (user_id,follower_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE timeline (
  user_id INT UNSIGNED NOT NULL,
  tweet_user_id INT UNSIGNED NOT NULL,
  tweet_id INT UNSIGNED NOT NULL,
  ctime TIMESTAMP NOT NULL,
  PRIMARY KEY (user_id,tweet_user_id,tweet_id),
  KEY user_ctime_tweet_user_tweet (user_id,ctime,tweet_user_id,tweet_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Incline uses two JSON files to define a replication rule.  The first file, microblog.json defines the mapping of columns using directives "pk_columns" (primary key columns) and "npk_columns" (non-pk_columns) between the "source" table(s) and "destination" of the replication.  When merging more than two tables to a single destination, "merge_cond" attribute is used to define the inner join condition(s).

microblog.json
[
  {
    "source"      : [ "tweet", "followed_by" ],
    "destination" : "timeline",
    "pk_columns"  : {
      "followed_by.follower_id" : "user_id",
      "tweet.user_id"           : "tweet_user_id",
      "tweet.tweet_id"          : "tweet_id"
    },
    "npk_columns" : {
      "tweet.ctime" : "ctime"
    },
    "merge"       : {
      "tweet.user_id" : "followed_by.user_id"
    },
    "shard-key"   : "user_id"
  },
  {
    "source"      : "following",
    "destination" : "followed_by",
    "pk_columns"  : {
       "following.following_id" : "user_id",
       "following.user_id"      : "follower_id"
    },
    "shard-key"   : "user_id"
  }
]

The second file shard.json defines the mapping between user_id and RDB nodes.  Range-based sharding (on an integer column) is specified in the example.  Other algorithm currently supported are: range-str-case-sensitive and hash-int.

shard.json
{
  "algorithm" : "range-int",
  "map"       : {
    "0"    : "10.0.1.1:3306",
    "1000" : "10.0.1.2:3306",
    "2000" : "10.0.1.3:3306",
    "3000" : "10.0.1.4:3306"
  }
}

With the definition, tables will be synchorized in the direction described in the figure below (only illustrates two nodes).

Running Incline

To run Incline, queue tables should be created and database triggers need to be installed on each RDB node.  This can be done by calling "incline create-queue" and "incline create-trigger" commands for each node, and the setup is complete.

% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 create-queue
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 create-trigger
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 create-queue
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 create-trigger
...

The installed triggers take care of synchronizing the denormalized tables within each node.  The next step is to run the forwarder (replicator between RDB nodes) for each node, so that the views should be kept in sync.  This can be done by calling "incline forward."The forwarding logic is defined in a way that the data stored in RDB nodes would become eventually synchronized even if either of the RDB nodes or the forwarder unexpectedly dies.  Use of daemontools or alike to automatically (re)start the forwarder is desirable.

% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 forward &
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 forward &
...

And this is it.  The two tables: "followed_by" and "timeline" are updated automatically when the "tweet" or "following" table is modified.  Application developers do not need to take care of shard consistency any more, all that needs to be remembered is that changes should be written into the latter two (modification to the former two can be disabled by using different access privileges between the web application and the Incline forwarder).

Next Steps

Having started writing code late last month, Incline is still immature.  Current status is somewhere around early beta.  In the next months, I plan to polish up and optimize the code, add PostgreSQL support (currently Incline works only on MySQL), as well as extending it so that it could used together with Pacific, a framework I have been working, that provides dynamic node addition / removal of nodes on a sharded RDB environment.  Thank you for reading, please stay tuned.

June 15, 2009

Pacific のクライアントAPI (仮) について

 先週、概要を紹介させていただいた Pacific について。まだ API をフリーズしていないつもりなのですが、だいぶ整ってきた気がするので、ざっくりまとめておきたいと思います。


インストール手順

  1. Thrift をインストール注1
  2. Pacific の svn レポジトリからチェックアウト
  3. Perl ドライバを make (cd driver-perl && perl Makefile.PL && make all test install)
  4. リゾルバを make (cd resolver && make)

テーブルのセットアップ手順

 テーブルのセットアップは、pschema コマンドを使って行います。

# リゾルバの裏側の MySQL は 127.0.0.1:33060 で動作
#
# プライマリテーブル「user」を作成
#   ・ 分散キーの名前は「username」
#      (型は varchar(255) not null charset utf8 collate utf8-bin 固定)
#   ・ カラムとして realname varchar(255) not null,last_tweet_at int unsigned not null
#   ・ ノード内で、セカンダリテーブルとのリレーションを表現するカラム _iid
#      も生成される
#
pschema create-primary --manager=127.0.0.1:33060 --primary=user --hostport=127.0.0.1:33061 --primary-key-name=username 'realname varchar(255) not null,last_tweet_at int unsigned not null'

# セカンダリテーブル「tweet」を作成
#   ・ カラムとして mtime と body (と 上記 _iid)
#
pschema create-secondary --manager=$MANAGER --primary=user --secondary=tweet --primary-key=mtime 'mtime int unsigned not null,body varchar(255) not null'

 この結果、MySQL ノードには、以下のようなスキーマのプライマリテーブルが作成されます注2。ER図で書くと、こんな感じになります。

CREATE TABLE `user` (
  `_iid` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  `realname` varchar(255) NOT NULL,
  PRIMARY KEY (`_iid`),
  UNIQUE KEY `_pac_key` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `tweet` (
  `_iid` int(10) unsigned NOT NULL,
  `mtime` int(10) unsigned NOT NULL,
  `body` varchar(255) NOT NULL,
  PRIMARY KEY (`_iid`,`mtime`),
  CONSTRAINT `tweet_ibfk_1` FOREIGN KEY (`_iid`) REFERENCES `user` (`_iid`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

リゾルバを起動

 先に make した pacific_resolver を起動します。TCP ポート番号は現在のところ 9306 に固定です。リゾルバが使用するデータベースの情報は、環境変数を用いて渡すことができます注3

$ PACIFIC_MGR_MYSQL_HOST=127.0.01 PACIFIC_MGR_MYSQL_PORT=33060 ./pacific_resolver

Perl クライアントからのアクセス方法

 まず、Pacific のドライバオブジェクトを生成します。

use Pacific::Driver::Direct;
use Pacific::Driver::Direct::DBI::DBD::mysql;
use Pacific::Driver::Direct::Resolver;

my $pac = Pacific::Driver::Direct->new({
    dbi => Pacific::Driver::Direct::DBI::DBD::mysql->new({
        user     => 'root',
        pass     => undef,
        database => 'pacific',
    }),
    resolver => Pacific::Driver::Direct::Resolver->new({
        host => '127.0.0.1',
        port => 9306,
    }),
});

 キーによるテーブルのルックアップは、イテレータを使って行います。Pacific は、渡されたキーがどの RDBMS ノードに属するかリゾルバに問い合わせを行い、そのキーに属するデータにリードロックをかけ、順次イテレータに値を渡してきます。

my @rows;
for (my $iter = $pac->query_iter('user', [ qw/Alice Bob Eve/ ]);
     $iter->next;
     undef) {
    my $r = $iter->dbh->selectall_arrayref(
        'SELECT username,realname FROM user WHERE ' . $iter->key_expr,
        {},
        $iter->key_values,
    ) or die $iter->dbh->errstr;
    push @rows, @$r;
}

 レンジクエリ(範囲を指定した検索)も同様に記述することができます。範囲指定の演算子は、< <= > >= を組み合わせて使うことができます。

# Bob 以降10人を取得
my @rows;
for (my $iter = $pac->query_iter('user', { '>=' => 'Bob' });
     $iter->next;
     undef) {
    my $r = $iter->dbh->selectall_arrayref(
        'SELECT username,realname FROM user WHERE ' . $iter->key_expr
            . 'LIMIT ?',
        {},
        $iter->key_values,
        10 - @rows,
    ) or die $iter->dbh->errstr;
    push @rows, @$r;
    last if @rows >= 10;
}

 書き込みにあたっては、query_iter の代わりに modify_iter を使用します注4

for (my $iter = $pac->modify_iter('user', qw[ /Alice Bob/ ]);
     $iter->next;
     undef) {
    $iter->dbh->do(
        'UPDATE user SET hitpoint=hitpoint+10 WHERE ' . $iter->key_expr,
        {},
        $iter->key_values,
    ) or die $iter->dbh->errstr;
}

 ノード内でトランザクションを組むこともできます (下の例のように、単一のキーにアクセスする場合も、イテレータを使います)。

# tweet テーブルに発言を追加し、user テーブルの最終発言時刻を更新
for (my $iter = $pac->modify_iter(
         'user', [ qw/Alice/ ], { transactional => 1 },
     );
     $iter->next;
     undef) {
    $iter->dbh->do(
        'INSERT INTO tweet (_iid,mtime,body) VALUES'
            . ' ((SELECT _iid FROM user WHERE username=?),?,G)',
        {},
        'Alice', $tweet_at, $tweet,
    ) or die $iter->dbh->errstr;
    $iter->dbh->do(
        'UPDATE user SET last_tweet_at=? WHERE user=?',
        {},
        $tweet_at, 'Alice',
    ) or die $iter->dbh->errstr;
}

 また、より高レベルな ORM っぽいインターフェイスを提供する Pacific::Driver::Direct::Accessor モジュールもありますが、自分は元来 ORM 不要派で経験値が低いので、あまり深入りしたくない (深入りしたところでいいものができないと思ってる) 気持ちです。

 それではひとまずこのあたりで。ノードの分割/再配置に使う prelocate コマンドについても、また書きたいと思います。

注1. Pacific の開発は、Thrift の同ページにある Archived release (r760184) を使って行っています。でも、インストールが面倒なわりに Thrift の Perl クライアントはおそいので、何か別のトランスポートに換えようとと考えています。
注2. ノード内部でリレーションを表現するために _iid という値を別途使用するのは、空間効率を高める一方で、データの分断につながります。ですので、今後、_iid を使わず、セカンダリテーブルにも直接分散キーを書き込むモデルをサポートすることも考えています
注3. 使用可能な環境変数については、MySQLDriver.cpp を参考にしてください
注4. 範囲を指定した更新については、現時点で未対応です

June 10, 2009

Pacific という名前の分散ストレージを作り始めた件

 大規模なウェブアプリケーションのボトルネックがデータベースであるという点については、多くの同意が得られるところだと思います。解決策としては、同じ種類のデータを複数の RDBMS に保存する「sharding」 (別名:アプリケーションレベルパーティショニング/レベル2分散注1) が一般的ですが、最近では、分散キーバリューストア (分散 KVS) を使おうとする試みもみられるようになってきています。

 分散 KVS が RDBMS sharding に対して優れている要素としては、事前の分割設計が不要で、動的なノード追加(とそれにともなう負荷の再分散)が容易、といった点が挙げられると思います。一方で、KaiKumofs のような最近の実装では eventually consistent でこそ無くなってきているものの、ハッシュベースの分散 KVS は、レンジクエリができなかったり (例: 最新5件の日記を表示)、トランザクションがないためアプリケーションプログラムが複雑になったりするという問題を抱えています。

 では、どうすればいいのか? MySQL や PostgreSQL を使った RDBMS sharding でも、動的なノード追加(と無停止での負荷の再分散)を実現したい。というのが、今回コードを書き始めた動機でした。それが Pacific です。

 技術的には、大して複雑ではありません。Pacific は、パーティショニング情報とロックを管理する中央サーバ(リゾルバと呼んでいます)と、実際のデータを保存する RDBMS のノード群によって構成されます。

 Pacific では、レンジクエリを実行するために、ユニークキーを利用したレンジパーティショニングを行います。レンジパーティショニングは、ハッシュベースのパーティショニングよりもデータの局所性が向上するので、パフォーマンスや障害の局所性が高まるという効果も期待できます。

 また、トランザクションを可能にするためには、関連するデータが常に同一のノード上に配置される必要があるため注2、全てのデータがパーティショニング用のキーに関連づけられるようなテーブル設計を強制することになります。このデータモデルは、(Pacific が RDBMS 上の分散ストレージであるという点を除けば) Google App Engine の Data Store注3 と同様です。Pacific では、パーティショニング用のキーを含むテーブルをプライマリテーブル、プライマリテーブルと 1:1 または 1:n のリレーションをもつテーブルをセカンダリテーブルと呼んでいます。

 データの再配置は、単一の (あるいは数個の) ユニークキー単位で、1) そのキーに属する全データに排他的書き込みロックをかけ、2) データを別ノードにコピー、3) パーティショニング情報を更新して書き込みロック解除、 4) 旧ノードから読んでいるクライアントがいなくなった時点で旧ノード上のデータ削除、という操作を繰り返すことで行います。再配置中に読み込みがブロックされることはありませんし、書き込みがブロックされる時間も、エンドユーザーが意識しなくていい程度に抑えることができる、と考えています注4

 一番アクセスが集中するのはリゾルバということになりますが、パーティショニング情報の変更は少ないことが予測できますから、ストレージへのアクセス数が 10万 QPS 程度になるまでは問題は発生しないと思います注5。また、パーティショニング情報は RDBMS に保存されるため、リゾルバが不正終了しても、データの不整合が発生することはありません。

 Pacific については、コードは公開の svn レポジトリ注6上においてありますが、現状、テストコードとラフなサンプルが動いている程度で、ドキュメントが全く未整備です。進捗や具体的な使い方等については、今後このブログで書いて行きたいと思います。

17:12追記: 高可用性については、ウェブアプリケーションが使う分散ストレージの場合、ネットワーク分断が発生しない(冗長化によって防止できる)ので、ノードをまたがるような冗長化は必要なく、各ノード毎にクラスタを組めばいいという考えです。

注1. ミクシィのCTOが語る「mixiはいかにして増え続けるトラフィックに対処してきたか」:ITpro
注2. 多くのトランザクションは、関連する数個のテーブルに対する局所的な操作であるという仮定の下、同一のキーに属するデータ内でのみトランザクショナルな操作を可能としています
注3. The Python Datastore API - Google App Engine - Google Code
注4. 書き込みがブロックされる時間は、特定のキーに属するデータサイズ (Google App Engine で言うところのエンティティグループの大きさ) をコピーする時間に依存しますが、一番遅いのはHDDにシーケンシャル書き込みになるでしょうから、数MB/s 程度は目指したいところです
注5. より高いパフォーマンスが必要なら、リゾルバをレプリケーション対応化すればいいという話です。同時に、各ノードへの直接接続をやめて、中継サーバを用意するといった作業も必要になるでしょう
注6. http://kazuho.31tools.com/svn/pacific/