17.6. Cluster Replication

Note

This information was previously available at Cluster Replication.

HBase provides a cluster replication mechanism which allows you to keep one cluster's state synchronized with that of another cluster, using the write-ahead log (WAL) of the source cluster to propagate the changes. Some use cases for cluster replication include:

Note

Replication is enabled at the granularity of the column family. Before enabling replication for a column family, create the table and all column families to be replicated, on the destination cluster.

Cluster replication uses a source-push methodology. An HBase cluster can be a source (also called master or active, meaning that it is the originator of new data), a destination (also called slave or passive, meaning that it receives data via replication), or can fulfill both roles at once. Replication is asynchronous, and the goal of replication is eventual consistency. When the source receives an edit to a column family with replication enabled, that edit is propagated to all destination clusters using the WAL for that for that column family on the RegionServer managing the relevant region.

When data is replicated from one cluster to another, the original source of the data is tracked via a cluster ID which is part of the metadata. In HBase 0.96 and newer (HBASE-7709), all clusters which have already consumed the data are also tracked. This prevents replication loops.

The WALs for each region server must be kept in HDFS as long as they are needed to replicate data to any slave cluster. Each region server reads from the oldest log it needs to replicate and keeps track of its progress processing WALs inside ZooKeeper to simplify failure recovery. The position marker which indicates a slave cluster's progress, as well as the queue of WALs to process, may be different for every slave cluster.

The clusters participating in replication can be of different sizes. The master cluster relies on randomization to attempt to balance the stream of replication on the slave clusters. It is expected that the slave cluster has storage capacity to hold the replicated data, as well as any data it is responsible for ingesting. If a slave cluster does run out of room, or is inaccessible for other reasons, it throws an error and the master retains the WAL and retries the replication at intervals.

Terminology Changes

Previously, terms such as master-master, master-slave, and cyclical were used to describe replication relationships in HBase. These terms added confusion, and have been abandoned in favor of discussions about cluster topologies appropriate for different scenarios.

Cluster Topologies

Multiple levels of replication may be chained together to suit your organization's needs. The following diagram shows a hypothetical scenario. Use the arrows to follow the data paths.

Figure 17.5. Example of a Complex Cluster Replication Configuration

Example of a Complex Cluster Replication Configuration

At the top of the diagram, the San Jose and Tokyo clusters, shown in red, replicate changes to each other, and each also replicates changes to a User Data and a Payment Data cluster.

Each cluster in the second row, shown in blue, replicates its changes to the All Data Backup 1 cluster, shown in grey. The All Data Backup 1 cluster replicates changes to the All Data Backup 2 cluster (also shown in grey), as well as the Data Analysis cluster (shown in green). All Data Backup 2 also propagates any of its own changes back to All Data Backup 1.

The Data Analysis cluster runs MapReduce jobs on its data, and then pushes the processed data back to the San Jose and Tokyo clusters.


HBase replication borrows many concepts from the statement-based replication design used by MySQL. Instead of SQL statements, entire WALEdits (consisting of multiple cell inserts coming from Put and Delete operations on the clients) are replicated in order to maintain atomicity.

17.6.1. Configuring Cluster Replication

