/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and the only goal is to finish
 * replicating the WAL queue it had registered in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues to a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
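 * <p>
 * The wal rolling code is expected to call {@link #preLogRoll(Path)} just before a new wal
 * is published and {@link #postLogRoll(Path)} just after, so the new wal gets registered in
 * the replication queues and enqueued to the sources.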
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // The server we are running on; used mainly to know when we are stopping
  private final Server server;
  // All logs we are currently tracking
  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
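  // e.g. (assuming the usual <wal group prefix>.<timestamp> wal naming)
  //   "1" -> "host%2C16020%2C1234" -> { "host%2C16020%2C1234.5678", ... }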
  private final Map<String, Map<String, SortedSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homegrown executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;
  private final boolean replicationForBulkLoadDataEnabled;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker of replication state changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe; reads vastly outnumber modifications here.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // Use a short 100ms keep-alive since this could be done inline with a RS startup;
    // even if we fail, other region servers can take care of it.
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
    replicationForBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Provide the id of the peer and a log key and this method will figure which
   * wal it belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key Path to the log
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
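    // Wals are grouped by their prefix; e.g. assuming the usual "<prefix>.<timestamp>"
    // naming, "host%2C16020%2C1234.5678" belongs to the wal group "host%2C16020%2C1234".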
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
    if (queueRecovered) {
      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
      if (wals != null && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    } else {
      synchronized (this.walsById) {
        SortedSet<String> wals = walsById.get(id).get(logPrefix);
        if (wals != null && !wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
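    // headSet(key) is exclusive: it holds every wal strictly older than key, so the wal
    // currently being replicated stays both in ZK and in the in-memory set.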
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server wal queues
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getConnectedPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        this.replicationQueues.addPeerToHFileRefs(id);
      }
    }
    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
    try {
      this.executor.execute(adoptionWorker);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
    }
  }

  /**
   * Add sources for the given peer cluster on this region server. For the newly added peer, we
   * only need to enqueue the latest log of each wal group and start replicating from there.
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    synchronized (this.walsById) {
      this.sources.add(src);
      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
      this.walsById.put(id, walsByGroup);
      // Add the latest wal to that source's queue
      synchronized (latestPaths) {
        if (this.latestPaths.size() > 0) {
          for (Path logPath : latestPaths) {
            String name = logPath.getName();
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
            SortedSet<String> logs = new TreeSet<String>();
            logs.add(name);
            walsByGroup.put(walPrefix, logs);
            try {
              this.replicationQueues.addLog(id, name);
            } catch (ReplicationException e) {
              String message =
                  "Cannot add log to queue when creating a new source, queueId=" + id
                      + ", filename=" + name;
              server.stop(message);
              throw e;
            }
            src.enqueueLog(logPath);
          }
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of wals associated with a peer cluster
   * @param peerId id of the peer cluster whose queue of wals will be deleted
   * @param closeConnection whether to also disconnect from the peer
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerDisconnected(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the wals tracked by the normal sources on this rs
   * @return a map of peer id to wal group to a sorted set of wal names
   */
  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
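    // Note: only the outer map is made unmodifiable; the nested maps and sets are the live
    // internal structures, so callers must not mutate them.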
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get the wals tracked by the recovered sources on this rs
   * @return a map of recovered queue id to wal group to a sorted set of wal names
   */
  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  void preLogRoll(Path newLog) throws IOException {
    recordLog(newLog);
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    synchronized (latestPaths) {
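      // Only one "latest" path is kept per wal group: drop the previous entry for this
      // group (if any) before recording the new wal as the group's latest.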
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }

  /**
   * Check and enqueue the given log to the correct source. If there's still no source for the
   * group to which the given log belongs, create one
   * @param logPath the log path to check and enqueue
   * @throws IOException
   */
  private void recordLog(Path logPath) throws IOException {
    String logName = logPath.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // update replication queues on ZK
    // synchronize on replicationPeers to avoid adding a source for a to-be-removed peer
    synchronized (replicationPeers) {
      for (String id : replicationPeers.getConnectedPeerIds()) {
        try {
          this.replicationQueues.addLog(id, logName);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue"
              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
        }
      }
    }
    // update walsById map
    synchronized (walsById) {
      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
        String peerId = entry.getKey();
        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
        boolean existingPrefix = false;
        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
          SortedSet<String> wals = walsEntry.getValue();
          if (this.sources.isEmpty()) {
            // If there are no slaves, there is no need to keep the old wals since
            // we only consider the last one when a new slave comes in
            wals.clear();
          }
          if (logPrefix.equals(walsEntry.getKey())) {
            wals.add(logName);
            existingPrefix = true;
          }
        }
        if (!existingPrefix) {
          // The new log belongs to a new group, add it into this peer
          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
          SortedSet<String> wals = new TreeSet<String>();
          wals.add(logName);
          walsByPrefix.put(logPrefix, wals);
        }
      }
    }
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig the configuration of the peer cluster
   * @param replicationPeer the peer to replicate to
   * @return the created source
   * @throws IOException
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
      throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Use the endpoint returned by the coprocessor hook in place of the configured one
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors"
          + " while initializing ReplicationSource for peer: " + peerId, e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode the znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id the id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue(src);
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    // synchronize on replicationPeers to avoid adding a source for the to-be-removed peer
    synchronized (this.replicationPeers) {
      for (ReplicationSourceInterface src : this.sources) {
        if (id.equals(src.getPeerClusterId())) {
          srcToRemove.add(src);
        }
      }
      if (srcToRemove.size() == 0) {
        LOG.error("The queue we wanted to close is missing " + id);
        return;
      }
      for (ReplicationSourceInterface toRemove : srcToRemove) {
        toRemove.terminate(terminateMessage);
        this.sources.remove(toRemove);
      }
      deleteSource(id, true);
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
    this.replicationQueues.removePeerFromHFileRefs(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerConnected(id);
        if (added) {
          addSource(id);
          if (replicationForBulkLoadDataEnabled) {
            this.replicationQueues.addPeerToHFileRefs(id);
          }
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode the znode of the dead region server whose queues will be claimed
     */
    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurRegionServer(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
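      // The wait is sleepBeforeFailover plus a random jitter of up to the same amount,
      // i.e. somewhere in [sleepBeforeFailover, 2 * sleepBeforeFailover), so that region
      // servers do not all race to claim the same queues at once.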
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
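      // claimQueues moves the dead rs' queues under this rs' znode; the result maps each
      // claimed queue id (the peer id, possibly carrying dead-rs suffixes for queues that
      // had already been claimed before) to the set of wal names left to replicate.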
      Map<String, Set<String>> newQueues = this.rq.claimQueues(rsZnode);

      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // For a recovered queue, peerId is the queue id and may not match an actual
          // peer; parse out the real peer id first.
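          // e.g. a queue id like "2-host1,16020,1234" belongs to peer "2".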
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer: " + actualPeerId + " of node " + rsZnode);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
          walsByIdRecoveredQueues.put(peerId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            SortedSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<String>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          // enqueue sources
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
          }
          oldsources.add(src);
          for (String wal : walsSet) {
            src.enqueueLog(new Path(oldLogDir, wal));
          }
          src.startup();
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  class AdoptAbandonedQueuesWorker extends Thread {

    public AdoptAbandonedQueuesWorker() {}

    @Override
    public void run() {
      List<String> currentReplicators = replicationQueues.getListOfReplicators();
      if (currentReplicators == null || currentReplicators.size() == 0) {
        return;
      }
      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

      // Look if there's anything to process after a restart
      for (String rs : currentReplicators) {
        if (!otherRegionServers.contains(rs)) {
          transferQueues(rs);
        }
      }
    }
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuffer stats = new StringBuffer();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()
          + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
      throws ReplicationException {
    for (ReplicationSourceInterface source : this.sources) {
      source.addHFileRefs(tableName, family, files);
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    this.replicationQueues.removeHFileRefs(peerId, files);
  }
}