1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import com.google.protobuf.InvalidProtocolBufferException;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Abortable;
27  import org.apache.hadoop.hbase.Server;
28  import org.apache.hadoop.hbase.ServerName;
29  import org.apache.hadoop.hbase.exceptions.DeserializationException;
30  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
31  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
34  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
35  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
36  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37  import org.apache.zookeeper.KeeperException;
38  import org.apache.zookeeper.KeeperException.ConnectionLossException;
39  import org.apache.zookeeper.KeeperException.NodeExistsException;
40  import org.apache.zookeeper.KeeperException.SessionExpiredException;
41  
42  import java.io.Closeable;
43  import java.io.IOException;
44  import java.util.ArrayList;
45  import java.util.Collections;
46  import java.util.HashMap;
47  import java.util.List;
48  import java.util.Map;
49  import java.util.SortedMap;
50  import java.util.SortedSet;
51  import java.util.TreeMap;
52  import java.util.UUID;
53  import java.util.concurrent.atomic.AtomicBoolean;
54  
55  /**
56   * This class serves as a helper for all things related to zookeeper in
57   * replication.
58   * <p/>
59   * The layout looks something like this under zookeeper.znode.parent for the
60   * master cluster:
61   * <p/>
62   *
63   * <pre>
64   * replication/
65   *  state      {contains true or false}
66   *  clusterId  {contains a byte}
67   *  peers/
68   *    1/   {contains a full cluster address}
69   *      peer-state  {contains ENABLED or DISABLED}
70   *    2/
71   *    ...
72   *  rs/ {lists all RS that replicate}
73   *    startcode1/ {lists all peer clusters}
74   *      1/ {lists hlogs to process}
75   *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
76   *        10.10.1.76%3A53488.123456790
77   *        ...
78   *      2/
79   *      ...
80   *    startcode2/
81   *    ...
82   * </pre>
83   */
84  @InterfaceAudience.Private
85  public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
86    private static final Log LOG =
87      LogFactory.getLog(ReplicationZookeeper.class);
88  
89    // Our handle on zookeeper
90    private final ZooKeeperWatcher zookeeper;
91    // Map of peer clusters keyed by their id
92    private Map<String, ReplicationPeer> peerClusters;
93    // Path to the root replication znode
94    private String replicationZNode;
95    // Path to the peer clusters znode
96    private String peersZNode;
97    // Path to the znode that contains all RS that replicates
98    private String rsZNode;
99    // Path to this region server's name under rsZNode
100   private String rsServerNameZnode;
  // Name of the replicationState znode