The following is a simplified procedure for configuring cluster replication. It may not cover every edge case. For more information, see the API documentation for replication.

  • Configure and start the source and destination clusters. Create tables with the same names and column families on both the source and destination clusters, so that the destination cluster knows where to store data it will receive. All hosts in the source and destination clusters should be reachable to each other.

  • On the source cluster, enable replication by setting hbase.replication to true in hbase-site.xml.

  • On the source cluster, in HBase Shell, add the destination cluster as a peer, using the add_peer command. The syntax is as follows:

    hbase< add_peer 'ID' 'CLUSTER_KEY'

    The ID is a string (prior to HBASE-11367, it was a short integer), which must not contain a hyphen (see HBASE-11394). To compose the CLUSTER_KEY, use the following template:

    hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent

    If both clusters use the same ZooKeeper cluster, you must use a different zookeeper.znode.parent, because they cannot write in the same folder.

  • On the source cluster, configure each column family to be replicated by setting its REPLICATION_SCOPE to 1, using commands such as the following in HBase Shell.

    hbase> disable 'example_table' 
    hbase> alter 'example_table', {NAME => 'example_family', REPLICATION_SCOPE => '1'} 
    hbase> enable 'example_table'
  • You can verify that replication is taking place by examining the logs on the source cluster for messages such as the following.

    Considering 1 rs, with ratio 0.1
    Getting 1 rs from peer cluster # 0 
    Choosing peer 10.10.1.49:62020
              
  • To verify the validity of replicated data, you can use the included VerifyReplication MapReduce job on the source cluster, providing it with the ID of the replication peer and table name to verify. Other options are possible, such as a time range or specific families to verify.

    The command has the following form:

    hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication [--starttime=timestamp1] [--stoptime=timestamp [--families=comma separated list of families] <peerId><tablename>

    The VerifyReplication command prints out GOODROWS and BADROWS counters to indicate rows that did and did not replicate correctly.

17.6.2. Detailed Information About Cluster Replication

Figure 17.6. Replication Architecture Overview

Replication Architecture Overview

Illustration of the replication architecture in HBase, as described in the prior text.


17.6.2.1. Life of a WAL Edit

A single WAL edit goes through several steps in order to be replicated to a slave cluster.

When the slave responds correctly:

  1. An HBase client uses a Put or Delete operation to manipulate data in HBase.

  2. The region server writes the request to the WAL in a way allows it to be replayed if it is not written successfully.

  3. If the changed cell corresponds to a column family that is scoped for replication, the edit is added to the queue for replication.

  4. In a separate thread, the edit is read from the log, as part of a batch process. Only the KeyValues that are eligible for replication are kept. Replicable KeyValues are part of a column family whose schema is scoped GLOBAL, are not part of a catalog such as hbase:meta, did not originate from the target slave cluster, and have not already been consumed by the target slave cluster.

  5. The edit is tagged with the master's UUID and added to a buffer. When the buffer is filled, or the reader reaches the end of the file, the buffer is sent to a random region server on the slave cluster.

  6. The region server reads the edits sequentially and separates them into buffers, one buffer per table. After all edits are read, each buffer is flushed using HTable, HBase's normal client. The master's UUID and the UUIDs of slaves which have already consumed the data are preserved in the edits they are applied, in order to prevent replication loops.

  7. In the master, the offset for the WAL that is currently being replicated is registered in ZooKeeper.

When the slave does not respond:

  1. The first three steps, where the edit is inserted, are identical.

  2. Again in a separate thread, the region server reads, filters, and edits the log edits in the same way as above. The slave region server does not answer the RPC call.

  3. The master sleeps and tries again a configurable number of times.

  4. If the slave region server is still not available, the master selects a new subset of region server to replicate to, and tries again to send the buffer of edits.

  5. Meanwhile, the WALs are rolled and stored in a queue in ZooKeeper. Logs that are archived by their region server, by moving them from the region server's log directory to a central log directory, will update their paths in the in-memory queue of the replicating thread.

  6. When the slave cluster is finally available, the buffer is applied in the same way as during normal processing. The master region server will then replicate the backlog of logs that accumulated during the outage.

Spreading Queue Failover Load. When replication is active, a subset of region servers in the source cluster is responsible for shipping edits to the sink. This responsibility must be failed over like all other region server functions should a process or node crash. The following configuration settings are recommended for maintaining an even distribution of replication activity over the remaining live servers in the source cluster:

  • Set replication.source.maxretriesmultiplier to 300.

  • Set replication.source.sleepforretries to 1 (1 second). This value, combined with the value of replication.source.maxretriesmultiplier, causes the retry cycle to last about 5 minutes.

  • Set replication.sleep.before.failover to 30000 (30 seconds) in the source cluster site configuration.

Preserving Tags During Replication. By default, the codec used for replication between clusters strips tags, such as cell-level ACLs, from cells. To prevent the tags from being stripped, you can use a different codec which does not strip them. Configure hbase.replication.rpc.codec to use org.apache.hadoop.hbase.codec.KeyValueCodecWithTags, on both the source and sink RegionServers involved in the replication. This option was introduced in HBASE-10322.

17.6.2.2. Replication Internals

Replication State in ZooKeeper

HBase replication maintains its state in ZooKeeper. By default, the state is contained in the base node /hbase/replication. This node contains two child nodes, the Peers znode and the RS znode.

Warning

Replication may be disrupted and data loss may occur if you delete the replication tree (/hbase/replication/) from ZooKeeper. This is despite the information about invariants at Section 18.10.4.1, “No permanent state in ZooKeeper”. Follow progress on this issue at HBASE-10295.

The Peers Znode

The peers znode is stored in /hbase/replication/peers by default. It consists of a list of all peer replication clusters, along with the status of each of them. The value of each peer is its cluster key, which is provided in the HBase Shell. The cluster key contains a list of ZooKeeper nodes in the cluster's quorum, the client port for the ZooKeeper quorum, and the base znode for HBase in HDFS on that cluster.

/hbase/replication/peers
  /1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
  /2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]            
            

Each peer has a child znode which indicates whether or not replication is enabled on that cluster. These peer-state znodes do not contain any child znodes, but only contain a Boolean value. This value is read and maintained by the ReplicationPeer.PeerStateTracker class.

