View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.SortedMap;
29  import java.util.SortedSet;
30  import java.util.TreeMap;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.Server;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.replication.regionserver.Replication;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.zookeeper.ClusterId;
45  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
46  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
48  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
49  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
50  import org.apache.zookeeper.KeeperException;
51  import org.apache.zookeeper.KeeperException.AuthFailedException;
52  import org.apache.zookeeper.KeeperException.ConnectionLossException;
53  import org.apache.zookeeper.KeeperException.SessionExpiredException;
54  
55  /**
56   * This class serves as a helper for all things related to zookeeper in
57   * replication.
58   * <p/>
59   * The layout looks something like this under zookeeper.znode.parent for the
60   * master cluster:
61   * <p/>
62   *
63   * <pre>
64   * replication/
65   *  state      {contains true or false}
66   *  clusterId  {contains a byte}
67   *  peers/
68   *    1/   {contains a full cluster address}
69   *      peer-state  {contains ENABLED or DISABLED}
70   *    2/
71   *    ...
72   *  rs/ {lists all RS that replicate}
73   *    startcode1/ {lists all peer clusters}
74   *      1/ {lists hlogs to process}
75   *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
76   *        10.10.1.76%3A53488.123456790
77   *        ...
78   *      2/
79   *      ...
80   *    startcode2/
81   *    ...
82   * </pre>
83   */
84  public class ReplicationZookeeper {
  // Logger for this class
  private static final Log LOG =
    LogFactory.getLog(ReplicationZookeeper.class);
  // Name of znode we use to lock when failover
  private final static String RS_LOCK_ZNODE = "lock";

  // Values of znode which stores state of a peer
  public static enum PeerState {
    ENABLED, DISABLED
  };

  // Our handle on zookeeper
  private final ZooKeeperWatcher zookeeper;
  // Map of peer clusters keyed by their id.
  // Only initialized by the region server constructor; stays null otherwise.
  private Map<String, ReplicationPeer> peerClusters;
  // Path to the root replication znode
  private String replicationZNode;
  // Path to the peer clusters znode
  private String peersZNode;
  // Path to the znode that contains all RS that replicates
  private String rsZNode;
  // Path to this region server's name under rsZNode.
  // Only set by the region server constructor.
  private String rsServerNameZnode;
  // Name of the replication state znode (joined to replicationZNode to get its path)
  private String replicationStateNodeName;
  // Name of zk node which stores peer state
  private String peerStateNodeName;
  // Configuration the znode names and our cluster key are read from
  private final Configuration conf;
  // Is this cluster replicating at the moment?
  private AtomicBoolean replicating;
  // The key to our own cluster
  private String ourClusterKey;
  // Abortable whose abort() is called when a zookeeper operation fails
  private Abortable abortable;
  // Tracker set on the replication state znode
  private ReplicationStatusTracker statusTracker;
119 
120   /**
121    * Constructor used by clients of replication (like master and HBase clients)
122    * @param conf  conf to use
123    * @param zk    zk connection to use
124    * @throws IOException
125    */
126   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
127                               final ZooKeeperWatcher zk)
128     throws KeeperException {
129 
130     this.conf = conf;
131     this.zookeeper = zk;
132     this.replicating = new AtomicBoolean();
133     setZNodes(abortable);
134   }
135 
136   /**
137    * Constructor used by region servers, connects to the peer cluster right away.
138    *
139    * @param server
140    * @param replicating    atomic boolean to start/stop replication
141    * @throws IOException
142    * @throws KeeperException 
143    */
144   public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
145   throws IOException, KeeperException {
146     this.abortable = server;
147     this.zookeeper = server.getZooKeeper();
148     this.conf = server.getConfiguration();
149     this.replicating = replicating;
150     setZNodes(server);
151 
152     this.peerClusters = new HashMap<String, ReplicationPeer>();
153     ZKUtil.createWithParents(this.zookeeper,
154         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
155     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
156     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
157     connectExistingPeers();
158   }
159 
160   private void setZNodes(Abortable abortable) throws KeeperException {
161     String replicationZNodeName =
162         conf.get("zookeeper.znode.replication", "replication");
163     String peersZNodeName =
164         conf.get("zookeeper.znode.replication.peers", "peers");
165     this.peerStateNodeName = conf.get(
166         "zookeeper.znode.replication.peers.state", "peer-state");
167     this.replicationStateNodeName =
168         conf.get("zookeeper.znode.replication.state", "state");
169     String rsZNodeName =
170         conf.get("zookeeper.znode.replication.rs", "rs");
171     this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
172     this.replicationZNode =
173       ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
174     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
175     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
176     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
177     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
178 
179     // Set a tracker on replicationStateNodeNode
180     this.statusTracker =
181         new ReplicationStatusTracker(this.zookeeper, abortable);
182     statusTracker.start();
183     readReplicationStateZnode();
184   }
185 
186   private void connectExistingPeers() throws IOException, KeeperException {
187     List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
188     if (znodes != null) {
189       for (String z : znodes) {
190         connectToPeer(z);
191       }
192     }
193   }
194 
195   /**
196    * List this cluster's peers' IDs
197    * @return list of all peers' identifiers
198    */
199   public List<String> listPeersIdsAndWatch() {
200     List<String> ids = null;
201     try {
202       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
203     } catch (KeeperException e) {
204       this.abortable.abort("Cannot get the list of peers ", e);
205     }
206     return ids;
207   }
208 
209   /**
210    * Map of this cluster's peers for display.
211    * @return A map of peer ids to peer cluster keys
212    */
213   public Map<String,String> listPeers() {
214     Map<String,String> peers = new TreeMap<String,String>();
215     List<String> ids = null;
216     try {
217       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
218       for (String id : ids) {
219         peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
220             ZKUtil.joinZNode(this.peersZNode, id))));
221       }
222     } catch (KeeperException e) {
223       this.abortable.abort("Cannot get the list of peers ", e);
224     }
225     return peers;
226   }
227   /**
228    * Returns all region servers from given peer
229    *
230    * @param peerClusterId (byte) the cluster to interrogate
231    * @return addresses of all region servers
232    */
233   public List<ServerName> getSlavesAddresses(String peerClusterId) {
234     if (this.peerClusters.size() == 0) {
235       return Collections.emptyList();
236     }
237     ReplicationPeer peer = this.peerClusters.get(peerClusterId);
238     if (peer == null) {
239       return Collections.emptyList();
240     }
241     // Synchronize peer cluster connection attempts to avoid races and rate
242     // limit connections when multiple replication sources try to connect to
243     // the peer cluster. If the peer cluster is down we can get out of control
244     // over time.
245     synchronized (peer) {
246       List<ServerName> addresses;
247       try {
248         addresses = fetchSlavesAddresses(peer.getZkw());
249       } catch (KeeperException ke) {
250         if (LOG.isDebugEnabled()) {
251           LOG.debug("Fetch salves addresses failed.", ke);
252         }
253         reconnectPeer(ke, peer);
254         addresses = Collections.emptyList();
255       }
256       peer.setRegionServers(addresses);
257     }
258     return peer.getRegionServers();
259   }
260 
261   /**
262    * Get the list of all the region servers from the specified peer
263    * @param zkw zk connection to use
264    * @return list of region server addresses or an empty list if the slave
265    * is unavailable
266    */
267   private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
268     throws KeeperException {
269     return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
270   }
271 
272   /**
273    * Lists the children of the specified znode, retrieving the data of each
274    * child as a server address.
275    *
276    * Used to list the currently online regionservers and their addresses.
277    *
278    * Sets no watches at all, this method is best effort.
279    *
280    * Returns an empty list if the node has no children.  Returns null if the
281    * parent node itself does not exist.
282    *
283    * @param zkw zookeeper reference
284    * @param znode node to get children of as addresses
285    * @return list of data of children of specified znode, empty if no children,
286    *         null if parent does not exist
287    * @throws KeeperException if unexpected zookeeper exception
288    */
289   public static List<ServerName> listChildrenAndGetAsServerNames(
290       ZooKeeperWatcher zkw, String znode)
291   throws KeeperException {
292     List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
293     if(children == null) {
294       return Collections.emptyList();
295     }
296     List<ServerName> addresses = new ArrayList<ServerName>(children.size());
297     for (String child : children) {
298       addresses.add(ServerName.parseServerName(child));
299     }
300     return addresses;
301   }
302 
303   /**
304    * This method connects this cluster to another one and registers it
305    * in this region server's replication znode
306    * @param peerId id of the peer cluster
307    * @throws KeeperException 
308    */
309   public boolean connectToPeer(String peerId)
310       throws IOException, KeeperException {
311     if (peerClusters == null) {
312       return false;
313     }
314     if (this.peerClusters.containsKey(peerId)) {
315       return false;
316     }
317     ReplicationPeer peer = getPeer(peerId);
318     if (peer == null) {
319       return false;
320     }
321     this.peerClusters.put(peerId, peer);
322     ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
323         this.rsServerNameZnode, peerId));
324     LOG.info("Added new peer cluster " + peer.getClusterKey());
325     return true;
326   }
327 
328   /**
329    * Helper method to connect to a peer
330    * @param peerId peer's identifier
331    * @return object representing the peer
332    * @throws IOException
333    * @throws KeeperException
334    */
335   public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
336     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
337     byte [] data = ZKUtil.getData(this.zookeeper, znode);
338     String otherClusterKey = Bytes.toString(data);
339     if (this.ourClusterKey.equals(otherClusterKey)) {
340       LOG.debug("Not connecting to " + peerId + " because it's us");
341       return null;
342     }
343     // Construct the connection to the new peer
344     Configuration otherConf = new Configuration(this.conf);
345     try {
346       ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
347     } catch (IOException e) {
348       LOG.error("Can't get peer because:", e);
349       return null;
350     }
351 
352     ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
353         otherClusterKey);
354     peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
355     return peer;
356   }
357 
358   /**
359    * Set the new replication state for this cluster
360    * @param newState
361    */
362   public void setReplicating(boolean newState) throws KeeperException {
363     ZKUtil.createWithParents(this.zookeeper,
364         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
365     ZKUtil.setData(this.zookeeper,
366         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
367         Bytes.toBytes(Boolean.toString(newState)));
368   }
369 
370   /**
371    * Remove the peer from zookeeper. which will trigger the watchers on every
372    * region server and close their sources
373    * @param id
374    * @throws IllegalArgumentException Thrown when the peer doesn't exist
375    */
376   public void removePeer(String id) throws IOException {
377     try {
378       if (!peerExists(id)) {
379         throw new IllegalArgumentException("Cannot remove inexisting peer");
380       }
381       ZKUtil.deleteNodeRecursively(this.zookeeper,
382           ZKUtil.joinZNode(this.peersZNode, id));
383     } catch (KeeperException e) {
384       throw new IOException("Unable to remove a peer", e);
385     }
386   }
387 
388   /**
389    * Add a new peer to this cluster
390    * @param id peer's identifier
391    * @param clusterKey ZK ensemble's addresses, client port and root znode
392    * @throws IllegalArgumentException Thrown when the peer doesn't exist
393    * @throws IllegalStateException Thrown when a peer already exists, since
394    *         multi-slave isn't supported yet.
395    */
396   public void addPeer(String id, String clusterKey) throws IOException {
397     try {
398       if (peerExists(id)) {
399         throw new IllegalArgumentException("Cannot add existing peer");
400       }
401       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
402       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
403         Bytes.toBytes(clusterKey));
404       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
405       // peer-state znode. This happens while adding a peer.
406       // The peer state data is set as "ENABLED" by default.
407       ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
408         Bytes.toBytes(PeerState.ENABLED.name()));
409     } catch (KeeperException e) {
410       throw new IOException("Unable to add peer", e);
411     }
412   }
413 
414   private boolean peerExists(String id) throws KeeperException {
415     return ZKUtil.checkExists(this.zookeeper,
416           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
417   }
418 
419   /**
420    * Enable replication to the peer
421    *
422    * @param id peer's identifier
423    * @throws IllegalArgumentException
424    *           Thrown when the peer doesn't exist
425    */
426   public void enablePeer(String id) throws IOException {
427     changePeerState(id, PeerState.ENABLED);
428     LOG.info("peer " + id + " is enabled");
429   }
430 
431   /**
432    * Disable replication to the peer
433    *
434    * @param id peer's identifier
435    * @throws IllegalArgumentException
436    *           Thrown when the peer doesn't exist
437    */
438   public void disablePeer(String id) throws IOException {
439     changePeerState(id, PeerState.DISABLED);
440     LOG.info("peer " + id + " is disabled");
441   }
442 
443   private void changePeerState(String id, PeerState state) throws IOException {
444     try {
445       if (!peerExists(id)) {
446         throw new IllegalArgumentException("peer " + id + " is not registered");
447       }
448       String peerStateZNode = getPeerStateNode(id);
449       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
450         ZKUtil.setData(this.zookeeper, peerStateZNode,
451           Bytes.toBytes(state.name()));
452       } else {
453         ZKUtil.createAndWatch(zookeeper, peerStateZNode,
454             Bytes.toBytes(state.name()));
455       }
456       LOG.info("state of the peer " + id + " changed to " + state.name());
457     } catch (KeeperException e) {
458       throw new IOException("Unable to change state of the peer " + id, e);
459     }
460   }
461 
462   /**
463    * Get state of the peer. This method checks the state by connecting to ZK.
464    *
465    * @param id peer's identifier
466    * @return current state of the peer
467    */
468   public PeerState getPeerState(String id) throws KeeperException {
469     byte[] peerStateBytes = ZKUtil
470         .getData(this.zookeeper, getPeerStateNode(id));
471     return PeerState.valueOf(Bytes.toString(peerStateBytes));
472   }
473 
474   /**
475    * Check whether the peer is enabled or not. This method checks the atomic
476    * boolean of ReplicationPeer locally.
477    *
478    * @param id peer identifier
479    * @return true if the peer is enabled, otherwise false
480    * @throws IllegalArgumentException
481    *           Thrown when the peer doesn't exist
482    */
483   public boolean getPeerEnabled(String id) {
484     if (!this.peerClusters.containsKey(id)) {
485       throw new IllegalArgumentException("peer " + id + " is not registered");
486     }
487     return this.peerClusters.get(id).getPeerEnabled().get();
488   }
489 
490   private String getPeerStateNode(String id) {
491     return ZKUtil.joinZNode(this.peersZNode,
492         ZKUtil.joinZNode(id, this.peerStateNodeName));
493   }
494 
495   /**
496    * This reads the state znode for replication and sets the atomic boolean
497    */
498   private void readReplicationStateZnode() {
499     try {
500       this.replicating.set(getReplication());
501       LOG.info("Replication is now " + (this.replicating.get()?
502         "started" : "stopped"));
503     } catch (KeeperException e) {
504       this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
505     }
506   }
507 
508   /**
509    * Get the replication status of this cluster. If the state znode doesn't
510    * exist it will also create it and set it true.
511    * @return returns true when it's enabled, else false
512    * @throws KeeperException
513    */
514   public boolean getReplication() throws KeeperException {
515     byte [] data = this.statusTracker.getData(false);
516     if (data == null || data.length == 0) {
517       setReplicating(true);
518       return true;
519     }
520     return Boolean.parseBoolean(Bytes.toString(data));
521   }
522 
523   private String getRepStateNode() {
524     return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
525   }
526 
527   /**
528    * Add a new log to the list of hlogs in zookeeper
529    * @param filename name of the hlog's znode
530    * @param peerId name of the cluster's znode
531    */
532   public void addLogToList(String filename, String peerId)
533     throws KeeperException {
534     String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
535     znode = ZKUtil.joinZNode(znode, filename);
536     ZKUtil.createWithParents(this.zookeeper, znode);
537   }
538 
539   /**
540    * Remove a log from the list of hlogs in zookeeper
541    * @param filename name of the hlog's znode
542    * @param clusterId name of the cluster's znode
543    */
544   public void removeLogFromList(String filename, String clusterId) {
545     try {
546       String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
547       znode = ZKUtil.joinZNode(znode, filename);
548       ZKUtil.deleteNode(this.zookeeper, znode);
549     } catch (KeeperException e) {
550       this.abortable.abort("Failed remove from list", e);
551     }
552   }
553 
554   /**
555    * Set the current position of the specified cluster in the current hlog
556    * @param filename filename name of the hlog's znode
557    * @param clusterId clusterId name of the cluster's znode
558    * @param position the position in the file
559    * @throws IOException
560    */
561   public void writeReplicationStatus(String filename, String clusterId,
562       long position) {
563     try {
564       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
565       znode = ZKUtil.joinZNode(znode, filename);
566       // Why serialize String of Long and note Long as bytes?
567       ZKUtil.setData(this.zookeeper, znode,
568         Bytes.toBytes(Long.toString(position)));
569     } catch (KeeperException e) {
570       this.abortable.abort("Writing replication status", e);
571     }
572   }
573 
574   /**
575    * Get a list of all the other region servers in this cluster
576    * and set a watch
577    * @return a list of server nanes
578    */
579   public List<String> getRegisteredRegionServers() {
580     List<String> result = null;
581     try {
582       result = ZKUtil.listChildrenAndWatchThem(
583           this.zookeeper, this.zookeeper.rsZNode);
584     } catch (KeeperException e) {
585       this.abortable.abort("Get list of registered region servers", e);
586     }
587     return result;
588   }
589 
590   /**
591    * Get the list of the replicators that have queues, they can be alive, dead
592    * or simply from a previous run
593    * @return a list of server names
594    */
595   public List<String> getListOfReplicators() {
596     List<String> result = null;
597     try {
598       result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
599     } catch (KeeperException e) {
600       this.abortable.abort("Get list of replicators", e);
601     }
602     return result;
603   }
604 
605   /**
606    * Get the list of peer clusters for the specified server names
607    * @param rs server names of the rs
608    * @return a list of peer cluster
609    */
610   public List<String> getListPeersForRS(String rs) {
611     String znode = ZKUtil.joinZNode(rsZNode, rs);
612     List<String> result = null;
613     try {
614       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
615     } catch (KeeperException e) {
616       this.abortable.abort("Get list of peers for rs", e);
617     }
618     return result;
619   }
620 
621   /**
622    * Get the list of hlogs for the specified region server and peer cluster
623    * @param rs server names of the rs
624    * @param id peer cluster
625    * @return a list of hlogs
626    */
627   public List<String> getListHLogsForPeerForRS(String rs, String id) {
628     String znode = ZKUtil.joinZNode(rsZNode, rs);
629     znode = ZKUtil.joinZNode(znode, id);
630     List<String> result = null;
631     try {
632       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
633     } catch (KeeperException e) {
634       this.abortable.abort("Get list of hlogs for peer", e);
635     }
636     return result;
637   }
638 
639   /**
640    * Checks if the provided znode is the same as this region server's
641    * @param znode to check
642    * @return if this is this rs's znode
643    */
644   public boolean isThisOurZnode(String znode) {
645     String otherRs = ZKUtil.joinZNode(this.rsZNode, znode);
646     return otherRs.equals(rsServerNameZnode);
647   }
648 
649   /**
650    * Try to set a lock in another server's znode.
651    * @param znode the server names of the other server
652    * @return true if the lock was acquired, false in every other cases
653    */
654   public boolean lockOtherRS(String znode) {
655     try {
656       String parent = ZKUtil.joinZNode(this.rsZNode, znode);
657       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
658       ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
659     } catch (KeeperException e) {
660       // This exception will pop up if the znode under which we're trying to
661       // create the lock is already deleted by another region server, meaning
662       // that the transfer already occurred.
663       // NoNode => transfer is done and znodes are already deleted
664       // NodeExists => lock znode already created by another RS
665       if (e instanceof KeeperException.NoNodeException ||
666           e instanceof KeeperException.NodeExistsException) {
667         LOG.info("Won't transfer the queue," +
668             " another RS took care of it because of: " + e.getMessage());
669       } else {
670         LOG.info("Failed lock other rs", e);
671       }
672       return false;
673     }
674     return true;
675   }
676 
677   /**
678    * It "atomically" copies all the hlogs queues from another region server and returns them all
679    * sorted per peer cluster (appended with the dead server's znode).
680    * @param znode
681    * @return HLog queues sorted per peer cluster
682    */
683   public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
684     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
685     String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
686     List<String> peerIdsToProcess = null;
687     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
688     try {
689       peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
690       if (peerIdsToProcess == null) return queues; // node already processed
691       for (String peerId : peerIdsToProcess) {
692         String newPeerId = peerId + "-" + znode;
693         String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
694         // check the logs queue for the old peer cluster
695         String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
696         List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
697         if (hlogs == null || hlogs.size() == 0) {
698           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
699           continue; // empty log queue.
700         }
701         // create the new cluster znode
702         SortedSet<String> logQueue = new TreeSet<String>();
703         queues.put(newPeerId, logQueue);
704         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
705         listOfOps.add(op);
706         // get the offset of the logs and set it to new znodes
707         for (String hlog : hlogs) {
708           String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
709           byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
710           LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
711           String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
712           listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
713           // add ops for deleting
714           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
715           logQueue.add(hlog);
716         }
717         // add delete op for peer
718         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
719       }
720       // add delete op for dead rs
721       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
722       LOG.debug(" The multi list size is: " + listOfOps.size());
723       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
724       LOG.info("Atomically moved the dead regionserver logs. ");
725     } catch (KeeperException e) {
726       // Multi call failed; it looks like some other regionserver took away the logs.
727       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
728       queues.clear();
729     }
730     return queues;
731   }
732 
733   /**
734    * This methods copies all the hlogs queues from another region server
735    * and returns them all sorted per peer cluster (appended with the dead
736    * server's znode)
737    * @param znode server names to copy
738    * @return all hlogs for all peers of that cluster, null if an error occurred
739    */
740   public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
741     // TODO this method isn't atomic enough, we could start copying and then
742     // TODO fail for some reason and we would end up with znodes we don't want.
743     SortedMap<String,SortedSet<String>> queues =
744         new TreeMap<String,SortedSet<String>>();
745     try {
746       String nodePath = ZKUtil.joinZNode(rsZNode, znode);
747       List<String> clusters =
748         ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
749       // We have a lock znode in there, it will count as one.
750       if (clusters == null || clusters.size() <= 1) {
751         return queues;
752       }
753       // The lock isn't a peer cluster, remove it
754       clusters.remove(RS_LOCK_ZNODE);
755       for (String cluster : clusters) {
756         // We add the name of the recovered RS to the new znode, we can even
757         // do that for queues that were recovered 10 times giving a znode like
758         // number-startcode-number-otherstartcode-number-anotherstartcode-etc
759         String newCluster = cluster+"-"+znode;
760         String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
761         String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
762         List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
763         // That region server didn't have anything to replicate for this cluster
764         if (hlogs == null || hlogs.size() == 0) {
765           continue;
766         }
767         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
768             HConstants.EMPTY_BYTE_ARRAY);
769         SortedSet<String> logQueue = new TreeSet<String>();
770         queues.put(newCluster, logQueue);
771         for (String hlog : hlogs) {
772           String z = ZKUtil.joinZNode(clusterPath, hlog);
773           byte [] position = ZKUtil.getData(this.zookeeper, z);
774           LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
775           String child = ZKUtil.joinZNode(newClusterZnode, hlog);
776           ZKUtil.createAndWatch(this.zookeeper, child, position);
777           logQueue.add(hlog);
778         }
779       }
780     } catch (KeeperException e) {
781       this.abortable.abort("Copy queues from rs", e);
782     }
783     return queues;
784   }
785 
786   /**
787    * Delete a complete queue of hlogs
788    * @param peerZnode znode of the peer cluster queue of hlogs to delete
789    */
790   public void deleteSource(String peerZnode, boolean closeConnection) {
791     try {
792       ZKUtil.deleteNodeRecursively(this.zookeeper,
793           ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
794       if (closeConnection) {
795         this.peerClusters.get(peerZnode).getZkw().close();
796         this.peerClusters.remove(peerZnode);
797       }
798     } catch (KeeperException e) {
799       this.abortable.abort("Failed delete of " + peerZnode, e);
800     }
801   }
802 
803   /**
804    * Recursive deletion of all znodes in specified rs' znode
805    * @param znode
806    */
807   public void deleteRsQueues(String znode) {
808     String fullpath = ZKUtil.joinZNode(rsZNode, znode);
809     try {
810       List<String> clusters =
811         ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
812       for (String cluster : clusters) {
813         // We'll delete it later
814         if (cluster.equals(RS_LOCK_ZNODE)) {
815           continue;
816         }
817         String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
818         ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
819       }
820       // Finish cleaning up
821       ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
822     } catch (KeeperException e) {
823       if (e instanceof KeeperException.NoNodeException ||
824           e instanceof KeeperException.NotEmptyException) {
825         // Testing a special case where another region server was able to
826         // create a lock just after we deleted it, but then was also able to
827         // delete the RS znode before us or its lock znode is still there.
828         if (e.getPath().equals(fullpath)) {
829           return;
830         }
831       }
832       this.abortable.abort("Failed delete of " + znode, e);
833     }
834   }
835 
836   /**
837    * Delete this cluster's queues
838    */
839   public void deleteOwnRSZNode() {
840     try {
841       ZKUtil.deleteNodeRecursively(this.zookeeper,
842           this.rsServerNameZnode);
843     } catch (KeeperException e) {
844       // if the znode is already expired, don't bother going further
845       if (e instanceof KeeperException.SessionExpiredException) {
846         return;
847       }
848       this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
849     }
850   }
851 
852   /**
853    * Get the position of the specified hlog in the specified peer znode
854    * @param peerId znode of the peer cluster
855    * @param hlog name of the hlog
856    * @return the position in that hlog
857    * @throws KeeperException 
858    */
859   public long getHLogRepPosition(String peerId, String hlog)
860   throws KeeperException {
861     String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
862     String znode = ZKUtil.joinZNode(clusterZnode, hlog);
863     String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
864     return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
865   }
866 
867   /**
868    * Returns the UUID of the provided peer id. Should a connection loss or session
869    * expiration happen, the ZK handler will be reopened once and if it still doesn't
870    * work then it will bail and return null.
871    * @param peerId the peer's ID that will be converted into a UUID
872    * @return a UUID or null if there's a ZK connection issue
873    */
874   public UUID getPeerUUID(String peerId) {
875     ReplicationPeer peer = getPeerClusters().get(peerId);
876     UUID peerUUID = null;
877     // Synchronize peer cluster connection attempts to avoid races and rate
878     // limit connections when multiple replication sources try to connect to
879     // the peer cluster. If the peer cluster is down we can get out of control
880     // over time.
881     synchronized (peer) {
882       try {
883         peerUUID = getUUIDForCluster(peer.getZkw());
884       } catch (KeeperException ke) {
885         reconnectPeer(ke, peer);
886       }
887     }
888     return peerUUID;
889   }
890 
891   /**
892    * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
893    * @param zkw watcher connected to an ensemble
894    * @return the UUID read from zookeeper
895    * @throws KeeperException
896    */
897   public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
898     return UUID.fromString(ClusterId.readClusterIdZNode(zkw));
899   }
900 
901   private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
902     if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
903         || ke instanceof AuthFailedException) {
904       LOG.warn(
905         "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
906         ke);
907       try {
908         peer.reloadZkWatcher();
909       } catch(IOException io) {
910         LOG.warn(
911           "Creation of ZookeeperWatcher failed for peer "
912             + peer.getClusterKey(), io);
913       }
914     }
915   }
916 
917   public void registerRegionServerListener(ZooKeeperListener listener) {
918     this.zookeeper.registerListener(listener);
919   }
920 
921   /**
922    * Get a map of all peer clusters
923    * @return map of peer cluster keyed by id
924    */
925   public Map<String, ReplicationPeer> getPeerClusters() {
926     return this.peerClusters;
927   }
928 
929   /**
930    * Extracts the znode name of a peer cluster from a ZK path
931    * @param fullPath Path to extract the id from
932    * @return the id or an empty string if path is invalid
933    */
934   public static String getZNodeName(String fullPath) {
935     String[] parts = fullPath.split("/");
936     return parts.length > 0 ? parts[parts.length-1] : "";
937   }
938 
939   /**
940    * Get this cluster's zk connection
941    * @return zk connection
942    */
943   public ZooKeeperWatcher getZookeeperWatcher() {
944     return this.zookeeper;
945   }
946 
947 
948   /**
949    * Get the full path to the peers' znode
950    * @return path to peers in zk
951    */
952   public String getPeersZNode() {
953     return peersZNode;
954   }
955 
956   /**
957    * Tracker for status of the replication
958    */
959   public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
960     public ReplicationStatusTracker(ZooKeeperWatcher watcher,
961         Abortable abortable) {
962       super(watcher, getRepStateNode(), abortable);
963     }
964 
965     @Override
966     public synchronized void nodeDataChanged(String path) {
967       if (path.equals(node)) {
968         super.nodeDataChanged(path);
969         readReplicationStateZnode();
970       }
971     }
972   }
973 }