View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.SortedMap;
36  import java.util.SortedSet;
37  import java.util.TreeSet;
38  import java.util.UUID;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.CopyOnWriteArrayList;
41  import java.util.concurrent.LinkedBlockingQueue;
42  import java.util.concurrent.RejectedExecutionException;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.TableDescriptors;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.hbase.regionserver.HRegionServer;
55  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
56  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
57  import org.apache.hadoop.hbase.replication.ReplicationException;
58  import org.apache.hadoop.hbase.replication.ReplicationListener;
59  import org.apache.hadoop.hbase.replication.ReplicationPeer;
60  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
61  import org.apache.hadoop.hbase.replication.ReplicationPeers;
62  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
63  import org.apache.hadoop.hbase.replication.ReplicationQueues;
64  import org.apache.hadoop.hbase.replication.ReplicationTracker;
65  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
66  
67  /**
68   * This class is responsible for managing all the replication
69   * sources. There are two classes of sources:
70   * <ul>
71   * <li> Normal sources are persistent and one per peer cluster</li>
72   * <li> Old sources are recovered from a failed region server and our
73   * only goal is to finish replicating the WAL queue it had up in ZK</li>
74   * </ul>
75   *
76   * When a region server dies, this class uses a watcher to get notified and it
77   * tries to grab a lock in order to transfer all the queues in a local
78   * old source.
79   *
80   * This class implements the ReplicationListener interface so that it can track changes in
81   * replication state.
82   */
83  @InterfaceAudience.Private
84  public class ReplicationSourceManager implements ReplicationListener {
85    private static final Log LOG =
86        LogFactory.getLog(ReplicationSourceManager.class);
87    // List of all the sources that read this RS's logs
88    private final List<ReplicationSourceInterface> sources;
89    // List of all the sources we got from dead RSs
90    private final List<ReplicationSourceInterface> oldsources;
91    private final ReplicationQueues replicationQueues;
92    private final ReplicationTracker replicationTracker;
93    private final ReplicationPeers replicationPeers;
94    // UUID for this cluster
95    private final UUID clusterId;
96    // All about stopping
97    private final Server server;
98    // All logs we are currently tracking
99    // Index structure of the map is: peer_id->logPrefix/logGroup->logs
100   private final Map<String, Map<String, SortedSet<String>>> walsById;
101   // Logs for recovered sources we are currently tracking
102   private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
103   private final Configuration conf;
104   private final FileSystem fs;
105   // The paths to the latest log of each wal group, for new coming peers
106   private Set<Path> latestPaths;
107   // Path to the wals directories
108   private final Path logDir;
109   // Path to the wal archive
110   private final Path oldLogDir;
111   // The number of ms that we wait before moving znodes, HBASE-3596
112   private final long sleepBeforeFailover;
113   // Homemade executor service for replication
114   private final ThreadPoolExecutor executor;
115 
116   private final Random rand;
117 
118 
119   /**
120    * Creates a replication manager and sets the watch on all the other registered region servers
121    * @param replicationQueues the interface for manipulating replication queues
122    * @param replicationPeers
123    * @param replicationTracker
124    * @param conf the configuration to use
125    * @param server the server for this region server
126    * @param fs the file system to use
127    * @param logDir the directory that contains all wal directories of live RSs
128    * @param oldLogDir the directory where old logs are archived
129    * @param clusterId
130    */
131   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
132       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
133       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
134       final Path oldLogDir, final UUID clusterId) {
135     //CopyOnWriteArrayList is thread-safe.
136     //Generally, reading is more than modifying.
137     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
138     this.replicationQueues = replicationQueues;
139     this.replicationPeers = replicationPeers;
140     this.replicationTracker = replicationTracker;
141     this.server = server;
142     this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
143     this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
144     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
145     this.conf = conf;
146     this.fs = fs;
147     this.logDir = logDir;
148     this.oldLogDir = oldLogDir;
149     this.sleepBeforeFailover =
150         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
151     this.clusterId = clusterId;
152     this.replicationTracker.registerListener(this);
153     this.replicationPeers.getAllPeerIds();
154     // It's preferable to failover 1 RS at a time, but with good zk servers
155     // more could be processed at the same time.
156     int nbWorkers = conf.getInt("replication.executor.workers", 1);
157     // use a short 100ms sleep since this could be done inline with a RS startup
158     // even if we fail, other region servers can take care of it
159     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
160         100, TimeUnit.MILLISECONDS,
161         new LinkedBlockingQueue<Runnable>());
162     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
163     tfb.setNameFormat("ReplicationExecutor-%d");
164     tfb.setDaemon(true);
165     this.executor.setThreadFactory(tfb.build());
166     this.rand = new Random();
167     this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
168   }
169 
170   /**
171    * Provide the id of the peer and a log key and this method will figure which
172    * wal it belongs to and will log, for this region server, the current
173    * position. It will also clean old logs from the queue.
174    * @param log Path to the log currently being replicated from
175    * replication status in zookeeper. It will also delete older entries.
176    * @param id id of the peer cluster
177    * @param position current location in the log
178    * @param queueRecovered indicates if this queue comes from another region server
179    * @param holdLogInZK if true then the log is retained in ZK
180    */
181   public void logPositionAndCleanOldLogs(Path log, String id, long position,
182       boolean queueRecovered, boolean holdLogInZK) {
183     String fileName = log.getName();
184     this.replicationQueues.setLogPosition(id, fileName, position);
185     if (holdLogInZK) {
186      return;
187     }
188     cleanOldLogs(fileName, id, queueRecovered);
189   }
190 
191   /**
192    * Cleans a log file and all older files from ZK. Called when we are sure that a
193    * log file is closed and has no more entries.
194    * @param key Path to the log
195    * @param id id of the peer cluster
196    * @param queueRecovered Whether this is a recovered queue
197    */
198   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
199     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
200     if (queueRecovered) {
201       SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
202       if (wals != null && !wals.first().equals(key)) {
203         cleanOldLogs(wals, key, id);
204       }
205     } else {
206       synchronized (this.walsById) {
207         SortedSet<String> wals = walsById.get(id).get(logPrefix);
208         if (wals != null && !wals.first().equals(key)) {
209           cleanOldLogs(wals, key, id);
210         }
211       }
212     }
213  }
214 
215   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
216     SortedSet<String> walSet = wals.headSet(key);
217     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
218     for (String wal : walSet) {
219       this.replicationQueues.removeLog(id, wal);
220     }
221     walSet.clear();
222   }
223 
224   /**
225    * Adds a normal source per registered peer cluster and tries to process all
226    * old region server wal queues
227    */
228   protected void init() throws IOException, ReplicationException {
229     for (String id : this.replicationPeers.getPeerIds()) {
230       addSource(id);
231     }
232     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
233     if (currentReplicators == null || currentReplicators.size() == 0) {
234       return;
235     }
236     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
237     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
238         + otherRegionServers);
239 
240     // Look if there's anything to process after a restart
241     for (String rs : currentReplicators) {
242       if (!otherRegionServers.contains(rs)) {
243         transferQueues(rs);
244       }
245     }
246   }
247 
248   /**
249    * Add sources for the given peer cluster on this region server. For the newly added peer, we only
250    * need to enqueue the latest log of each wal group and do replication
251    * @param id the id of the peer cluster
252    * @return the source that was created
253    * @throws IOException
254    */
255   protected ReplicationSourceInterface addSource(String id) throws IOException,
256       ReplicationException {
257     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
258     ReplicationPeer peer = replicationPeers.getPeer(id);
259     ReplicationSourceInterface src =
260         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
261           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
262     synchronized (this.walsById) {
263       this.sources.add(src);
264       Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
265       this.walsById.put(id, walsByGroup);
266       // Add the latest wal to that source's queue
267       synchronized (latestPaths) {
268         if (this.latestPaths.size() > 0) {
269           for (Path logPath : latestPaths) {
270             String name = logPath.getName();
271             String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
272             SortedSet<String> logs = new TreeSet<String>();
273             logs.add(name);
274             walsByGroup.put(walPrefix, logs);
275             try {
276               this.replicationQueues.addLog(id, name);
277             } catch (ReplicationException e) {
278               String message =
279                   "Cannot add log to queue when creating a new source, queueId=" + id
280                       + ", filename=" + name;
281               server.stop(message);
282               throw e;
283             }
284             src.enqueueLog(logPath);
285           }
286         }
287       }
288     }
289     src.startup();
290     return src;
291   }
292 
293   /**
294    * Delete a complete queue of wals associated with a peer cluster
295    * @param peerId Id of the peer cluster queue of wals to delete
296    */
297   public void deleteSource(String peerId, boolean closeConnection) {
298     this.replicationQueues.removeQueue(peerId);
299     if (closeConnection) {
300       this.replicationPeers.peerRemoved(peerId);
301     }
302   }
303 
304   /**
305    * Terminate the replication on this region server
306    */
307   public void join() {
308     this.executor.shutdown();
309     if (this.sources.size() == 0) {
310       this.replicationQueues.removeAllQueues();
311     }
312     for (ReplicationSourceInterface source : this.sources) {
313       source.terminate("Region server is closing");
314     }
315   }
316 
317   /**
318    * Get a copy of the wals of the first source on this rs
319    * @return a sorted set of wal names
320    */
321   protected Map<String, Map<String, SortedSet<String>>> getWALs() {
322     return Collections.unmodifiableMap(walsById);
323   }
324 
325   /**
326    * Get a copy of the wals of the recovered sources on this rs
327    * @return a sorted set of wal names
328    */
329   protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
330     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
331   }
332 
333   /**
334    * Get a list of all the normal sources of this rs
335    * @return lis of all sources
336    */
337   public List<ReplicationSourceInterface> getSources() {
338     return this.sources;
339   }
340 
341   /**
342    * Get a list of all the old sources of this rs
343    * @return list of all old sources
344    */
345   public List<ReplicationSourceInterface> getOldSources() {
346     return this.oldsources;
347   }
348 
349   @VisibleForTesting
350   List<String> getAllQueues() {
351     return replicationQueues.getAllQueues();
352   }
353 
354   void preLogRoll(Path newLog) throws IOException {
355     recordLog(newLog);
356     String logName = newLog.getName();
357     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
358     synchronized (latestPaths) {
359       Iterator<Path> iterator = latestPaths.iterator();
360       while (iterator.hasNext()) {
361         Path path = iterator.next();
362         if (path.getName().contains(logPrefix)) {
363           iterator.remove();
364           break;
365         }
366       }
367       this.latestPaths.add(newLog);
368     }
369   }
370 
371   /**
372    * Check and enqueue the given log to the correct source. If there's still no source for the
373    * group to which the given log belongs, create one
374    * @param logPath the log path to check and enqueue
375    * @throws IOException
376    */
377   private void recordLog(Path logPath) throws IOException {
378     String logName = logPath.getName();
379     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
380     // update replication queues on ZK
381     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
382     synchronized (replicationPeers) {
383       for (String id : replicationPeers.getPeerIds()) {
384         try {
385           this.replicationQueues.addLog(id, logName);
386         } catch (ReplicationException e) {
387           throw new IOException("Cannot add log to replication queue"
388               + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
389         }
390       }
391     }
392     // update walsById map
393     synchronized (walsById) {
394       for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
395         String peerId = entry.getKey();
396         Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
397         boolean existingPrefix = false;
398         for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
399           SortedSet<String> wals = walsEntry.getValue();
400           if (this.sources.isEmpty()) {
401             // If there's no slaves, don't need to keep the old wals since
402             // we only consider the last one when a new slave comes in
403             wals.clear();
404           }
405           if (logPrefix.equals(walsEntry.getKey())) {
406             wals.add(logName);
407             existingPrefix = true;
408           }
409         }
410         if (!existingPrefix) {
411           // The new log belongs to a new group, add it into this peer
412           LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
413           SortedSet<String> wals = new TreeSet<String>();
414           wals.add(logName);
415           walsByPrefix.put(logPrefix, wals);
416         }
417       }
418     }
419   }
420 
421   void postLogRoll(Path newLog) throws IOException {
422     // This only updates the sources we own, not the recovered ones
423     for (ReplicationSourceInterface source : this.sources) {
424       source.enqueueLog(newLog);
425     }
426   }
427 
428   /**
429    * Factory method to create a replication source
430    * @param conf the configuration to use
431    * @param fs the file system to use
432    * @param manager the manager to use
433    * @param server the server object for this region server
434    * @param peerId the id of the peer cluster
435    * @return the created source
436    * @throws IOException
437    */
438   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
439       final FileSystem fs, final ReplicationSourceManager manager,
440       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
441       final Server server, final String peerId, final UUID clusterId,
442       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
443       throws IOException {
444     RegionServerCoprocessorHost rsServerHost = null;
445     TableDescriptors tableDescriptors = null;
446     if (server instanceof HRegionServer) {
447       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
448       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
449     }
450     ReplicationSourceInterface src;
451     try {
452       @SuppressWarnings("rawtypes")
453       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
454           ReplicationSource.class.getCanonicalName()));
455       src = (ReplicationSourceInterface) c.newInstance();
456     } catch (Exception e) {
457       LOG.warn("Passed replication source implementation throws errors, " +
458           "defaulting to ReplicationSource", e);
459       src = new ReplicationSource();
460     }
461 
462     ReplicationEndpoint replicationEndpoint = null;
463     try {
464       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
465       if (replicationEndpointImpl == null) {
466         // Default to HBase inter-cluster replication endpoint
467         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
468       }
469       @SuppressWarnings("rawtypes")
470       Class c = Class.forName(replicationEndpointImpl);
471       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
472       if(rsServerHost != null) {
473         ReplicationEndpoint newReplicationEndPoint = rsServerHost
474             .postCreateReplicationEndPoint(replicationEndpoint);
475         if(newReplicationEndPoint != null) {
476           // Override the newly created endpoint from the hook with configured end point
477           replicationEndpoint = newReplicationEndPoint;
478         }
479       }
480     } catch (Exception e) {
481       LOG.warn("Passed replication endpoint implementation throws errors"
482           + " while initializing ReplicationSource for peer: " + peerId, e);
483       throw new IOException(e);
484     }
485 
486     MetricsSource metrics = new MetricsSource(peerId);
487     // init replication source
488     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
489       clusterId, replicationEndpoint, metrics);
490 
491     // init replication endpoint
492     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
493       fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
494 
495     return src;
496   }
497 
498   /**
499    * Transfer all the queues of the specified to this region server.
500    * First it tries to grab a lock and if it works it will move the
501    * znodes and finally will delete the old znodes.
502    *
503    * It creates one old source for any type of source of the old rs.
504    * @param rsZnode
505    */
506   private void transferQueues(String rsZnode) {
507     NodeFailoverWorker transfer =
508         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
509             this.clusterId);
510     try {
511       this.executor.execute(transfer);
512     } catch (RejectedExecutionException ex) {
513       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
514     }
515   }
516 
517   /**
518    * Clear the references to the specified old source
519    * @param src source to clear
520    */
521   public void closeRecoveredQueue(ReplicationSourceInterface src) {
522     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
523     if (src instanceof ReplicationSource) {
524       ((ReplicationSource) src).getSourceMetrics().clear();
525     }
526     this.oldsources.remove(src);
527     deleteSource(src.getPeerClusterZnode(), false);
528     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
529   }
530 
531   /**
532    * Clear the references to the specified old source
533    * @param src source to clear
534    */
535   public void closeQueue(ReplicationSourceInterface src) {
536     LOG.info("Done with the queue " + src.getPeerClusterZnode());
537     if (src instanceof ReplicationSource) {
538       ((ReplicationSource) src).getSourceMetrics().clear();
539     }
540     this.sources.remove(src);
541     deleteSource(src.getPeerClusterZnode(), true);
542     this.walsById.remove(src.getPeerClusterZnode());
543   }
544 
545   /**
546    * Thie method first deletes all the recovered sources for the specified
547    * id, then deletes the normal source (deleting all related data in ZK).
548    * @param id The id of the peer cluster
549    */
550   public void removePeer(String id) {
551     LOG.info("Closing the following queue " + id + ", currently have "
552         + sources.size() + " and another "
553         + oldsources.size() + " that were recovered");
554     String terminateMessage = "Replication stream was removed by a user";
555     List<ReplicationSourceInterface> oldSourcesToDelete =
556         new ArrayList<ReplicationSourceInterface>();
557     // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
558     // see NodeFailoverWorker.run
559     synchronized (oldsources) {
560       // First close all the recovered sources for this peer
561       for (ReplicationSourceInterface src : oldsources) {
562         if (id.equals(src.getPeerClusterId())) {
563           oldSourcesToDelete.add(src);
564         }
565       }
566       for (ReplicationSourceInterface src : oldSourcesToDelete) {
567         src.terminate(terminateMessage);
568         closeRecoveredQueue(src);
569       }
570     }
571     LOG.info("Number of deleted recovered sources for " + id + ": "
572         + oldSourcesToDelete.size());
573     // Now look for the one on this cluster
574     List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
575     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
576     synchronized (this.replicationPeers) {
577       for (ReplicationSourceInterface src : this.sources) {
578         if (id.equals(src.getPeerClusterId())) {
579           srcToRemove.add(src);
580         }
581       }
582       if (srcToRemove.size() == 0) {
583         LOG.error("The queue we wanted to close is missing " + id);
584         return;
585       }
586       for (ReplicationSourceInterface toRemove : srcToRemove) {
587         toRemove.terminate(terminateMessage);
588         if (toRemove instanceof ReplicationSource) {
589           ((ReplicationSource) toRemove).getSourceMetrics().clear();
590         }
591         this.sources.remove(toRemove);
592       }
593       deleteSource(id, true);
594     }
595   }
596 
597   @Override
598   public void regionServerRemoved(String regionserver) {
599     transferQueues(regionserver);
600   }
601 
602   @Override
603   public void peerRemoved(String peerId) {
604     removePeer(peerId);
605   }
606 
607   @Override
608   public void peerListChanged(List<String> peerIds) {
609     for (String id : peerIds) {
610       try {
611         boolean added = this.replicationPeers.peerAdded(id);
612         if (added) {
613           addSource(id);
614         }
615       } catch (Exception e) {
616         LOG.error("Error while adding a new peer", e);
617       }
618     }
619   }
620 
621   /**
622    * Class responsible to setup new ReplicationSources to take care of the
623    * queues from dead region servers.
624    */
625   class NodeFailoverWorker extends Thread {
626 
627     private String rsZnode;
628     private final ReplicationQueues rq;
629     private final ReplicationPeers rp;
630     private final UUID clusterId;
631 
632     /**
633      * @param rsZnode
634      */
635     public NodeFailoverWorker(String rsZnode) {
636       this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
637     }
638 
639     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
640         final ReplicationPeers replicationPeers, final UUID clusterId) {
641       super("Failover-for-"+rsZnode);
642       this.rsZnode = rsZnode;
643       this.rq = replicationQueues;
644       this.rp = replicationPeers;
645       this.clusterId = clusterId;
646     }
647 
648     @Override
649     public void run() {
650       if (this.rq.isThisOurZnode(rsZnode)) {
651         return;
652       }
653       // Wait a bit before transferring the queues, we may be shutting down.
654       // This sleep may not be enough in some cases.
655       try {
656         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
657       } catch (InterruptedException e) {
658         LOG.warn("Interrupted while waiting before transferring a queue.");
659         Thread.currentThread().interrupt();
660       }
661       // We try to lock that rs' queue directory
662       if (server.isStopped()) {
663         LOG.info("Not transferring queue since we are shutting down");
664         return;
665       }
666       SortedMap<String, SortedSet<String>> newQueues = null;
667 
668       newQueues = this.rq.claimQueues(rsZnode);
669 
670       // Copying over the failed queue is completed.
671       if (newQueues.isEmpty()) {
672         // We either didn't get the lock or the failed region server didn't have any outstanding
673         // WALs to replicate, so we are done.
674         return;
675       }
676 
677       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
678         String peerId = entry.getKey();
679         SortedSet<String> walsSet = entry.getValue();
680         try {
681           // there is not an actual peer defined corresponding to peerId for the failover.
682           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
683           String actualPeerId = replicationQueueInfo.getPeerId();
684           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
685           ReplicationPeerConfig peerConfig = null;
686           try {
687             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
688           } catch (ReplicationException ex) {
689             LOG.warn("Received exception while getting replication peer config, skipping replay"
690                 + ex);
691           }
692           if (peer == null || peerConfig == null) {
693             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
694             replicationQueues.removeQueue(peerId);
695             continue;
696           }
697           // track sources in walsByIdRecoveredQueues
698           Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
699           walsByIdRecoveredQueues.put(peerId, walsByGroup);
700           for (String wal : walsSet) {
701             String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
702             SortedSet<String> wals = walsByGroup.get(walPrefix);
703             if (wals == null) {
704               wals = new TreeSet<String>();
705               walsByGroup.put(walPrefix, wals);
706             }
707             wals.add(wal);
708           }
709 
710           // enqueue sources
711           ReplicationSourceInterface src =
712               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
713                 server, peerId, this.clusterId, peerConfig, peer);
714           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
715           // see removePeer
716           synchronized (oldsources) {
717             if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
718               src.terminate("Recovered queue doesn't belong to any current peer");
719               closeRecoveredQueue(src);
720               continue;
721             }
722             oldsources.add(src);
723             for (String wal : walsSet) {
724               src.enqueueLog(new Path(oldLogDir, wal));
725             }
726             src.startup();
727           }
728         } catch (IOException e) {
729           // TODO manage it
730           LOG.error("Failed creating a source", e);
731         }
732       }
733     }
734   }
735 
736   /**
737    * Get the directory where wals are archived
738    * @return the directory where wals are archived
739    */
740   public Path getOldLogDir() {
741     return this.oldLogDir;
742   }
743 
744   /**
745    * Get the directory where wals are stored by their RSs
746    * @return the directory where wals are stored by their RSs
747    */
748   public Path getLogDir() {
749     return this.logDir;
750   }
751 
752   /**
753    * Get the handle on the local file system
754    * @return Handle on the local file system
755    */
756   public FileSystem getFs() {
757     return this.fs;
758   }
759 
760   /**
761    * Get a string representation of all the sources' metrics
762    */
763   public String getStats() {
764     StringBuffer stats = new StringBuffer();
765     for (ReplicationSourceInterface source : sources) {
766       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
767       stats.append(source.getStats() + "\n");
768     }
769     for (ReplicationSourceInterface oldSource : oldsources) {
770       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
771       stats.append(oldSource.getStats()+ "\n");
772     }
773     return stats.toString();
774   }
775 }