/hbase/replication/peers
  /1/peer-state [Value: ENABLED]
  /2/peer-state [Value: DISABLED]
            
The RS Znode

The rs znode contains a list of WAL logs which need to be replicated. This list is divided into a set of queues organized by region server and the peer cluster the region server is shipping the logs to. The rs znode has one child znode for each region server in the cluster. The child znode name is the region server's hostname, client port, and start code. This list includes both live and dead region servers.

/hbase/replication/rs
  /hostname.example.org,6020,1234
  /hostname2.example.org,6020,2856            
            

Each rs znode contains a list of WAL replication queues, one queue for each peer cluster it replicates to. These queues are represented by child znodes named by the cluster ID of the peer cluster they represent.

/hbase/replication/rs
  /hostname.example.org,6020,1234
    /1
    /2            
           

Each queue has one child znode for each WAL log that still needs to be replicated. the value of these child znodes is the last position that was replicated. This position is updated each time a WAL log is replicated.

/hbase/replication/rs
  /hostname.example.org,6020,1234
    /1
      23522342.23422 [VALUE: 254]
      12340993.22342 [VALUE: 0]            
            

17.6.2.3. Choosing Region Servers to Replicate To

When a master cluster region server initiates a replication source to a slave cluster, it first connects to the slave's ZooKeeper ensemble using the provided cluster key . It then scans the rs/ directory to discover all the available sinks (region servers that are accepting incoming streams of edits to replicate) and randomly chooses a subset of them using a configured ratio which has a default value of 10%. For example, if a slave cluster has 150 machines, 15 will be chosen as potential recipient for edits that this master cluster region server sends. Because this selection is performed by each master region server, the probability that all slave region servers are used is very high, and this method works for clusters of any size. For example, a master cluster of 10 machines replicating to a slave cluster of 5 machines with a ratio of 10% causes the master cluster region servers to choose one machine each at random.

A ZooKeeper watcher is placed on the ${zookeeper.znode.parent}/rs node of the slave cluster by each of the master cluster's region servers. This watch is used to monitor changes in the composition of the slave cluster. When nodes are removed from the slave cluster, or if nodes go down or come back up, the master cluster's region servers will respond by selecting a new pool of slave region servers to replicate to.

17.6.2.4. Keeping Track of Logs

Each master cluster region server has its own znode in the replication znodes hierarchy. It contains one znode per peer cluster (if 5 slave clusters, 5 znodes are created), and each of these contain a queue of WALs to process. Each of these queues will track the WALs created by that region server, but they can differ in size. For example, if one slave cluster becomes unavailable for some time, the WALs should not be deleted, so they need to stay in the queue while the others are processed. See Section 17.6.2.7, “Region Server Failover” for an example.

When a source is instantiated, it contains the current WAL that the region server is writing to. During log rolling, the new file is added to the queue of each slave cluster's znode just before it is made available. This ensures that all the sources are aware that a new log exists before the region server is able to append edits into it, but this operations is now more expensive. The queue items are discarded when the replication thread cannot read more entries from a file (because it reached the end of the last block) and there are other files in the queue. This means that if a source is up to date and replicates from the log that the region server writes to, reading up to the "end" of the current file will not delete the item in the queue.

A log can be archived if it is no longer used or if the number of logs exceeds hbase.regionserver.maxlogs because the insertion rate is faster than regions are flushed. When a log is archived, the source threads are notified that the path for that log changed. If a particular source has already finished with an archived log, it will just ignore the message. If the log is in the queue, the path will be updated in memory. If the log is currently being replicated, the change will be done atomically so that the reader doesn't attempt to open the file when has already been moved. Because moving a file is a NameNode operation , if the reader is currently reading the log, it won't generate any exception.

17.6.2.5. Reading, Filtering and Sending Edits

By default, a source attempts to read from a WAL and ship log entries to a sink as quickly as possible. Speed is limited by the filtering of log entries Only KeyValues that are scoped GLOBAL and that do not belong to catalog tables will be retained. Speed is also limited by total size of the list of edits to replicate per slave, which is limited to 64 MB by default. With this configuration, a master cluster region server with three slaves would use at most 192 MB to store data to replicate. This does not account for the data which was filtered but not garbage collected.

Once the maximum size of edits has been buffered or the reader reaces the end of the WAL, the source thread stops reading and chooses at random a sink to replicate to (from the list that was generated by keeping only a subset of slave region servers). It directly issues a RPC to the chosen region server and waits for the method to return. If the RPC was successful, the source determines whether the current file has been emptied or it contains more data which needs to be read. If the file has been emptied, the source deletes the znode in the queue. Otherwise, it registers the new offset in the log's znode. If the RPC threw an exception, the source will retry 10 times before trying to find a different sink.