102   private String replicationStateNodeName;
103   // Name of zk node which stores peer state. The peer-state znode is under a
104   // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
105   private String peerStateNodeName;
106   private final Configuration conf;
107   // The key to our own cluster
108   private String ourClusterKey;
109   // Abortable
110   private Abortable abortable;
111   private final ReplicationStateInterface replicationState;
112   private final ReplicationQueues replicationQueues;
113 
114   /**
115    * ZNode content if enabled state.
116    */
117   // Public so it can be seen by test code.
118   public static final byte[] ENABLED_ZNODE_BYTES =
119       toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
120 
121   /**
122    * ZNode content if disabled state.
123    */
124   static final byte[] DISABLED_ZNODE_BYTES =
125       toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
126 
127   /**
128    * Constructor used by clients of replication (like master and HBase clients)
129    * @param conf  conf to use
130    * @param zk    zk connection to use
131    * @throws IOException
132    */
133   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
134       final ZooKeeperWatcher zk) throws KeeperException {
135     super(zk, conf, abortable);
136     this.conf = conf;
137     this.zookeeper = zk;
138     setZNodes(abortable);
139     this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
140     // TODO This interface is no longer used by anyone using this constructor. When this class goes
141     // away, we will no longer have this null initialization business
142     this.replicationQueues = null;
143   }
144 
145   /**
146    * Constructor used by region servers, connects to the peer cluster right away.
147    *
148    * @param server
149    * @param replicating    atomic boolean to start/stop replication
150    * @throws IOException
151    * @throws KeeperException
152    */
153   public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
154   throws IOException, KeeperException {
155     super(server.getZooKeeper(), server.getConfiguration(), server);
156     this.abortable = server;
157     this.zookeeper = server.getZooKeeper();
158     this.conf = server.getConfiguration();
159     setZNodes(server);
160 
161     this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
162     this.peerClusters = new HashMap<String, ReplicationPeer>();
163     ZKUtil.createWithParents(this.zookeeper,
164         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
165     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
166     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
167     this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
168     this.replicationQueues.init(server.getServerName().toString());
169     connectExistingPeers();
170   }
171 
172   private void setZNodes(Abortable abortable) throws KeeperException {
173     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
174     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
175     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
176     this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
177     String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
178     this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
179     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
180     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
181     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
182     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
183     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
184   }
185 
186   private void connectExistingPeers() throws IOException, KeeperException {
187     List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
188     if (znodes != null) {
189       for (String z : znodes) {
190         connectToPeer(z);
191       }
192     }
193   }
194 
195   /**
196    * List this cluster's peers' IDs
197    * @return list of all peers' identifiers
198    */
199   public List<String> listPeersIdsAndWatch() {
200     List<String> ids = null;
201     try {
202       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
203     } catch (KeeperException e) {
204       this.abortable.abort("Cannot get the list of peers ", e);
205     }
206     return ids;
207   }
208 
209   /**
210    * Map of this cluster's peers for display.
211    * @return A map of peer ids to peer cluster keys
212    */
213   public Map<String,String> listPeers() {
214     Map<String,String> peers = new TreeMap<String,String>();
215     List<String> ids = null;
216     try {
217       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
218       for (String id : ids) {
219         byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
220         String clusterKey = null;
221         try {
222           clusterKey = parsePeerFrom(bytes);
223         } catch (DeserializationException de) {
224           LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
225           continue;
226         }
227         peers.put(id, clusterKey);
228       }
229     } catch (KeeperException e) {
230       this.abortable.abort("Cannot get the list of peers ", e);
231     }
232     return peers;
233   }
234 
235   /**
236    * Returns all region servers from given peer
237    *
238    * @param peerClusterId (byte) the cluster to interrogate
239    * @return addresses of all region servers
240    */
241   public List<ServerName> getSlavesAddresses(String peerClusterId) {
242     if (this.peerClusters.size() == 0) {
243       return Collections.emptyList();
244     }
245     ReplicationPeer peer = this.peerClusters.get(peerClusterId);
246     if (peer == null) {
247       return Collections.emptyList();
248     }
249     
250     List<ServerName> addresses;
251     try {
252       addresses = fetchSlavesAddresses(peer.getZkw());
253     } catch (KeeperException ke) {
254       reconnectPeer(ke, peer);
255       addresses = Collections.emptyList();
256     }
257     peer.setRegionServers(addresses);
258     return peer.getRegionServers();
259   }
260 
261   /**
262    * Get the list of all the region servers from the specified peer
263    * @param zkw zk connection to use
264    * @return list of region server addresses or an empty list if the slave
265    * is unavailable
266    */
267   private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
268     throws KeeperException {
269     return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
270   }
271 
272   /**
273    * Lists the children of the specified znode, retrieving the data of each
274    * child as a server address.
275    *
276    * Used to list the currently online regionservers and their addresses.
277    *
278    * Sets no watches at all, this method is best effort.
279    *
280    * Returns an empty list if the node has no children.  Returns null if the
281    * parent node itself does not exist.
282    *
283    * @param zkw zookeeper reference
284    * @param znode node to get children of as addresses
285    * @return list of data of children of specified znode, empty if no children,
286    *         null if parent does not exist
287    * @throws KeeperException if unexpected zookeeper exception
288    */
289   public static List<ServerName> listChildrenAndGetAsServerNames(
290       ZooKeeperWatcher zkw, String znode)
291   throws KeeperException {
292     List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
293     if(children == null) {
294       return Collections.emptyList();
295     }
296     List<ServerName> addresses = new ArrayList<ServerName>(children.size());
297     for (String child : children) {
298       addresses.add(ServerName.parseServerName(child));
299     }
300     return addresses;
301   }
302 
303   /**
304    * This method connects this cluster to another one and registers it
305    * in this region server's replication znode
306    * @param peerId id of the peer cluster
307    * @throws KeeperException 
308    */
309   public boolean connectToPeer(String peerId)
310       throws IOException, KeeperException {
311     if (peerClusters == null) {
312       return false;
313     }
314     if (this.peerClusters.containsKey(peerId)) {
315       return false;
316     }
317     ReplicationPeer peer = getPeer(peerId);
318     if (peer == null) {
319       return false;
320     }
321     this.peerClusters.put(peerId, peer);
322     ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
323         this.rsServerNameZnode, peerId));
324     LOG.info("Added new peer cluster " + peer.getClusterKey());
325     return true;
326   }
327 
328   /**
329    * Helper method to connect to a peer
330    * @param peerId peer's identifier
331    * @return object representing the peer
332    * @throws IOException
333    * @throws KeeperException
334    */
335   public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
336     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
337     byte [] data = ZKUtil.getData(this.zookeeper, znode);
338     String otherClusterKey = "";
339     try {
340       otherClusterKey = parsePeerFrom(data);
341     } catch (DeserializationException e) {
342       LOG.warn("Failed parse of cluster key from peerId=" + peerId
343           + ", specifically the content from the following znode: " + znode);
344     }
345     if (this.ourClusterKey.equals(otherClusterKey)) {
346       LOG.debug("Not connecting to " + peerId + " because it's us");
347       return null;
348     }
349     // Construct the connection to the new peer
350     Configuration otherConf = new Configuration(this.conf);
351     try {
352       ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
353     } catch (IOException e) {
354       LOG.error("Can't get peer because:", e);
355       return null;
356     }
357 
358     ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
359         otherClusterKey);
360     peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
361     return peer;
362   }
363 
364   /**
365    * Remove the peer from zookeeper. which will trigger the watchers on every
366    * region server and close their sources
367    * @param id
368    * @throws IllegalArgumentException Thrown when the peer doesn't exist
369    */
370   public void removePeer(String id) throws IOException {
371     try {
372       if (!peerExists(id)) {
373         throw new IllegalArgumentException("Cannot remove inexisting peer");
374       }
375       ZKUtil.deleteNodeRecursively(this.zookeeper,
376           ZKUtil.joinZNode(this.peersZNode, id));
377     } catch (KeeperException e) {
378       throw new IOException("Unable to remove a peer", e);
379     }
380   }
381 
382   /**
383    * Add a new peer to this cluster
384    * @param id peer's identifier
385    * @param clusterKey ZK ensemble's addresses, client port and root znode
386    * @throws IllegalArgumentException Thrown when the peer doesn't exist
387    * @throws IllegalStateException Thrown when a peer already exists, since
388    *         multi-slave isn't supported yet.
389    */
390   public void addPeer(String id, String clusterKey) throws IOException {
391     try {
392       if (peerExists(id)) {
393         throw new IllegalArgumentException("Cannot add existing peer");
394       }
395       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
396       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
397         toByteArray(clusterKey));
398       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
399       // peer-state znode. This happens while adding a peer.
400       // The peer state data is set as "ENABLED" by default.
401       ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
402         ENABLED_ZNODE_BYTES);
403       // A peer is enabled by default
404     } catch (KeeperException e) {
405       throw new IOException("Unable to add peer", e);
406     }
407   }
408 
409   /**
410    * @param clusterKey
411    * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
412    *         prepended suitable for use as content of a this.peersZNode; i.e.
413    *         the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
414    */
415   static byte[] toByteArray(final String clusterKey) {
416     byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
417         .toByteArray();
418     return ProtobufUtil.prependPBMagic(bytes);
419   }
420 
421   /**
422    * @param state
423    * @return Serialized protobuf of <code>state</code> with pb magic prefix
424    *         prepended suitable for use as content of either the cluster state
425    *         znode -- whether or not we should be replicating kept in
426    *         /hbase/replication/state -- or as content of a peer-state znode
427    *         under a peer cluster id as in
428    *         /hbase/replication/peers/PEER_ID/peer-state.
429    */
430   static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
431     byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
432         .toByteArray();
433     return ProtobufUtil.prependPBMagic(bytes);
434   }
435 
436   /**
437    * @param position
438    * @return Serialized protobuf of <code>position</code> with pb magic prefix
439    *         prepended suitable for use as content of an hlog position in a
440    *         replication queue.
441    */
442   public static byte[] positionToByteArray(
443       final long position) {
444     return ZKUtil.positionToByteArray(position);
445   }
446 
447   /**
448    * @param lockOwner
449    * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
450    *         prepended suitable for use as content of an replication lock during
451    *         region server fail over.
452    */
453   static byte[] lockToByteArray(
454       final String lockOwner) {
455     byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
456         .toByteArray();
457     return ProtobufUtil.prependPBMagic(bytes);
458   }
459 
460   /**
461    * @param bytes Content of a peer znode.
462    * @return ClusterKey parsed from the passed bytes.
463    * @throws DeserializationException
464    */
465   static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
466     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
467       int pblen = ProtobufUtil.lengthOfPBMagic();
468       ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
469           .newBuilder();
470       ZooKeeperProtos.ReplicationPeer peer;
471       try {
472         peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
473       } catch (InvalidProtocolBufferException e) {
474         throw new DeserializationException(e);
475       }
476       return peer.getClusterkey();
477     } else {
478       if (bytes.length > 0) {
479         return Bytes.toString(bytes);
480       }
481       return "";
482     }
483   }
484 
485   /**
486    * @param bytes Content of a state znode.
487    * @return State parsed from the passed bytes.
488    * @throws DeserializationException
489    */
490   static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
491       throws DeserializationException {
492     ProtobufUtil.expectPBMagicPrefix(bytes);
493     int pblen = ProtobufUtil.lengthOfPBMagic();
494     ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
495         .newBuilder();
496     ZooKeeperProtos.ReplicationState state;
497     try {
498       state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
499       return state.getState();
500     } catch (InvalidProtocolBufferException e) {
501       throw new DeserializationException(e);
502     }
503   }
504 
505   /**
506    * @param bytes - Content of a HLog position znode.
507    * @return long - The current HLog position.
508    * @throws DeserializationException
509    */
510   public static long parseHLogPositionFrom(
511       final byte[] bytes) throws DeserializationException {
512     return ZKUtil.parseHLogPositionFrom(bytes);
513   }
514 
515   /**
516    * @param bytes - Content of a lock znode.
517    * @return String - The owner of the lock.
518    * @throws DeserializationException
519    */
520   static String parseLockOwnerFrom(
521       final byte[] bytes) throws DeserializationException {
522     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
523       int pblen = ProtobufUtil.lengthOfPBMagic();
524       ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
525           .newBuilder();
526       ZooKeeperProtos.ReplicationLock lock;
527       try {
528         lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
529       } catch (InvalidProtocolBufferException e) {
530         throw new DeserializationException(e);
531       }
532       return lock.getLockOwner();
533     } else {
534       if (bytes.length > 0) {
535         return Bytes.toString(bytes);
536       }
537       return "";
538     }
539   }
540 
541   /**
542    * Enable replication to the peer
543    *
544    * @param id peer's identifier
545    * @throws IllegalArgumentException
546    *           Thrown when the peer doesn't exist
547    */
548   public void enablePeer(String id) throws IOException {
549     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
550     LOG.info("peer " + id + " is enabled");
551   }
552 
553   /**
554    * Disable replication to the peer
555    *
556    * @param id peer's identifier
557    * @throws IllegalArgumentException
558    *           Thrown when the peer doesn't exist
559    */
560   public void disablePeer(String id) throws IOException {
561     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
562     LOG.info("peer " + id + " is disabled");
563   }
564 
565   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
566       throws IOException {
567     try {
568       if (!peerExists(id)) {
569         throw new IllegalArgumentException("peer " + id + " is not registered");
570       }
571       String peerStateZNode = getPeerStateNode(id);
572       byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
573           : DISABLED_ZNODE_BYTES;
574       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
575         ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
576       } else {
577         ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
578       }
579       LOG.info("state of the peer " + id + " changed to " + state.name());
580     } catch (KeeperException e) {
581       throw new IOException("Unable to change state of the peer " + id, e);
582     }
583   }
584 
585   /**
586    * Check whether the peer is enabled or not. This method checks the atomic
587    * boolean of ReplicationPeer locally.
588    *
589    * @param id peer identifier
590    * @return true if the peer is enabled, otherwise false
591    * @throws IllegalArgumentException
592    *           Thrown when the peer doesn't exist
593    */
594   public boolean getPeerEnabled(String id) {
595     if (!this.peerClusters.containsKey(id)) {
596       throw new IllegalArgumentException("peer " + id + " is not registered");
597     }
598     return this.peerClusters.get(id).getPeerEnabled().get();
599   }
600 
601   private String getPeerStateNode(String id) {
602     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
603   }
604 
605   /**
606    * Get the replication status of this cluster. If the state znode doesn't exist it will also
607    * create it and set it true.
608    * @return returns true when it's enabled, else false
609    * @throws KeeperException
610    */
611   public boolean getReplication() throws KeeperException {
612     return this.replicationState.getState();
613   }
614 
615   /**
616    * Set the new replication state for this cluster
617    * @param newState
618    * @throws KeeperException
619    */
620   public void setReplication(boolean newState) throws KeeperException {
621     this.replicationState.setState(newState);
622   }
623 
624   /**
625    * Add a new log to the list of hlogs in zookeeper
626    * @param filename name of the hlog's znode
627    * @param peerId name of the cluster's znode
628    */
629   public void addLogToList(String filename, String peerId) throws KeeperException {
630     this.replicationQueues.addLog(peerId, filename);
631   }
632 
633   /**
634    * Remove a log from the list of hlogs in zookeeper
635    * @param filename name of the hlog's znode
636    * @param clusterId name of the cluster's znode
637    */
638   public void removeLogFromList(String filename, String clusterId) {
639     this.replicationQueues.removeLog(clusterId, filename);
640   }
641 
642   /**
643    * Set the current position of the specified cluster in the current hlog
644    * @param filename filename name of the hlog's znode
645    * @param clusterId clusterId name of the cluster's znode
646    * @param position the position in the file
647    */
648   public void writeReplicationStatus(String filename, String clusterId, long position) {
649     this.replicationQueues.setLogPosition(clusterId, filename, position);
650   }
651 
652   /**
653    * Get a list of all the other region servers in this cluster
654    * and set a watch
655    * @return a list of server nanes
656    */
657   public List<String> getRegisteredRegionServers() {
658     List<String> result = null;
659     try {
660       result = ZKUtil.listChildrenAndWatchThem(
661           this.zookeeper, this.zookeeper.rsZNode);
662     } catch (KeeperException e) {
663       this.abortable.abort("Get list of registered region servers", e);
664     }
665     return result;
666   }
667 
668 
669   /**
670    * Take ownership for the set of queues belonging to a dead region server.
671    * @param regionserver the id of the dead region server
672    * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in
673    *         each queue.
674    */
675   public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
676     return this.replicationQueues.claimQueues(regionserver);
677   }
678 
679   /**
680    * Delete a complete queue of hlogs
681    * @param peerZnode znode of the peer cluster queue of hlogs to delete
682    */
683   public void deleteSource(String peerZnode, boolean closeConnection) {
684     this.replicationQueues.removeQueue(peerZnode);
685     if (closeConnection) {
686       this.peerClusters.get(peerZnode).getZkw().close();
687       this.peerClusters.remove(peerZnode);
688     }
689   }
690 
691   /**
692    * Delete this cluster's queues
693    */
694   public void deleteOwnRSZNode() {
695     this.replicationQueues.removeAllQueues();
696   }
697 
698   /**
699    * Get the position of the specified hlog in the specified peer znode
700    * @param peerId znode of the peer cluster
701    * @param hlog name of the hlog
702    * @return the position in that hlog
703    * @throws KeeperException
704    */
705   public long getHLogRepPosition(String peerId, String hlog) throws KeeperException {
706     return this.replicationQueues.getLogPosition(peerId, hlog);
707   }
708 
709   /**
710    * Returns the UUID of the provided peer id. Should a connection loss or session
711    * expiration happen, the ZK handler will be reopened once and if it still doesn't
712    * work then it will bail and return null.
713    * @param peerId the peer's ID that will be converted into a UUID
714    * @return a UUID or null if there's a ZK connection issue
715    */
716   public UUID getPeerUUID(String peerId) {
717     ReplicationPeer peer = getPeerClusters().get(peerId);
718     UUID peerUUID = null;
719     try {
720       peerUUID = getUUIDForCluster(peer.getZkw());
721     } catch (KeeperException ke) {
722       reconnectPeer(ke, peer);
723     }
724     return peerUUID;
725   }
726 
727   /**
728    * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
729    * @param zkw watcher connected to an ensemble
730    * @return the UUID read from zookeeper
731    * @throws KeeperException
732    */
733   public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
734     return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
735   }
736 
737   private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
738     if (ke instanceof ConnectionLossException
739       || ke instanceof SessionExpiredException) {
740       LOG.warn(
741         "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
742         ke);
743       try {
744         peer.reloadZkWatcher();
745       } catch(IOException io) {
746         LOG.warn(
747           "Creation of ZookeeperWatcher failed for peer "
748             + peer.getClusterKey(), io);
749       }
750     }
751   }
752 
753   public void registerRegionServerListener(ZooKeeperListener listener) {
754     this.zookeeper.registerListener(listener);
755   }
756 
757   /**
758    * Get a map of all peer clusters
759    * @return map of peer cluster keyed by id
760    */
761   public Map<String, ReplicationPeer> getPeerClusters() {
762     return this.peerClusters;
763   }
764 
765   /**
766    * Determine if a ZK path points to a peer node.
767    * @param path path to be checked
768    * @return true if the path points to a peer node, otherwise false
769    */
770   public boolean isPeerPath(String path) {
771     return path.split("/").length == peersZNode.split("/").length + 1;
772   }
773 
774   /**
775    * Extracts the znode name of a peer cluster from a ZK path
776    * @param fullPath Path to extract the id from
777    * @return the id or an empty string if path is invalid
778    */
779   public static String getZNodeName(String fullPath) {
780     String[] parts = fullPath.split("/");
781     return parts.length > 0 ? parts[parts.length-1] : "";
782   }
783 
784   /**
785    * Get this cluster's zk connection
786    * @return zk connection
787    */
788   public ZooKeeperWatcher getZookeeperWatcher() {
789     return this.zookeeper;
790   }
791 
792 
793   /**
794    * Get the full path to the peers' znode
795    * @return path to peers in zk
796    */
797   public String getPeersZNode() {
798     return peersZNode;
799   }
800 
801   @Override
802   public void close() throws IOException {
803     if (replicationState != null) replicationState.close();
804   }
805 
806   /**
807    * Utility method to ensure an ENABLED znode is in place; if not present, we
808    * create it.
809    * @param zookeeper
810    * @param path Path to znode to check
811    * @return True if we created the znode.
812    * @throws NodeExistsException
813    * @throws KeeperException
814    */
815   static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
816       throws NodeExistsException, KeeperException {
817     if (ZKUtil.checkExists(zookeeper, path) == -1) {
818       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
819       // peer-state znode. This happens while adding a peer.
820       // The peer state data is set as "ENABLED" by default.
821       ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
822       return true;
823     }
824     return false;
825   }
826 
827   /**
828    * @param bytes
829    * @return True if the passed in <code>bytes</code> are those of a pb
830    *         serialized ENABLED state.
831    * @throws DeserializationException
832    */
833   static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
834     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
835     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
836   }
837 }