Overview

The replication feature of Apache HBase (TM) provides a way to copy data between HBase deployments. It can serve as a disaster recovery solution and can contribute to providing higher availability at the HBase layer. It can also serve more practical purposes; for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce" cluster that will process old and new data and ship back the results automatically.

The basic architecture pattern used for Apache HBase replication is (HBase cluster) master-push; this makes it much easier to keep track of what is currently being replicated since each region server has its own write-ahead log (aka WAL or HLog), much as in other well-known solutions such as MySQL master/slave replication, where there is only one binary log to keep track of. One master cluster can replicate to any number of slave clusters, and each region server participates in replicating its own stream of edits. For more information on the different properties of master/slave replication and other types of replication, please consult How Google Serves Data From Multiple Datacenters.

Replication is done asynchronously, meaning that the clusters can be geographically distant, the links between them can be offline for some time, and rows inserted on the master cluster won't be immediately available on the slave clusters (eventual consistency).

The replication format used in this design is conceptually the same as MySQL's statement-based replication. Instead of SQL statements, whole WALEdits (consisting of multiple cells coming from the clients' Put and Delete operations) are replicated in order to maintain atomicity.

The HLogs from each region server are the basis of HBase replication, and must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each region server reads from the oldest log it needs to replicate and keeps the current position inside ZooKeeper to simplify failure recovery. That position can be different for every slave cluster, as can the queue of HLogs to process.

The clusters participating in replication can be of asymmetric sizes and the master cluster will do its "best effort" to balance the replication stream across the slave clusters by relying on randomization.

As of version 0.92, Apache HBase supports master/master and cyclic replication as well as replication to multiple slaves.

Enabling replication

The guide on enabling and using cluster replication is contained in the API documentation shipped with your Apache HBase distribution.

The most up-to-date documentation is available on the Apache HBase website.
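
As a quick illustration (the shipped guide remains the authoritative reference), the following sketch uses the 0.92-era Java client API to create a table whose column family is scoped for replication. The table and family names are made up for the example; hbase.replication must also be set to true in hbase-site.xml on both clusters.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class CreateReplicatedTable {
      public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Hypothetical table and family names, for illustration only.
        HTableDescriptor table = new HTableDescriptor("web_log");
        HColumnDescriptor family = new HColumnDescriptor("content");
        // Scope GLOBAL marks the family's edits as eligible for replication.
        family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
        table.addFamily(family);

        admin.createTable(table);
      }
    }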

Life of a log edit

The following sections describe the life of a single edit going from a client that communicates with a master cluster all the way to a single slave cluster.

Normal processing

The client uses an API that sends a Put, Delete or ICV (increment) to a region server. The region server transforms the key values into a WALEdit, which is inspected by the replication code; for each family that is scoped for replication, the scope is added to the edit. The edit is appended to the current WAL and then applied to the MemStore.
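
For reference, an edit like the one produced by the following client code is what the region server wraps into a WALEdit. This is a minimal sketch against the 0.92-era Java client; the table, family, and row names are made up.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "web_log");        // hypothetical table
        Put put = new Put(Bytes.toBytes("row-20120101"));  // hypothetical row key
        // Only families scoped GLOBAL are picked up by replication.
        put.add(Bytes.toBytes("content"), Bytes.toBytes("html"),
                Bytes.toBytes("<html>...</html>"));
        table.put(put);
        table.close();
      }
    }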

In a separate thread, the edit is read from the log (as part of a batch) and only the KeyValues that are replicable are kept: those that belong to a family scoped GLOBAL in the family's schema, are not part of a catalog table (so not .META. or -ROOT-), and did not originate in the target slave cluster (to prevent loops in the case of cyclic replication).
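
The filtering rule can be summarized as follows. This is an illustrative simplification only, not the actual replication source code; the method and its parameters are hypothetical.

    import java.util.UUID;

    // Illustrative model of the per-KeyValue filter described above.
    public class ReplicationFilterSketch {
      static final int REPLICATION_SCOPE_GLOBAL = 1; // same value as HConstants.REPLICATION_SCOPE_GLOBAL

      /** Decides whether an edit for a given family should ship to a given slave cluster. */
      static boolean shouldReplicate(int familyScope, boolean isCatalogTable,
                                     UUID editOriginClusterId, UUID slaveClusterId) {
        if (familyScope != REPLICATION_SCOPE_GLOBAL) return false;    // family not scoped GLOBAL
        if (isCatalogTable) return false;                             // skip .META. and -ROOT-
        if (slaveClusterId.equals(editOriginClusterId)) return false; // originated at the target: avoid cycles
        return true;
      }
    }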

The edit is then tagged with the master's cluster UUID. When the buffer is filled, or the reader hits the end of the file, the buffer is sent to a random region server on the slave cluster.

Synchronously, the region server that receives the edits reads them sequentially and sorts them into buffers, one per table. Once all edits are read, each buffer is flushed using HTable, the normal HBase client. The master's cluster UUID is retained in the edits applied at the slave cluster in order to allow cyclic replication.

Back in the master cluster's region server, the offset for the current WAL that's being replicated is registered in ZooKeeper.

Non-responding slave clusters

The edit is inserted in the same way.

In the separate thread, the region server reads, filters and buffers the log edits the same way as during normal processing. The slave region server that is contacted doesn't answer the RPC, so the master region server will sleep and retry up to a configured number of times. If the slave region server is still unavailable, the master cluster RS will select a new subset of region servers to replicate to and will retry sending the buffer of edits.

In the meantime, the WALs will be rolled and stored in a queue in ZooKeeper. Logs that are archived by their region server (archiving basically means moving a log from the region server's logs directory to a central log archive directory) will have their paths updated in the in-memory queue of the replication thread.

When the slave cluster is finally available, the buffer will be applied the same way as during normal processing. The master cluster RS will then replicate the backlog of logs.

Internals

This section describes in depth how each of replication's internal features operates.

Replication Zookeeper State

HBase replication maintains all of its state in Zookeeper. By default, this state is contained in the base znode:

                /hbase/replication
        

There are three major child znodes in the base replication znode:

  • State znode: /hbase/replication/state
  • Peers znode: /hbase/replication/peers
  • RS znode: /hbase/replication/rs

The State znode

The state znode indicates whether or not replication is enabled on the cluster corresponding to this zookeeper quorum. It does not have any child znodes and simply contains a boolean value. This value is initialized on startup based on the hbase.replication config parameter in the hbase-site.xml file. The status value is read/maintained by the ReplicationZookeeper.ReplicationStatusTracker class. It is also cached locally using an AtomicBoolean in the ReplicationZookeeper class. This value can be changed on a live cluster using the stop_replication command available through the hbase shell.
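
For debugging, the state znode can be inspected directly with the plain ZooKeeper client, as in the sketch below. The quorum address is a placeholder, and HBase may store internal metadata alongside the value, so treat the output as informational only.

    import org.apache.zookeeper.ZooKeeper;

    public class ReadReplicationState {
      public static void main(String[] args) throws Exception {
        // Assumes the default znode paths; adjust the quorum address to your deployment.
        ZooKeeper zk = new ZooKeeper("zk1.host.com:2181", 30000, null);
        byte[] data = zk.getData("/hbase/replication/state", false, null);
        System.out.println("replication state znode: " + new String(data));
        zk.close();
      }
    }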

                /hbase/replication/state [VALUE: true]
            

The Peers znode

The peers znode contains a list of all peer replication clusters and the current replication state of those clusters. It has one child peer znode for each peer cluster. The peer znode is named with the cluster id provided by the user in the HBase shell. The value of the peer znode is the peer's cluster key, as provided by the user in the HBase shell. The cluster key contains a list of ZooKeeper nodes in the cluster's quorum, the client port for the ZooKeeper quorum, and the base znode for HBase (e.g. "zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase").

                /hbase/replication/peers
                    /1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
                    /2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
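
Peers such as the ones above are usually registered with the add_peer command in the HBase shell. The sketch below shows the equivalent call through the ReplicationAdmin client (available from 0.92); the peer id and cluster key are the example values used throughout this section.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    public class AddPeerExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
        // Registers slave cluster "1" under /hbase/replication/peers/1.
        repAdmin.addPeer("1", "zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase");
      }
    }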
            

Each of these peer znodes has a child znode that indicates whether or not replication is enabled on that peer cluster. These peer-state znodes do not have child znodes and simply contain a boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the ReplicationPeer.PeerStateTracker class. It is also cached locally using an AtomicBoolean in the ReplicationPeer class.

                /hbase/replication/peers
                    /1/peer-state [Value: ENABLED]
                    /2/peer-state [Value: DISABLED]
            

The RS znode

The rs znode contains a list of all outstanding HLog files in the cluster that need to be replicated. The list is divided into a set of queues organized by region server and the peer cluster the region server is shipping the HLogs to. The rs znode has one child znode for each region server in the cluster. The child znode name is simply the regionserver name (a concatenation of the region server’s hostname, client port and start code). These region servers could either be dead or alive.

                /hbase/replication/rs
                    /hostname.example.org,6020,1234
                    /hostname2.example.org,6020,2856
            

Within each region server znode, the region server maintains a set of HLog replication queues. Each region server has one queue for every peer cluster it replicates to. These queues are represented by child znodes named using the cluster id of the peer cluster they represent (see the peer znode section).

                /hbase/replication/rs
                    /hostname.example.org,6020,1234
                        /1
                        /2
            

Each queue has one child znode for every HLog that still needs to be replicated. The value of these HLog child znodes is the latest position that has been replicated. This position is updated every time an HLog entry is replicated.

                /hbase/replication/rs
                    /hostname.example.org,6020,1234
                        /1
                            23522342.23422 [VALUE: 254]
                            12340993.22342 [VALUE: 0]
            

Configuration Parameters

Zookeeper znode paths

All of the base znode names are configurable through parameters:

Parameter                                  Default Value
zookeeper.znode.parent                     /hbase
zookeeper.znode.replication                replication
zookeeper.znode.replication.peers          peers
zookeeper.znode.replication.peers.state    peer-state
zookeeper.znode.replication.rs             rs

The default replication znode structure looks like the following:

                /hbase/replication/state
                /hbase/replication/peers/{peerId}/peer-state
                /hbase/replication/rs
            

Other parameters

  • hbase.replication (Default: false) - Controls whether replication is enabled or disabled for the cluster.
  • replication.sleep.before.failover (Default: 2000) - The amount of time a failover worker waits before attempting to replicate a dead region server’s HLog queues.
  • replication.executor.workers (Default: 1) - The number of dead region servers one region server should attempt to fail over simultaneously.
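
These parameters normally live in hbase-site.xml on the cluster members. Purely to illustrate their names, types, and defaults, here is how they would look set programmatically on a Hadoop Configuration object.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ReplicationConfSketch {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.replication", true);              // enable replication for the cluster
        conf.setLong("replication.sleep.before.failover", 2000); // ms a failover worker waits before taking over a dead RS's queues
        conf.setInt("replication.executor.workers", 1);          // dead region servers to fail over simultaneously
        return conf;
      }
    }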

Choosing region servers to replicate to

When a master cluster RS initiates a replication source to a slave cluster, it first connects to the slave's ZooKeeper ensemble using the provided cluster key (that key is composed of the value of hbase.zookeeper.quorum, zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It then scans the "rs" directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and will randomly choose a subset of them using a configured ratio (which has a default value of 10%). For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipients of the edits that this master cluster RS will be sending. Since this is done by all master cluster RSs, the probability that all slave RSs are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of 5 machines with a ratio of 10% means that each master cluster RS will choose one machine at random, so the chances of overlap and of full usage of the slave cluster are high.
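
The selection amounts to shuffling the list of discovered sinks and keeping roughly ratio × N of them, rounding up so at least one sink is always chosen. The following is a simplified, self-contained sketch of that idea, not the actual replication source code.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SinkSelectionSketch {
      // Picks a random subset of slave region servers using the configured ratio.
      static List<String> chooseSinks(List<String> slaveRegionServers, float ratio) {
        List<String> shuffled = new ArrayList<String>(slaveRegionServers);
        Collections.shuffle(shuffled);
        int count = (int) Math.ceil(shuffled.size() * ratio); // round up: never zero sinks
        return shuffled.subList(0, count);
      }

      public static void main(String[] args) {
        List<String> slaves = new ArrayList<String>();
        for (int i = 1; i <= 150; i++) slaves.add("slave-rs-" + i);
        // With 150 slave region servers and a 10% ratio, 15 sinks are chosen.
        System.out.println(chooseSinks(slaves, 0.10f).size());
      }
    }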

A ZK watcher is placed on the ${zookeeper.znode.parent}/rs node of the slave cluster by each of the master cluster's region servers. This watch is used to monitor changes in the composition of the slave cluster. When nodes are removed from the slave cluster (or if nodes go down and/or come back up), the master cluster's region servers will respond by selecting a new pool of slave region servers to replicate to.

Keeping track of logs

Every master cluster RS has its own znode in the replication znodes hierarchy. It contains one znode per peer cluster (if there are 5 slave clusters, 5 znodes are created), and each of these contains a queue of HLogs to process. Each of these queues will track the HLogs created by that RS, but they can differ in size. For example, if one slave cluster becomes unavailable for some time then its HLogs should not be deleted, so they need to stay in the queue (while the others are processed). See the section named "Region server failover" for an example.

When a source is instantiated, it contains the current HLog that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster's znode just before it's made available. This ensures that all the sources are aware that a new log exists before the region server is able to append edits to it, but it makes log rolling slightly more expensive. A queue item is discarded when the replication thread cannot read more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up to date and replicates from the log that the region server is writing to, reading up to the "end" of the current file won't delete the item in the queue.

When a log is archived (because it's not used anymore or because there are too many of them per hbase.regionserver.maxlogs, typically because the insertion rate is faster than regions are flushed), it will notify the source threads that the path for that log changed. If a particular source is already done with it, it will just ignore the message. If the log is in the queue, the path will be updated in memory. If the log is currently being replicated, the change will be done atomically so that the reader doesn't try to open the file after it has already been moved. Also, moving a file is a NameNode operation, so if the reader is currently reading the log, it won't generate any exception.

Reading, filtering and sending edits

By default, a source will try to read from a log file and ship log entries as fast as possible to a sink. This is first limited by the filtering of log entries; only KeyValues that are scoped GLOBAL and that don't belong to catalog tables will be retained. A second limit is imposed on the total size of the list of edits to replicate per slave, which by default is 64MB. This means that a master cluster RS with 3 slaves will use at most 192MB to store data to replicate. This doesn't account for the filtered data that hasn't yet been garbage collected.

Once the maximum size of edits has been buffered or the reader reaches the end of the log file, the source thread will stop reading and will choose at random a sink to replicate to (from the list that was generated by keeping only a subset of slave RSs). It will directly issue an RPC to the chosen machine and will wait for the method to return. If the RPC is successful, the source will determine whether the current file has been fully read or whether it should continue to read from it. If the former, it will delete the znode in the queue. If the latter, it will register the new offset in the log's znode. If the RPC threw an exception, the source will retry 10 times before trying to find a different sink.
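
The ship-and-retry behaviour can be pictured with the following sketch. It is illustrative only: the Sink interface stands in for the real replication RPC, and the retry count and back-off are placeholders for the configured values.

    import java.util.List;
    import java.util.Random;

    public class ShipEditsSketch {
      interface Sink { void replicate(List<String> edits) throws Exception; }

      // Ships a buffer of edits, retrying the same sink a few times before picking another one.
      static void ship(List<String> edits, List<Sink> sinks, int maxRetries) throws InterruptedException {
        Random random = new Random();
        Sink sink = sinks.get(random.nextInt(sinks.size())); // choose a sink at random
        int attempts = 0;
        while (true) {
          try {
            sink.replicate(edits);  // the actual RPC to the slave region server
            return;                 // success: the caller records the new offset in ZooKeeper
          } catch (Exception e) {
            if (++attempts >= maxRetries) {
              sink = sinks.get(random.nextInt(sinks.size())); // give up on this sink, try another
              attempts = 0;
            }
            Thread.sleep(1000);     // back off before retrying
          }
        }
      }
    }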

Cleaning logs

If replication isn't enabled, the master's log-cleaning thread will delete old logs using a configured TTL. This doesn't work well with replication since archived logs past their TTL may still be in a queue. Thus, the default behavior is augmented so that if a log is past its TTL, the cleaning thread will look up every queue until it finds the log (while caching the ones it finds). If the log is not found in any queue, it will be deleted. The next time the thread has to look for a log, it will first use its cache.
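
A simplified view of the augmented check: a log past its TTL is only deletable when no replication queue still references it. The method below is illustrative only, not the actual log-cleaner code.

    import java.util.Set;

    public class LogCleanerSketch {
      // Decides whether an archived log can safely be deleted.
      static boolean isDeletable(String logName, long ageMs, long ttlMs, Set<String> logsStillQueued) {
        if (ageMs < ttlMs) {
          return false;                              // still within its TTL: keep it regardless
        }
        return !logsStillQueued.contains(logName);   // past TTL, but keep it if any queue still needs it
      }
    }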

Region server failover

As long as region servers don't fail, keeping track of the logs in ZooKeeper doesn't add any value. Unfortunately, they do fail, and since ZooKeeper is highly available, we can count on it and its semantics to help us manage the transfer of the queues.

All the master cluster RSs keep a watcher on every other one of them to be notified when one dies (just like the master does). When it happens, they all race to create a znode called "lock" inside the dead RS' znode that contains its queues. The one that creates it successfully will proceed by transferring all the queues to its own znode (one by one since ZK doesn't support the rename operation) and will delete all the old ones when it's done. The recovered queues' znodes will be named with the id of the slave cluster appended with the name of the dead server.
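
The race to claim a dead region server's queues relies on ZooKeeper's guarantee that only one client can successfully create a given znode. Below is a bare-bones sketch of that idea using the plain ZooKeeper API; it is not the actual HBase code, and the path handling is simplified.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class FailoverLockSketch {
      // Tries to create the "lock" znode under a dead region server's queues znode.
      static boolean tryToClaim(ZooKeeper zk, String deadRsZnode) throws Exception {
        try {
          zk.create(deadRsZnode + "/lock", new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          return true;   // we won the race: transfer the queues, then delete the old znodes
        } catch (KeeperException.NodeExistsException e) {
          return false;  // another region server got the lock first
        }
      }
    }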

Once that is done, the master cluster RS will create one new source thread per copied queue, and each of them will follow the read/filter/ship pattern. The main difference is that those queues will never have new data since they don't belong to their new region server, which means that when the reader hits the end of the last log, the queue's znode will be deleted and the master cluster RS will close that replication source.

For example, consider a master cluster with 3 region servers that is replicating to a single slave with id '2'. The following hierarchy represents what the znode layout could be at some point in time. We can see that each RS znode contains a single queue for peer cluster 2. The znode names in the queues represent the actual file names on HDFS in the form "address,port.timestamp".

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                          2/
                              1.1.1.1,60020.1234  (Contains a position)
                              1.1.1.1,60020.1265
                      1.1.1.2,60020,123456790/
                          2/
                              1.1.1.2,60020.1214  (Contains a position)
                              1.1.1.2,60020.1248
                              1.1.1.2,60020.1312
                      1.1.1.3,60020,123456630/
                          2/
                              1.1.1.3,60020.1280  (Contains a position)
        

Now let's say that 1.1.1.2 loses its ZK session. The survivors will race to create a lock, and for some reason 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode, appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                          2/
                              1.1.1.1,60020.1234  (Contains a position)
                              1.1.1.1,60020.1265
                      1.1.1.2,60020,123456790/
                          lock
                          2/
                              1.1.1.2,60020.1214  (Contains a position)
                              1.1.1.2,60020.1248
                              1.1.1.2,60020.1312
                      1.1.1.3,60020,123456630/
                          2/
                              1.1.1.3,60020.1280  (Contains a position)

                          2-1.1.1.2,60020,123456790/
                              1.1.1.2,60020.1214  (Contains a position)
                              1.1.1.2,60020.1248
                              1.1.1.2,60020.1312
        

Some time later, but before 1.1.1.3 is able to finish replicating the last HLog from 1.1.1.2, let's say that it dies too (and some new logs were also created in the normal queues). The last RS will then try to lock 1.1.1.3's znode and will begin transferring all the queues. The new layout will be:

/hbase/replication/rs/
                      1.1.1.1,60020,123456780/
                          2/
                              1.1.1.1,60020.1378  (Contains a position)

                          2-1.1.1.3,60020,123456630/
                              1.1.1.3,60020.1325  (Contains a position)
                              1.1.1.3,60020.1401

                          2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
                              1.1.1.2,60020.1312  (Contains a position)
                      1.1.1.3,60020,123456630/
                          lock
                          2/
                              1.1.1.3,60020.1325  (Contains a position)
                              1.1.1.3,60020.1401

                          2-1.1.1.2,60020,123456790/
                              1.1.1.2,60020.1312  (Contains a position)
        

Replication Metrics

The following are some useful metrics that can be used to check replication progress:
  • source.sizeOfLogQueue: number of HLogs to process (excludes the one which is being processed) at the Replication source
  • source.shippedOps: number of mutations shipped
  • source.logEditsRead: number of mutations read from HLogs at the replication source
  • source.ageOfLastShippedOp: age of last batch that was shipped by the replication source
Please note that the above metrics are global to the region server. In 0.95.0 and onwards, these metrics are also exposed at the per-peer level.

FAQ

GLOBAL means replicate? Any provision to replicate only to cluster X and not to cluster Y? Or is that for later?

Yes, this is for much later.

You need a bulk edit shipper? Something that allows you to transfer 64MB of edits in one go?

You can use the HBase-provided utility called CopyTable from the org.apache.hadoop.hbase.mapreduce package in order to have a distcp-like tool to bulk copy data.

Is it a mistake that WALEdit doesn't carry Put and Delete objects, which we have to re-instantiate not only when replicating but also when replaying edits?

Yes, this behavior would help a lot but it's not currently available in HBase (BatchUpdate had that, but it was lost in the new API).