17.6.2.6. Cleaning Logs

If replication is not enabled, the master's log-cleaning thread deletes old logs using a configured TTL. This TTL-based method does not work well with replication, because archived logs which have exceeded their TTL may still be in a queue. The default behavior is augmented so that if a log is past its TTL, the cleaning thread looks up every queue until it finds the log, while caching queues it has found. If the log is not found in any queues, the log will be deleted. The next time the cleaning process needs to look for a log, it starts by using its cached list.

17.6.2.7. Region Server Failover

When no region servers are failing, keeping track of the logs in ZooKeeper adds no value. Unfortunately, region servers do fail, and since ZooKeeper is highly available, it is useful for managing the transfer of the queues in the event of a failure.

Each of the master cluster region servers keeps a watcher on every other region server, in order to be notified when one dies (just as the master does). When a failure happens, they all race to create a znode called lock inside the dead region server's znode that contains its queues. The region server that creates it successfully then transfers all the queues to its own znode, one at a time since ZooKeeper does not support renaming queues. After queues are all transferred, they are deleted from the old location. The znodes that were recovered are renamed with the ID of the slave cluster appended with the name of the dead server.

Next, the master cluster region server creates one new source thread per copied queue, and each of the source threads follows the read/filter/ship pattern. The main difference is that those queues will never receive new data, since they do not belong to their new region server. When the reader hits the end of the last log, the queue's znode is deleted and the master cluster region server closes that replication source.

Given a master cluster with 3 region servers replicating to a single slave with id 2, the following hierarchy represents what the znodes layout could be at some point in time. The region servers' znodes all contain a peers znode which contains a single queue. The znode names in the queues represent the actual file names on HDFS in the form address,port.timestamp.

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1234  (Contains a position)
      1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    2/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312
  1.1.1.3,60020,    123456630/
    2/
      1.1.1.3,60020.1280  (Contains a position)            
          

Assume that 1.1.1.2 loses its ZooKeeper session. The survivors will race to create a lock, and, arbitrarily, 1.1.1.3 wins. It will then start transferring all the queues to its local peers znode by appending the name of the dead server. Right before 1.1.1.3 is able to clean up the old znodes, the layout will look like the following:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1234  (Contains a position)
      1.1.1.1,60020.1265
  1.1.1.2,60020,123456790/
    lock
    2/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312
  1.1.1.3,60020,123456630/
    2/
      1.1.1.3,60020.1280  (Contains a position)

    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1214  (Contains a position)
      1.1.1.2,60020.1248
      1.1.1.2,60020.1312            
          

Some time later, but before 1.1.1.3 is able to finish replicating the last WAL from 1.1.1.2, it dies too. Some new logs were also created in the normal queues. The last region server will then try to lock 1.1.1.3's znode and will begin transferring all the queues. The new layout will be:

/hbase/replication/rs/
  1.1.1.1,60020,123456780/
    2/
      1.1.1.1,60020.1378  (Contains a position)

    2-1.1.1.3,60020,123456630/
      1.1.1.3,60020.1325  (Contains a position)
      1.1.1.3,60020.1401

    2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
      1.1.1.2,60020.1312  (Contains a position)
  1.1.1.3,60020,123456630/
    lock
    2/
      1.1.1.3,60020.1325  (Contains a position)
      1.1.1.3,60020.1401

    2-1.1.1.2,60020,123456790/
      1.1.1.2,60020.1312  (Contains a position)            
          

17.6.3. Replication Metrics

The following metrics are exposed at the global region server level and (since HBase 0.95) at the peer level:

source.sizeOfLogQueue

number of WALs to process (excludes the one which is being processed) at the Replication source

source.shippedOps

number of mutations shipped

source.logEditsRead

number of mutations read from WALs at the replication source

source.ageOfLastShippedOp

age of last batch that was shipped by the replication source

17.6.4. Replication Configuration Options

OptionDescriptionDefault

zookeeper.znode.parent

The name of the base ZooKeeper znode used for HBase

/hbase

zookeeper.znode.replication

The name of the base znode used for replication

replication

zookeeper.znode.replication.peers

The name of the peer znode

peers

zookeeper.znode.replication.peers.state

The name of peer-state znode

peer-state

zookeeper.znode.replication.rs

The name of the rs znode

rs

hbase.replication

Whether replication is enabled or disabled on a given cluster

false

eplication.sleep.before.failover

How many milliseconds a worker should sleep before attempting to replicate a dead region server's WAL queues.

replication.executor.workers

The number of region servers a given region server should attempt to failover simultaneously.

1

comments powered by Disqus