July 14, 2009

Introducing Incline - a synchronization tool for RDB shards

For the past few weeks, I have been writing a tool called "Incline," a program that automatically maintains consistency between sharded MySQL databases.  The aim of the software is to free application developers from hand-writing code that keeps RDB nodes consistent, so that they can concentrate on the application logic.

Background

Denormalization is unavoidable in a sharded RDB environment.  For example, when one user sends a message to another, the message should be stored both on the database node to which the sender belongs and on the node to which the receiver belongs.  In most cases, this denormalization logic is hand-written by web application developers, and it has been a burden when building large-scale web services.  Incline takes that burden off developers.  Driven by definition files, Incline keeps the tables in a sharded MySQL environment in sync: it provides a trigger generator and a replication program that reflect changes to the source tables, synchronously or asynchronously, into materialized-view-like tables.
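To make that burden concrete, here is a minimal sketch of the bookkeeping an application has to do by hand when it denormalizes a message without a tool like Incline.  The node list, the toy node_for() lookup and the row layout are all hypothetical and are not part of Incline.

```python
# Hypothetical sketch of hand-written denormalization (not Incline code).
# NODES and node_for() are illustrative assumptions.
NODES = ["10.0.1.1:3306", "10.0.1.2:3306"]


def node_for(user_id: int) -> str:
    """Toy shard lookup: blocks of 1000 user ids, spread over NODES."""
    return NODES[(user_id // 1000) % len(NODES)]


def message_writes(sender_id: int, receiver_id: int, body: str) -> dict:
    """Return, per node, the rows the application must write for one message."""
    writes: dict = {}
    # The sender's copy must live on the sender's node...
    writes.setdefault(node_for(sender_id), []).append(
        ("sent", sender_id, receiver_id, body))
    # ...and the receiver's copy on the receiver's node.
    writes.setdefault(node_for(receiver_id), []).append(
        ("received", receiver_id, sender_id, body))
    return writes


if __name__ == "__main__":
    print(message_writes(5, 1500, "hi"))
```

When the two users live on different nodes, the application has to issue writes over two connections and cope with one succeeding while the other fails; this is the kind of code Incline aims to make unnecessary.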

Installing Incline

Incline is written in C++ and uses autotools as its build system.  However, since I have not yet added automatic detection of libmysqlclient, you need to specify its location manually when building Incline.  My build procedure is as follows.

% svn co http://kazuho.31tools.com/svn/incline/trunk incline
% cd incline
% autoreconf -i
% ./configure CFLAGS=-I/usr/local/mysql/include/mysql CXXFLAGS=-I/usr/local/mysql/include/mysql LDFLAGS='-L/usr/local/mysql/lib/mysql -lmysqlclient'
% make
% make install

Defining Replication Rules

Replication rules for Incline are written as JSON files.  Consider building a Twitter-like microblog in a sharded environment.  It would consist of four tables, each distributed across the RDB shards by the user_id column.  The "tweet" and "following" tables are updated by user actions, while the "followed_by" and "timeline" tables are denormalized tables that need to be kept synchronized with the former two.

CREATE TABLE tweet (
  tweet_id INT UNSIGNED NOT NULL AUTO_INCREMENT,
  user_id INT UNSIGNED NOT NULL,
  body VARCHAR(255) NOT NULL,
  ctime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (tweet_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE following (
  user_id INT UNSIGNED NOT NULL,
  following_id INT UNSIGNED NOT NULL,
  PRIMARY KEY (user_id,following_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE followed_by (
  user_id INT UNSIGNED NOT NULL,
  follower_id INT UNSIGNED NOT NULL,
  PRIMARY KEY (user_id,follower_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE timeline (
  user_id INT UNSIGNED NOT NULL,
  tweet_user_id INT UNSIGNED NOT NULL,
  tweet_id INT UNSIGNED NOT NULL,
  ctime TIMESTAMP NOT NULL,
  PRIMARY KEY (user_id,tweet_user_id,tweet_id),
  KEY user_ctime_tweet_user_tweet (user_id,ctime,tweet_user_id,tweet_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Incline uses two JSON files to define a replication rule.  The first file, microblog.json, defines the mapping of columns between the "source" table(s) and the "destination" table of the replication, using the directives "pk_columns" (primary key columns) and "npk_columns" (non-primary-key columns).  When merging two or more source tables into a single destination, the "merge" attribute is used to define the inner join condition(s), and "shard-key" names the destination column by which the destination table is sharded.

microblog.json
[
  {
    "source"      : [ "tweet", "followed_by" ],
    "destination" : "timeline",
    "pk_columns"  : {
      "followed_by.follower_id" : "user_id",
      "tweet.user_id"           : "tweet_user_id",
      "tweet.tweet_id"          : "tweet_id"
    },
    "npk_columns" : {
      "tweet.ctime" : "ctime"
    },
    "merge"       : {
      "tweet.user_id" : "followed_by.user_id"
    },
    "shard-key"   : "user_id"
  },
  {
    "source"      : "following",
    "destination" : "followed_by",
    "pk_columns"  : {
       "following.following_id" : "user_id",
       "following.user_id"      : "follower_id"
    },
    "shard-key"   : "user_id"
  }
]
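Reading the two rules as view definitions, the following sketch computes the rows each destination table should contain for given source rows: "followed_by" swaps the columns of "following", and "timeline" is the inner join of "tweet" and "followed_by" on the "merge" condition, with columns renamed through "pk_columns" and "npk_columns".  It models only the intended contents and ignores node placement; it is not how Incline computes or stores them, and the function names are hypothetical.

```python
# Hypothetical model of the contents the two rules in microblog.json describe.
# Rows are dicts keyed by column name; which node a row lands on is ignored.


def followed_by_view(following: list) -> list:
    """Rule 2: following -> followed_by."""
    return [
        {
            "user_id": r["following_id"],   # following.following_id -> user_id
            "follower_id": r["user_id"],    # following.user_id -> follower_id
        }
        for r in following
    ]


def timeline_view(tweet: list, followed_by: list) -> list:
    """Rule 1: tweet INNER JOIN followed_by ON tweet.user_id = followed_by.user_id."""
    return [
        {
            "user_id": f["follower_id"],    # followed_by.follower_id -> user_id
            "tweet_user_id": t["user_id"],  # tweet.user_id -> tweet_user_id
            "tweet_id": t["tweet_id"],      # tweet.tweet_id -> tweet_id
            "ctime": t["ctime"],            # npk column: tweet.ctime -> ctime
        }
        for t in tweet
        for f in followed_by
        if t["user_id"] == f["user_id"]     # the "merge" condition
    ]


# Users 1 and 3 follow user 2, who posts one tweet.
following = [{"user_id": 1, "following_id": 2}, {"user_id": 3, "following_id": 2}]
tweet = [{"tweet_id": 10, "user_id": 2, "body": "hello", "ctime": "2009-07-14 12:00:00"}]
timeline = timeline_view(tweet, followed_by_view(following))
```

Each resulting timeline row carries the follower's id in user_id, the "shard-key", so it belongs on the follower's node even though the tweet itself lives on the author's node.  That is why keeping "timeline" up to date requires replication between nodes and not just within one.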

The second file, shard.json, defines the mapping between user_id values and RDB nodes.  The example specifies range-based sharding on an integer column ("range-int").  The other algorithms currently supported are "range-str-case-sensitive" and "hash-int".

shard.json
{
  "algorithm" : "range-int",
  "map"       : {
    "0"    : "10.0.1.1:3306",
    "1000" : "10.0.1.2:3306",
    "2000" : "10.0.1.3:3306",
    "3000" : "10.0.1.4:3306"
  }
}
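The post does not spell out the exact semantics of "range-int".  A natural reading, assumed here, is that each key of "map" is the inclusive lower bound of a range, so 10.0.1.1 serves user_id 0 to 999, 10.0.1.2 serves 1000 to 1999, and so on, with the last node taking everything from 3000 upward.  A minimal lookup under that assumption can be sketched with a binary search; the class name is hypothetical.

```python
import bisect
import json

# Contents of shard.json from the example above.
SHARD_JSON = """
{
  "algorithm" : "range-int",
  "map"       : {
    "0"    : "10.0.1.1:3306",
    "1000" : "10.0.1.2:3306",
    "2000" : "10.0.1.3:3306",
    "3000" : "10.0.1.4:3306"
  }
}
"""


class RangeIntShard:
    """Assumed range-int semantics: map keys are inclusive lower bounds."""

    def __init__(self, config: dict):
        if config["algorithm"] != "range-int":
            raise ValueError("this sketch only handles range-int")
        # JSON object keys are strings, so convert before sorting numerically.
        pairs = sorted((int(bound), node) for bound, node in config["map"].items())
        self._bounds = [bound for bound, _ in pairs]
        self._nodes = [node for _, node in pairs]

    def node_for(self, key: int) -> str:
        # Index of the largest lower bound that is <= key.
        i = bisect.bisect_right(self._bounds, key) - 1
        if i < 0:
            raise KeyError(f"{key} is below the lowest bound {self._bounds[0]}")
        return self._nodes[i]


shard = RangeIntShard(json.loads(SHARD_JSON))
```

Converting the keys to integers before sorting matters: as strings, "1000" would sort before "200", which would silently route ids to the wrong node.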

With these definitions, the tables are synchronized in the directions shown in the figure below (which illustrates only two nodes).

Running Incline

Before running Incline, queue tables must be created and database triggers installed on each RDB node.  Running the "incline create-queue" and "incline create-trigger" commands against each node completes the setup.

% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 create-queue
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 create-trigger
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 create-queue
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 create-trigger
...
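Since the same two commands are repeated for every node, the list can be generated from shard.json.  The sketch below only builds and prints the command lines shown above, using exactly the options from the post.  It assumes every node listens on the default MySQL port, as in the example, so only the host part of each "host:port" entry is passed to --mysql-host.  The helper name is hypothetical.

```python
import shlex

# Options shared by every invocation, copied from the commands above.
COMMON = ["incline", "--mode=shard", "--source=microblog.json",
          "--shard-source=shard.json", "--database=microblog"]

# The parsed shard.json from the example; in practice this could be read
# with json.load() from the file itself.
SHARD_CONFIG = {
    "algorithm": "range-int",
    "map": {
        "0": "10.0.1.1:3306",
        "1000": "10.0.1.2:3306",
        "2000": "10.0.1.3:3306",
        "3000": "10.0.1.4:3306",
    },
}


def setup_commands(shard_config: dict) -> list:
    """Build create-queue and create-trigger command lines for each node."""
    hosts = []
    for node in shard_config["map"].values():
        host = node.rsplit(":", 1)[0]  # "10.0.1.1:3306" -> "10.0.1.1"
        if host not in hosts:          # a host may appear under several ranges
            hosts.append(host)
    return [COMMON + [f"--mysql-host={host}", action]
            for host in hosts
            for action in ("create-queue", "create-trigger")]


if __name__ == "__main__":
    for argv in setup_commands(SHARD_CONFIG):
        print(shlex.join(argv))
```

Each argv list could also be passed to subprocess.run(argv, check=True) to perform the setup directly; the same pattern applies to the forward commands.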

The installed triggers take care of synchronizing the denormalized tables within each node.  The next step is to run the forwarder (the replicator between RDB nodes) for each node, so that the views are kept in sync across nodes.  This is done by running "incline forward".  The forwarding logic is designed so that the data stored on the RDB nodes eventually becomes synchronized even if one of the RDB nodes or the forwarder dies unexpectedly.  Using daemontools or a similar tool to automatically (re)start the forwarder is recommended.

% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.1 forward &
% incline --mode=shard --source=microblog.json --shard-source=shard.json --database=microblog --mysql-host=10.0.1.2 forward &
...
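The post does not describe how the forwarder stays correct across crashes.  One common technique that gives this kind of eventual synchronization, sketched here purely as an illustration and not as Incline's implementation, is at-least-once delivery with idempotent apply: the forwarder applies the change at the head of the queue to the remote node and only then removes it from the queue.  If it dies in between, the change is applied again after restart, which is harmless because applying it twice leaves the same state.  The Change record, the in-memory queue and the remote table are hypothetical stand-ins for the queue tables and destination tables.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Change:
    """One queued change to a destination row (hypothetical layout)."""
    op: str                      # "upsert" or "delete"
    pk: tuple                    # destination primary key
    row: Optional[dict] = None   # full destination row, for upserts


def apply_change(table: dict, change: Change) -> None:
    """Apply a change idempotently: applying it twice equals applying it once."""
    if change.op == "upsert":
        table[change.pk] = change.row   # overwrite by primary key, like REPLACE
    elif change.op == "delete":
        table.pop(change.pk, None)      # deleting a missing row is a no-op
    else:
        raise ValueError(f"unknown op {change.op!r}")


def forward(queue: list, remote: dict, die_after: Optional[int] = None) -> None:
    """Drain the queue in order: apply remotely first, dequeue second.

    die_after simulates the forwarder dying right after the N-th apply,
    before that change has been removed from the queue.
    """
    applied = 0
    while queue:
        change = queue[0]
        apply_change(remote, change)
        applied += 1
        if die_after is not None and applied == die_after:
            raise RuntimeError("forwarder died")
        queue.pop(0)
```

Replaying only the head of the queue, in order, with a single forwarder per queue keeps an old change from overwriting a newer one, so a supervisor such as daemontools only has to restart the process.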

And that is it.  The two tables "followed_by" and "timeline" are updated automatically whenever the "tweet" or "following" table is modified.  Application developers no longer need to take care of shard consistency; all they need to remember is that changes should be written only to "tweet" and "following" (modifications to "followed_by" and "timeline" by the web application can be prevented by granting different access privileges to the web application and to the Incline forwarder).

Next Steps

Since I only started writing the code late last month, Incline is still immature; its current status is somewhere around early beta.  In the coming months, I plan to polish and optimize the code, add PostgreSQL support (currently Incline works only with MySQL), and extend it so that it can be used together with Pacific, a framework I have been working on that provides dynamic addition and removal of nodes in a sharded RDB environment.  Thank you for reading, and please stay tuned.
