1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Random;
31  import java.util.Set;
32  import java.util.SortedMap;
33  import java.util.SortedSet;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.CopyOnWriteArrayList;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadPoolExecutor;
41  import java.util.concurrent.TimeUnit;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.fs.FileSystem;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.Server;
50  import org.apache.hadoop.hbase.TableDescriptors;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.regionserver.HRegionServer;
54  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
55  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
56  import org.apache.hadoop.hbase.replication.ReplicationException;
57  import org.apache.hadoop.hbase.replication.ReplicationListener;
58  import org.apache.hadoop.hbase.replication.ReplicationPeer;
59  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
60  import org.apache.hadoop.hbase.replication.ReplicationPeers;
61  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
62  import org.apache.hadoop.hbase.replication.ReplicationQueues;
63  import org.apache.hadoop.hbase.replication.ReplicationTracker;
64  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
65  
66  import com.google.common.util.concurrent.ThreadFactoryBuilder;
67  
68  /**
69   * This class is responsible for managing all the replication
70   * sources. There are two classes of sources:
71   * <ul>
72   * <li> Normal sources are persistent, one per peer cluster</li>
73   * <li> Old sources are recovered from a failed region server; their
74   * only goal is to finish replicating the WAL queue that server had in ZK</li>
75   * </ul>
76   *
77   * When a region server dies, this class uses a watcher to get notified and
78   * tries to grab a lock in order to transfer all of its queues to local
79   * old sources.
80   *
81   * This class implements the ReplicationListener interface so that it can track changes in
82   * replication state.
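     * <p>
     * A rough, illustrative lifecycle sketch (variable names are placeholders, not the exact
     * wiring used by the region server):
     * <pre>
     * ReplicationSourceManager manager = new ReplicationSourceManager(queues, peers, tracker,
     *     conf, server, fs, logDir, oldLogDir, clusterId);
     * manager.init();                  // one source per peer, plus failover of dead RS queues
     * manager.preLogRoll(newWal);      // before a WAL roll: record the new WAL in ZK
     * manager.postLogRoll(newWal);     // after the roll: enqueue it on every source
     * manager.logPositionAndCleanOldLogs(wal, peerId, position, false, false);
     * </pre>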
83   */
84  @InterfaceAudience.Private
85  public class ReplicationSourceManager implements ReplicationListener {
86    private static final Log LOG =
87        LogFactory.getLog(ReplicationSourceManager.class);
88    // List of all the sources that read this RS's logs
89    private final List<ReplicationSourceInterface> sources;
90    // List of all the sources we got from dead RSs
91    private final List<ReplicationSourceInterface> oldsources;
92    private final ReplicationQueues replicationQueues;
93    private final ReplicationTracker replicationTracker;
94    private final ReplicationPeers replicationPeers;
95    // UUID for this cluster
96    private final UUID clusterId;
97    // All about stopping
98    private final Server server;
99    // All logs we are currently tracking
100   // Index structure of the map is: peer_id->logPrefix/logGroup->logs
101   private final Map<String, Map<String, SortedSet<String>>> walsById;
102   // Logs for recovered sources we are currently tracking
103   private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
104   private final Configuration conf;
105   private final FileSystem fs;
106   // The paths to the latest log of each wal group, for newly added peers
107   private Set<Path> latestPaths;
108   // Path to the wal directories
109   private final Path logDir;
110   // Path to the wal archive
111   private final Path oldLogDir;
112   // The number of ms that we wait before moving znodes, HBASE-3596
113   private final long sleepBeforeFailover;
114   // Homemade executor service for replication
115   private final ThreadPoolExecutor executor;
116 
117   private final Random rand;
118 
119 
120   /**
121    * Creates a replication manager and sets the watch on all the other registered region servers
122    * @param replicationQueues the interface for manipulating replication queues
123    * @param replicationPeers the interface for manipulating replication peers
124    * @param replicationTracker the tracker of changes in replication state (peers, region servers)
125    * @param conf the configuration to use
126    * @param server the server for this region server
127    * @param fs the file system to use
128    * @param logDir the directory that contains all wal directories of live RSs
129    * @param oldLogDir the directory where old logs are archived
130    * @param clusterId the UUID of this (source) cluster
131    */
132   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
133       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
134       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
135       final Path oldLogDir, final UUID clusterId) {
136     // CopyOnWriteArrayList is thread-safe and fits here because
137     // reads greatly outnumber modifications.
138     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
139     this.replicationQueues = replicationQueues;
140     this.replicationPeers = replicationPeers;
141     this.replicationTracker = replicationTracker;
142     this.server = server;
143     this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
144     this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
145     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
146     this.conf = conf;
147     this.fs = fs;
148     this.logDir = logDir;
149     this.oldLogDir = oldLogDir;
150     this.sleepBeforeFailover =
151         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
152     this.clusterId = clusterId;
153     this.replicationTracker.registerListener(this);
154     this.replicationPeers.getAllPeerIds();
155     // It's preferable to fail over 1 RS at a time, but with good zk servers
156     // more could be processed at the same time.
157     int nbWorkers = conf.getInt("replication.executor.workers", 1);
158     // use a short 100ms sleep since this could be done inline with a RS startup
159     // even if we fail, other region servers can take care of it
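        // Note: this executor only ever runs NodeFailoverWorker tasks (see transferQueues()).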
160     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
161         100, TimeUnit.MILLISECONDS,
162         new LinkedBlockingQueue<Runnable>());
163     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
164     tfb.setNameFormat("ReplicationExecutor-%d");
165     tfb.setDaemon(true);
166     this.executor.setThreadFactory(tfb.build());
167     this.rand = new Random();
168     this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
169   }
170 
171   /**
172    * Provide the id of the peer and a log key, and this method will record,
173    * for this region server, the current replication position for that wal
174    * in ZooKeeper. It will also remove older, fully replicated logs from
175    * the queue.
176    * @param log Path to the log currently being replicated
177    * @param id id of the peer cluster
178    * @param position current location in the log
179    * @param queueRecovered indicates if this queue comes from another region server
180    * @param holdLogInZK if true then the log is retained in ZK
181    */
182   public void logPositionAndCleanOldLogs(Path log, String id, long position,
183       boolean queueRecovered, boolean holdLogInZK) {
184     String fileName = log.getName();
185     this.replicationQueues.setLogPosition(id, fileName, position);
186     if (holdLogInZK) {
187       return;
188     }
189     cleanOldLogs(fileName, id, queueRecovered);
190   }
191 
192   /**
193    * Cleans a log file and all older files from ZK. Called when we are sure that a
194    * log file is closed and has no more entries.
195    * @param key Path to the log
196    * @param id id of the peer cluster
197    * @param queueRecovered Whether this is a recovered queue
198    */
199   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
200     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
201     if (queueRecovered) {
202       SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
203       if (wals != null && !wals.first().equals(key)) {
204         cleanOldLogs(wals, key, id);
205       }
206     } else {
207       synchronized (this.walsById) {
208         SortedSet<String> wals = walsById.get(id).get(logPrefix);
209         if (wals != null && !wals.first().equals(key)) {
210           cleanOldLogs(wals, key, id);
211         }
212       }
213     }
214   }
215 
216   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
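        // headSet(key) is exclusive, so the wal currently being replicated (key) is kept.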
217     SortedSet<String> walSet = wals.headSet(key);
218     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
219     for (String wal : walSet) {
220       this.replicationQueues.removeLog(id, wal);
221     }
222     walSet.clear();
223   }
224 
225   /**
226    * Adds a normal source per registered peer cluster and tries to process all
227    * old region server wal queues
228    */
229   protected void init() throws IOException, ReplicationException {
230     boolean replicationForBulkLoadDataEnabled =
231         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
232           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
233     for (String id : this.replicationPeers.getPeerIds()) {
234       addSource(id);
235       if (replicationForBulkLoadDataEnabled) {
236         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
237         // when a peer was added before replication for bulk loaded data was enabled.
238         this.replicationQueues.addPeerToHFileRefs(id);
239       }
240     }
241     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
242     if (currentReplicators == null || currentReplicators.size() == 0) {
243       return;
244     }
245     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
246     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
247         + otherRegionServers);
248 
249     // Look if there's anything to process after a restart
250     for (String rs : currentReplicators) {
251       if (!otherRegionServers.contains(rs)) {
252         transferQueues(rs);
253       }
254     }
255   }
256 
257   /**
258    * Add a source for the given peer cluster on this region server. For a newly added peer, we
259    * only need to enqueue the latest log of each wal group and start replicating from there
260    * @param id the id of the peer cluster
261    * @return the source that was created
262    * @throws IOException
263    */
264   protected ReplicationSourceInterface addSource(String id) throws IOException,
265       ReplicationException {
266     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
267     ReplicationPeer peer = replicationPeers.getPeer(id);
268     ReplicationSourceInterface src =
269         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
270           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
271     synchronized (this.walsById) {
272       this.sources.add(src);
273       Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
274       this.walsById.put(id, walsByGroup);
275       // Add the latest wal to that source's queue
276       synchronized (latestPaths) {
277         if (this.latestPaths.size() > 0) {
278           for (Path logPath : latestPaths) {
279             String name = logPath.getName();
280             String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
281             SortedSet<String> logs = new TreeSet<String>();
282             logs.add(name);
283             walsByGroup.put(walPrefix, logs);
284             try {
285               this.replicationQueues.addLog(id, name);
286             } catch (ReplicationException e) {
287               String message =
288                   "Cannot add log to queue when creating a new source, queueId=" + id
289                       + ", filename=" + name;
290               server.stop(message);
291               throw e;
292             }
293             src.enqueueLog(logPath);
294           }
295         }
296       }
297     }
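        // Kick off the new source outside the synchronized blocks.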
298     src.startup();
299     return src;
300   }
301 
302   /**
303    * Delete a complete queue of wals associated with a peer cluster
304    * @param peerId Id of the peer cluster whose queue of wals will be deleted
305    */
306   public void deleteSource(String peerId, boolean closeConnection) {
307     this.replicationQueues.removeQueue(peerId);
308     if (closeConnection) {
309       this.replicationPeers.peerRemoved(peerId);
310     }
311   }
312 
313   /**
314    * Terminate the replication on this region server
315    */
316   public void join() {
317     this.executor.shutdown();
318     if (this.sources.size() == 0) {
319       this.replicationQueues.removeAllQueues();
320     }
321     for (ReplicationSourceInterface source : this.sources) {
322       source.terminate("Region server is closing");
323     }
324   }
325 
326   /**
327    * Get the wals tracked for the normal sources on this rs
328    * @return an unmodifiable map: peer id -> wal group/prefix -> sorted set of wal names
329    */
330   protected Map<String, Map<String, SortedSet<String>>> getWALs() {
331     return Collections.unmodifiableMap(walsById);
332   }
333 
334   /**
335    * Get the wals tracked for the recovered sources on this rs
336    * @return an unmodifiable map: recovered queue id -> wal group/prefix -> sorted set of wal names
337    */
338   protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
339     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
340   }
341 
342   /**
343    * Get a list of all the normal sources of this rs
344    * @return list of all normal sources
345    */
346   public List<ReplicationSourceInterface> getSources() {
347     return this.sources;
348   }
349 
350   /**
351    * Get a list of all the old sources of this rs
352    * @return list of all old sources
353    */
354   public List<ReplicationSourceInterface> getOldSources() {
355     return this.oldsources;
356   }
357 
358   void preLogRoll(Path newLog) throws IOException {
359     recordLog(newLog);
360     String logName = newLog.getName();
361     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
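        // latestPaths keeps exactly one entry (the most recent wal) per wal group, so drop
        // the previous path for this prefix before adding the new one.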
362     synchronized (latestPaths) {
363       Iterator<Path> iterator = latestPaths.iterator();
364       while (iterator.hasNext()) {
365         Path path = iterator.next();
366         if (path.getName().contains(logPrefix)) {
367           iterator.remove();
368           break;
369         }
370       }
371       this.latestPaths.add(newLog);
372     }
373   }
374 
375   /**
376    * Record the given log in the replication queues (ZK) and in the in-memory wal maps for
377    * every peer. If the wal group the log belongs to is not tracked yet, start tracking it
378    * @param logPath the log path to check and enqueue
379    * @throws IOException
380    */
381   private void recordLog(Path logPath) throws IOException {
382     String logName = logPath.getName();
383     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
384     // update replication queues on ZK
385     synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
386                                      // the to-be-removed peer
387       for (String id : replicationPeers.getPeerIds()) {
388         try {
389           this.replicationQueues.addLog(id, logName);
390         } catch (ReplicationException e) {
391           throw new IOException("Cannot add log to replication queue"
392               + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
393         }
394       }
395     }
396     // update walsById map
397     synchronized (walsById) {
398       for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
399         String peerId = entry.getKey();
400         Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
401         boolean existingPrefix = false;
402         for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
403           SortedSet<String> wals = walsEntry.getValue();
404           if (this.sources.isEmpty()) {
405             // If there are no sources, there is no need to keep the old wals since
406             // only the latest one is considered when a new peer is added
407             wals.clear();
408           }
409           if (logPrefix.equals(walsEntry.getKey())) {
410             wals.add(logName);
411             existingPrefix = true;
412           }
413         }
414         if (!existingPrefix) {
415           // The new log belongs to a new group, add it into this peer
416           LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
417           SortedSet<String> wals = new TreeSet<String>();
418           wals.add(logName);
419           walsByPrefix.put(logPrefix, wals);
420         }
421       }
422     }
423   }
424 
425   void postLogRoll(Path newLog) throws IOException {
426     // This only updates the sources we own, not the recovered ones
427     for (ReplicationSourceInterface source : this.sources) {
428       source.enqueueLog(newLog);
429     }
430   }
431 
432   /**
433    * Factory method to create a replication source
434    * @param conf the configuration to use
435    * @param fs the file system to use
436    * @param manager the manager to use
437    * @param server the server object for this region server
438    * @param peerId the id of the peer cluster
439    * @return the created source
440    * @throws IOException
441    */
442   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
443       final FileSystem fs, final ReplicationSourceManager manager,
444       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
445       final Server server, final String peerId, final UUID clusterId,
446       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
447       throws IOException {
448     RegionServerCoprocessorHost rsServerHost = null;
449     TableDescriptors tableDescriptors = null;
450     if (server instanceof HRegionServer) {
451       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
452       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
453     }
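        // rsServerHost and tableDescriptors stay null when the Server is not an HRegionServer
        // (presumably the case in tests that pass a stub Server implementation).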
454     ReplicationSourceInterface src;
455     try {
456       @SuppressWarnings("rawtypes")
457       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
458           ReplicationSource.class.getCanonicalName()));
459       src = (ReplicationSourceInterface) c.newInstance();
460     } catch (Exception e) {
461       LOG.warn("Passed replication source implementation throws errors, " +
462           "defaulting to ReplicationSource", e);
463       src = new ReplicationSource();
464     }
465 
466     ReplicationEndpoint replicationEndpoint = null;
467     try {
468       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
469       if (replicationEndpointImpl == null) {
470         // Default to HBase inter-cluster replication endpoint
471         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
472       }
473       @SuppressWarnings("rawtypes")
474       Class c = Class.forName(replicationEndpointImpl);
475       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
476       if (rsServerHost != null) {
477         ReplicationEndpoint newReplicationEndPoint = rsServerHost
478             .postCreateReplicationEndPoint(replicationEndpoint);
479         if (newReplicationEndPoint != null) {
480           // Use the endpoint returned by the coprocessor hook in place of the configured one
481           replicationEndpoint = newReplicationEndPoint;
482         }
483       }
484     } catch (Exception e) {
485       LOG.warn("Passed replication endpoint implementation throws errors"
486           + " while initializing ReplicationSource for peer: " + peerId, e);
487       throw new IOException(e);
488     }
489 
490     MetricsSource metrics = new MetricsSource(peerId);
491     // init replication source
492     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
493       clusterId, replicationEndpoint, metrics);
494 
495     // init replication endpoint
496     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
497       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
498 
499     return src;
500   }
501 
502   /**
503    * Transfer all the queues of the specified region server to this region server.
504    * First it tries to grab a lock and if it works it will move the
505    * znodes and finally will delete the old znodes.
506    *
507    * It creates one old source for each queue of the dead region server.
508    * @param rsZnode znode of the dead region server whose queues will be claimed
509    */
510   private void transferQueues(String rsZnode) {
511     NodeFailoverWorker transfer =
512         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
513             this.clusterId);
514     try {
515       this.executor.execute(transfer);
516     } catch (RejectedExecutionException ex) {
517       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
518     }
519   }
520 
521   /**
522    * Clear the references to the specified old source
523    * @param src source to clear
524    */
525   public void closeRecoveredQueue(ReplicationSourceInterface src) {
526     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
527     this.oldsources.remove(src);
528     deleteSource(src.getPeerClusterZnode(), false);
529     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
530   }
531 
532   /**
533    * This method first deletes all the recovered sources for the specified
534    * id, then deletes the normal source (deleting all related data in ZK).
535    * @param id The id of the peer cluster
536    */
537   public void removePeer(String id) {
538     LOG.info("Closing the following queue " + id + ", currently have "
539         + sources.size() + " and another "
540         + oldsources.size() + " that were recovered");
541     String terminateMessage = "Replication stream was removed by a user";
542     List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
543     List<ReplicationSourceInterface> oldSourcesToDelete =
544         new ArrayList<ReplicationSourceInterface>();
545     // First close all the recovered sources for this peer
546     for (ReplicationSourceInterface src : oldsources) {
547       if (id.equals(src.getPeerClusterId())) {
548         oldSourcesToDelete.add(src);
549       }
550     }
551     for (ReplicationSourceInterface src : oldSourcesToDelete) {
552       src.terminate(terminateMessage);
553       closeRecoveredQueue((src));
554     }
555     LOG.info("Number of deleted recovered sources for " + id + ": "
556         + oldSourcesToDelete.size());
557     // Now look for the one on this cluster
558     synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
559                                           // for the to-be-removed peer
560       for (ReplicationSourceInterface src : this.sources) {
561         if (id.equals(src.getPeerClusterId())) {
562           srcToRemove.add(src);
563         }
564       }
565       if (srcToRemove.size() == 0) {
566         LOG.error("The queue we wanted to close is missing " + id);
567         return;
568       }
569       for (ReplicationSourceInterface toRemove : srcToRemove) {
570         toRemove.terminate(terminateMessage);
571         this.sources.remove(toRemove);
572       }
573       deleteSource(id, true);
574     }
575   }
576 
577   @Override
578   public void regionServerRemoved(String regionserver) {
579     transferQueues(regionserver);
580   }
581 
582   @Override
583   public void peerRemoved(String peerId) {
584     removePeer(peerId);
585   }
586 
587   @Override
588   public void peerListChanged(List<String> peerIds) {
589     for (String id : peerIds) {
590       try {
591         boolean added = this.replicationPeers.peerAdded(id);
592         if (added) {
593           addSource(id);
594         }
595       } catch (Exception e) {
596         LOG.error("Error while adding a new peer", e);
597       }
598     }
599   }
600 
601   /**
602    * Class responsible to setup new ReplicationSources to take care of the
603    * queues from dead region servers.
604    */
605   class NodeFailoverWorker extends Thread {
606 
607     private String rsZnode;
608     private final ReplicationQueues rq;
609     private final ReplicationPeers rp;
610     private final UUID clusterId;
611 
612     /**
613      * Creates a worker that claims the queues of a dead region server and replicates them.
614      * @param rsZnode znode of the dead region server
615      */
616     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
617         final ReplicationPeers replicationPeers, final UUID clusterId) {
618       super("Failover-for-"+rsZnode);
619       this.rsZnode = rsZnode;
620       this.rq = replicationQueues;
621       this.rp = replicationPeers;
622       this.clusterId = clusterId;
623     }
624 
625     @Override
626     public void run() {
627       if (this.rq.isThisOurZnode(rsZnode)) {
628         return;
629       }
630       // Wait a bit before transferring the queues, we may be shutting down.
631       // This sleep may not be enough in some cases.
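          // The random jitter spreads claim attempts across the surviving region servers, so
          // they are less likely to all race for the dead server's queues at the same instant.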
632       try {
633         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
634       } catch (InterruptedException e) {
635         LOG.warn("Interrupted while waiting before transferring a queue.");
636         Thread.currentThread().interrupt();
637       }
638       // We try to lock that rs' queue directory
639       if (server.isStopped()) {
640         LOG.info("Not transferring queue since we are shutting down");
641         return;
642       }
643       SortedMap<String, SortedSet<String>> newQueues = null;
644 
645       newQueues = this.rq.claimQueues(rsZnode);
646 
647       // Copying over the failed queue is completed.
648       if (newQueues.isEmpty()) {
649         // We either didn't get the lock or the failed region server didn't have any outstanding
650         // WALs to replicate, so we are done.
651         return;
652       }
653 
654       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
655         String peerId = entry.getKey();
656         SortedSet<String> walsSet = entry.getValue();
657         try {
658           // peerId may be a recovered queue id (peer id plus dead server names), so extract the actual peer id
659           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
660           String actualPeerId = replicationQueueInfo.getPeerId();
661           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
662           ReplicationPeerConfig peerConfig = null;
663           try {
664             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
665           } catch (ReplicationException ex) {
666             LOG.warn("Received exception while getting replication peer config, skipping replay",
667                 ex);
668           }
669           if (peer == null || peerConfig == null) {
670             LOG.warn("Skipping failover for peer: " + actualPeerId + " of node " + rsZnode);
671             continue;
672           }
673           // track sources in walsByIdRecoveredQueues
674           Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
675           walsByIdRecoveredQueues.put(peerId, walsByGroup);
676           for (String wal : walsSet) {
677             String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
678             SortedSet<String> wals = walsByGroup.get(walPrefix);
679             if (wals == null) {
680               wals = new TreeSet<String>();
681               walsByGroup.put(walPrefix, wals);
682             }
683             wals.add(wal);
684           }
685 
686           // enqueue sources
687           ReplicationSourceInterface src =
688               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
689                 server, peerId, this.clusterId, peerConfig, peer);
690           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
691             src.terminate("Recovered queue doesn't belong to any current peer");
692             break;
693           }
694           oldsources.add(src);
695           for (String wal : walsSet) {
696             src.enqueueLog(new Path(oldLogDir, wal));
697           }
698           src.startup();
699         } catch (IOException e) {
700           // TODO manage it
701           LOG.error("Failed creating a source", e);
702         }
703       }
704     }
705   }
706 
707   /**
708    * Get the directory where wals are archived
709    * @return the directory where wals are archived
710    */
711   public Path getOldLogDir() {
712     return this.oldLogDir;
713   }
714 
715   /**
716    * Get the directory where wals are stored by their RSs
717    * @return the directory where wals are stored by their RSs
718    */
719   public Path getLogDir() {
720     return this.logDir;
721   }
722 
723   /**
724    * Get the handle on the local file system
725    * @return Handle on the local file system
726    */
727   public FileSystem getFs() {
728     return this.fs;
729   }
730 
731   /**
732    * Get a string representation of all the sources' metrics
733    */
734   public String getStats() {
735     StringBuffer stats = new StringBuffer();
736     for (ReplicationSourceInterface source : sources) {
737       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
738       stats.append(source.getStats() + "\n");
739     }
740     for (ReplicationSourceInterface oldSource : oldsources) {
741       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
742       stats.append(oldSource.getStats() + "\n");
743     }
744     return stats.toString();
745   }
746 
747   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
748       throws ReplicationException {
749     for (ReplicationSourceInterface source : this.sources) {
750       source.addHFileRefs(tableName, family, files);
751     }
752   }
753 
754   public void cleanUpHFileRefs(String peerId, List<String> files) {
755     this.replicationQueues.removeHFileRefs(peerId, files);
756   }
757 }