1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.SortedSet;
36  import java.util.TreeSet;
37  import java.util.UUID;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.CopyOnWriteArrayList;
40  import java.util.concurrent.LinkedBlockingQueue;
41  import java.util.concurrent.RejectedExecutionException;
42  import java.util.concurrent.ThreadPoolExecutor;
43  import java.util.concurrent.TimeUnit;
44
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.TableDescriptors;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.classification.InterfaceAudience;
55  import org.apache.hadoop.hbase.regionserver.HRegionServer;
56  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
57  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
58  import org.apache.hadoop.hbase.replication.ReplicationException;
59  import org.apache.hadoop.hbase.replication.ReplicationListener;
60  import org.apache.hadoop.hbase.replication.ReplicationPeer;
61  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
62  import org.apache.hadoop.hbase.replication.ReplicationPeers;
63  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
64  import org.apache.hadoop.hbase.replication.ReplicationQueues;
65  import org.apache.hadoop.hbase.replication.ReplicationTracker;
66  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
67
68  /**
69   * This class is responsible for managing all the replication
70   * sources. There are two kinds of sources:
71   * <ul>
72   * <li> Normal sources are persistent and there is one per peer cluster</li>
73   * <li> Old sources are recovered from a failed region server; their
74   * only goal is to finish replicating the WAL queue that server had registered in ZK</li>
75   * </ul>
76   *
77   * When a region server dies, this class is notified through a watcher and
78   * tries to grab a lock in order to transfer all of the dead server's queues to
79   * local old sources.
80   *
81   * This class implements the ReplicationListener interface so that it can track changes in
82   * replication state.
83   */
84  @InterfaceAudience.Private
85  public class ReplicationSourceManager implements ReplicationListener {
86    private static final Log LOG =
87        LogFactory.getLog(ReplicationSourceManager.class);
88    // List of all the sources that read this RS's logs
89    private final List<ReplicationSourceInterface> sources;
90    // List of all the sources we got from dead RSs
91    private final List<ReplicationSourceInterface> oldsources;
92    private final ReplicationQueues replicationQueues;
93    private final ReplicationTracker replicationTracker;
94    private final ReplicationPeers replicationPeers;
95    // UUID for this cluster
96    private final UUID clusterId;
97    // All about stopping
98    private final Server server;
99    // All logs we are currently tracking
100   // Index structure of the map is: peer_id->logPrefix/logGroup->logs
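      //   e.g. walsById.get("1").get(<wal group prefix>) is the sorted set of wal file names
      //   still to be replicated to peer "1" for that wal group, oldest name first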
101   private final Map<String, Map<String, SortedSet<String>>> walsById;
102   // Logs for recovered sources we are currently tracking
103   private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
104   private final Configuration conf;
105   private final FileSystem fs;
106   // The paths to the latest log of each wal group, for newly added peers
107   private Set<Path> latestPaths;
108   // Path to the directory that contains the wal directories of all RSs
109   private final Path logDir;
110   // Path to the wal archive
111   private final Path oldLogDir;
112   // The number of ms that we wait before moving znodes, HBASE-3596
113   private final long sleepBeforeFailover;
114   // Homemade executor service for replication
115   private final ThreadPoolExecutor executor;
116
117   private final Random rand;
118   private final boolean replicationForBulkLoadDataEnabled;
119 
120
121   /**
122    * Creates a replication manager and sets the watch on all the other registered region servers
123    * @param replicationQueues the interface for manipulating replication queues
124    * @param replicationPeers the interface for manipulating the set of replication peers
125    * @param replicationTracker the tracker used to watch for changes in replication state
126    * @param conf the configuration to use
127    * @param server the server for this region server
128    * @param fs the file system to use
129    * @param logDir the directory that contains all wal directories of live RSs
130    * @param oldLogDir the directory where old logs are archived
131    * @param clusterId the UUID of this cluster
132    */
133   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
134       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
135       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
136       final Path oldLogDir, final UUID clusterId) {
137     // CopyOnWriteArrayList is thread-safe.
138     // Generally, reads greatly outnumber modifications.
139     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
140     this.replicationQueues = replicationQueues;
141     this.replicationPeers = replicationPeers;
142     this.replicationTracker = replicationTracker;
143     this.server = server;
144     this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
145     this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
146     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
147     this.conf = conf;
148     this.fs = fs;
149     this.logDir = logDir;
150     this.oldLogDir = oldLogDir;
151     this.sleepBeforeFailover =
152         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
153     this.clusterId = clusterId;
154     this.replicationTracker.registerListener(this);
155     this.replicationPeers.getAllPeerIds();
156     // It's preferable to fail over one RS at a time, but with good zk servers
157     // more could be processed at the same time.
158     int nbWorkers = conf.getInt("replication.executor.workers", 1);
159     // use a short 100ms keep-alive since this could be done inline with a RS startup;
160     // even if we fail, other region servers can take care of it
161     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
162         100, TimeUnit.MILLISECONDS,
163         new LinkedBlockingQueue<Runnable>());
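        // The work queue is unbounded, so execute() only throws RejectedExecutionException
        // once the executor has been shut down (see join()).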
164     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
165     tfb.setNameFormat("ReplicationExecutor-%d");
166     tfb.setDaemon(true);
167     this.executor.setThreadFactory(tfb.build());
168     this.rand = new Random();
169     this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
170     replicationForBulkLoadDataEnabled =
171         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
172           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
173   }
174
175   /**
176    * Provide the id of the peer and a log, and this method will record, for this
177    * region server, the current replication position in that log. It will also clean
178    * old logs from the queue.
179    * @param log Path to the log currently being replicated; only its file name is
180    * used as the key in ZK
181    * @param id id of the peer cluster
182    * @param position current location in the log
183    * @param queueRecovered indicates if this queue comes from another region server
184    * @param holdLogInZK if true then the log is retained in ZK
185    */
186   public void logPositionAndCleanOldLogs(Path log, String id, long position,
187       boolean queueRecovered, boolean holdLogInZK) {
188     String fileName = log.getName();
189     this.replicationQueues.setLogPosition(id, fileName, position);
190     if (holdLogInZK) {
191      return;
192     }
193     cleanOldLogs(fileName, id, queueRecovered);
194   }
195
196   /**
197    * Cleans a log file and all older files from ZK. Called when we are sure that a
198    * log file is closed and has no more entries.
199    * @param key name of the wal file
200    * @param id id of the peer cluster
201    * @param queueRecovered Whether this is a recovered queue
202    */
203   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
204     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
205     if (queueRecovered) {
206       SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
207       if (wals != null && !wals.first().equals(key)) {
208         cleanOldLogs(wals, key, id);
209       }
210     } else {
211       synchronized (this.walsById) {
212         SortedSet<String> wals = walsById.get(id).get(logPrefix);
213         if (wals != null && !wals.first().equals(key)) {
214           cleanOldLogs(wals, key, id);
215         }
216       }
217     }
218  }
219
220   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
221     SortedSet<String> walSet = wals.headSet(key);
222     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
223     for (String wal : walSet) {
224       this.replicationQueues.removeLog(id, wal);
225     }
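        // walSet is a view backed by wals, so clearing it also drops these entries
        // from the in-memory tracking map (walsById / walsByIdRecoveredQueues).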
226     walSet.clear();
227   }
228
229   /**
230    * Adds a normal source per registered peer cluster and tries to process all
231    * old region server wal queues
232    */
233   protected void init() throws IOException, ReplicationException {
234     for (String id : this.replicationPeers.getConnectedPeerIds()) {
235       addSource(id);
236       if (replicationForBulkLoadDataEnabled) {
237         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
238         // when a peer was added before replication for bulk loaded data was enabled.
239         this.replicationQueues.addPeerToHFileRefs(id);
240       }
241     }
242     AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
243     try {
244       this.executor.execute(adoptionWorker);
245     } catch (RejectedExecutionException ex) {
246       LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
247     }
248   }
249
250   /**
251    * Add sources for the given peer cluster on this region server. For the newly added peer, we only
252    * need to enqueue the latest log of each wal group and start replicating from there
253    * @param id the id of the peer cluster
254    * @return the source that was created
255    * @throws IOException
256    */
257   protected ReplicationSourceInterface addSource(String id) throws IOException,
258       ReplicationException {
259     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
260     ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
261     ReplicationSourceInterface src =
262         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
263           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
264     synchronized (this.walsById) {
265       this.sources.add(src);
266       Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
267       this.walsById.put(id, walsByGroup);
268       // Add the latest wal to that source's queue
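          // Only the newest wal of each group is enqueued: wals rolled before the peer
          // was added are not replicated to it.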
269       synchronized (latestPaths) {
270         if (this.latestPaths.size() > 0) {
271           for (Path logPath : latestPaths) {
272             String name = logPath.getName();
273             String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
274             SortedSet<String> logs = new TreeSet<String>();
275             logs.add(name);
276             walsByGroup.put(walPrefix, logs);
277             try {
278               this.replicationQueues.addLog(id, name);
279             } catch (ReplicationException e) {
280               String message =
281                   "Cannot add log to queue when creating a new source, queueId=" + id
282                       + ", filename=" + name;
283               server.stop(message);
284               throw e;
285             }
286             src.enqueueLog(logPath);
287           }
288         }
289       }
290     }
291     src.startup();
292     return src;
293   }
294
295   /**
296    * Delete a complete queue of wals associated with a peer cluster
297    * @param peerId Id of the peer cluster queue of wals to delete
       * @param closeConnection whether to also disconnect from the peer cluster
298    */
299   public void deleteSource(String peerId, boolean closeConnection) {
300     this.replicationQueues.removeQueue(peerId);
301     if (closeConnection) {
302       this.replicationPeers.peerDisconnected(peerId);
303     }
304   }
305
306   /**
307    * Terminate the replication on this region server
308    */
309   public void join() {
310     this.executor.shutdown();
311     for (ReplicationSourceInterface source : this.sources) {
312       source.terminate("Region server is closing");
313     }
314   }
315
316   /**
317    * Get a view of the wals tracked for the normal sources on this rs
318    * @return a map of peer id to wal group to the sorted set of wal names
319    */
320   protected Map<String, Map<String, SortedSet<String>>> getWALs() {
321     return Collections.unmodifiableMap(walsById);
322   }
323
324   /**
325    * Get a view of the wals tracked for the recovered sources on this rs
326    * @return a map of recovered queue id to wal group to the sorted set of wal names
327    */
328   protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
329     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
330   }
331
332   /**
333    * Get a list of all the normal sources of this rs
334    * @return list of all normal sources
335    */
336   public List<ReplicationSourceInterface> getSources() {
337     return this.sources;
338   }
339
340   /**
341    * Get a list of all the old sources of this rs
342    * @return list of all old sources
343    */
344   public List<ReplicationSourceInterface> getOldSources() {
345     return this.oldsources;
346   }
347
348   @VisibleForTesting
349   List<String> getAllQueues() {
350     return replicationQueues.getAllQueues();
351   }
352
353   void preLogRoll(Path newLog) throws IOException {
354     recordLog(newLog);
355     String logName = newLog.getName();
356     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
357     synchronized (latestPaths) {
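          // Replace the previous latest wal of this wal group (if any) with the newly rolled one.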
358       Iterator<Path> iterator = latestPaths.iterator();
359       while (iterator.hasNext()) {
360         Path path = iterator.next();
361         if (path.getName().contains(logPrefix)) {
362           iterator.remove();
363           break;
364         }
365       }
366       this.latestPaths.add(newLog);
367     }
368   }
369
370   /**
371    * Record the given log in the replication queues (ZK) of all connected peers and in the
372    * in-memory walsById map; start tracking a new wal group if the log's group is not tracked yet
373    * @param logPath the log path to record
374    * @throws IOException
375    */
376   private void recordLog(Path logPath) throws IOException {
377     String logName = logPath.getName();
378     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
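        // The prefix identifies the wal group; with multiple wal providers a region server
        // writes several groups of wals, each rolled independently.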
379     // update replication queues on ZK
380     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
381     synchronized (replicationPeers) {
382       for (String id : replicationPeers.getConnectedPeerIds()) {
383         try {
384           this.replicationQueues.addLog(id, logName);
385         } catch (ReplicationException e) {
386           throw new IOException("Cannot add log to replication queue"
387               + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
388         }
389       }
390     }
391     // update walsById map
392     synchronized (walsById) {
393       for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
394         String peerId = entry.getKey();
395         Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
396         boolean existingPrefix = false;
397         for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
398           SortedSet<String> wals = walsEntry.getValue();
399           if (this.sources.isEmpty()) {
400             // If there are no slaves, we don't need to keep the old wals since
401             // we only consider the latest one when a new slave comes in
402             wals.clear();
403           }
404           if (logPrefix.equals(walsEntry.getKey())) {
405             wals.add(logName);
406             existingPrefix = true;
407           }
408         }
409         if (!existingPrefix) {
410           // The new log belongs to a new group, add it into this peer
411           LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
412           SortedSet<String> wals = new TreeSet<String>();
413           wals.add(logName);
414           walsByPrefix.put(logPrefix, wals);
415         }
416       }
417     }
418   }
419
420   void postLogRoll(Path newLog) throws IOException {
421     // This only updates the sources we own, not the recovered ones
422     for (ReplicationSourceInterface source : this.sources) {
423       source.enqueueLog(newLog);
424     }
425   }
426
427   /**
428    * Factory method to create a replication source
429    * @param conf the configuration to use
430    * @param fs the file system to use
431    * @param manager the manager to use
432    * @param server the server object for this region server
433    * @param peerId the id of the peer cluster
434    * @return the created source
435    * @throws IOException
436    */
437   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
438       final FileSystem fs, final ReplicationSourceManager manager,
439       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
440       final Server server, final String peerId, final UUID clusterId,
441       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
442       throws IOException {
443     RegionServerCoprocessorHost rsServerHost = null;
444     TableDescriptors tableDescriptors = null;
445     if (server instanceof HRegionServer) {
446       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
447       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
448     }
449     ReplicationSourceInterface src;
450     try {
451       @SuppressWarnings("rawtypes")
452       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
453           ReplicationSource.class.getCanonicalName()));
454       src = (ReplicationSourceInterface) c.newInstance();
455     } catch (Exception e) {
456       LOG.warn("Passed replication source implementation throws errors, " +
457           "defaulting to ReplicationSource", e);
458       src = new ReplicationSource();
459     }
460
461     ReplicationEndpoint replicationEndpoint = null;
462     try {
463       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
464       if (replicationEndpointImpl == null) {
465         // Default to HBase inter-cluster replication endpoint
466         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
467       }
468       @SuppressWarnings("rawtypes")
469       Class c = Class.forName(replicationEndpointImpl);
470       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
471       if(rsServerHost != null) {
472         ReplicationEndpoint newReplicationEndPoint = rsServerHost
473             .postCreateReplicationEndPoint(replicationEndpoint);
474         if(newReplicationEndPoint != null) {
475           // Override the newly created endpoint from the hook with configured end point
476           replicationEndpoint = newReplicationEndPoint;
477         }
478       }
479     } catch (Exception e) {
480       LOG.warn("Passed replication endpoint implementation throws errors"
481           + " while initializing ReplicationSource for peer: " + peerId, e);
482       throw new IOException(e);
483     }
484
485     MetricsSource metrics = new MetricsSource(peerId);
486     // init replication source
487     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
488       clusterId, replicationEndpoint, metrics);
489
490     // init replication endpoint
491     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
492       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
493
494     return src;
495   }
496
497   /**
498    * Transfer all the queues of the specified region server to this region server.
499    * First it tries to grab a lock and if it works it will move the
500    * znodes and finally will delete the old znodes.
501    *
502    * It creates one old source for any type of source of the old rs.
503    * @param rsZnode znode of the dead region server whose queues should be claimed
504    */
505   private void transferQueues(String rsZnode) {
506     NodeFailoverWorker transfer =
507         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
508             this.clusterId);
509     try {
510       this.executor.execute(transfer);
511     } catch (RejectedExecutionException ex) {
512       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
513     }
514   }
515
516   /**
517    * Clear the references to the specified old source
518    * @param src source to clear
519    */
520   public void closeRecoveredQueue(ReplicationSourceInterface src) {
521     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
522     this.oldsources.remove(src);
523     deleteSource(src.getPeerClusterZnode(), false);
524     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
525   }
526
527   /**
528    * This method first deletes all the recovered sources for the specified
529    * id, then deletes the normal source (deleting all related data in ZK).
530    * @param id The id of the peer cluster
531    */
532   public void removePeer(String id) {
533     LOG.info("Closing the following queue " + id + ", currently have "
534         + sources.size() + " and another "
535         + oldsources.size() + " that were recovered");
536     String terminateMessage = "Replication stream was removed by a user";
537     List<ReplicationSourceInterface> oldSourcesToDelete =
538         new ArrayList<ReplicationSourceInterface>();
539     // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
540     // see NodeFailoverWorker.run
541     synchronized (oldsources) {
542       // First close all the recovered sources for this peer
543       for (ReplicationSourceInterface src : oldsources) {
544         if (id.equals(src.getPeerClusterId())) {
545           oldSourcesToDelete.add(src);
546         }
547       }
548       for (ReplicationSourceInterface src : oldSourcesToDelete) {
549         src.terminate(terminateMessage);
550         closeRecoveredQueue(src);
551       }
552     }
553     LOG.info("Number of deleted recovered sources for " + id + ": "
554         + oldSourcesToDelete.size());
555     // Now look for the one on this cluster
556     List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
557     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
558     synchronized (this.replicationPeers) {
559       for (ReplicationSourceInterface src : this.sources) {
560         if (id.equals(src.getPeerClusterId())) {
561           srcToRemove.add(src);
562         }
563       }
564       if (srcToRemove.size() == 0) {
565         LOG.error("The queue we wanted to close is missing " + id);
566         return;
567       }
568       for (ReplicationSourceInterface toRemove : srcToRemove) {
569         toRemove.terminate(terminateMessage);
570         this.sources.remove(toRemove);
571       }
572       deleteSource(id, true);
573     }
574   }
575
576   @Override
577   public void regionServerRemoved(String regionserver) {
578     transferQueues(regionserver);
579   }
580
581   @Override
582   public void peerRemoved(String peerId) {
583     removePeer(peerId);
584     this.replicationQueues.removePeerFromHFileRefs(peerId);
585   }
586
587   @Override
588   public void peerListChanged(List<String> peerIds) {
589     for (String id : peerIds) {
590       try {
591         boolean added = this.replicationPeers.peerConnected(id);
592         if (added) {
593           addSource(id);
594           if (replicationForBulkLoadDataEnabled) {
595             this.replicationQueues.addPeerToHFileRefs(id);
596           }
597         }
598       } catch (Exception e) {
599         LOG.error("Error while adding a new peer", e);
600       }
601     }
602   }
603
604   /**
605    * Class responsible for setting up new ReplicationSources to take care of the
606    * queues from dead region servers.
607    */
608   class NodeFailoverWorker extends Thread {
609
610     private String rsZnode;
611     private final ReplicationQueues rq;
612     private final ReplicationPeers rp;
613     private final UUID clusterId;
614
615     /**
616      * @param rsZnode znode of the dead region server whose queues will be transferred
617      */
618     public NodeFailoverWorker(String rsZnode) {
619       this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
620     }
621
622     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
623         final ReplicationPeers replicationPeers, final UUID clusterId) {
624       super("Failover-for-"+rsZnode);
625       this.rsZnode = rsZnode;
626       this.rq = replicationQueues;
627       this.rp = replicationPeers;
628       this.clusterId = clusterId;
629     }
630
631     @Override
632     public void run() {
633       if (this.rq.isThisOurRegionServer(rsZnode)) {
634         return;
635       }
636       // Wait a bit before transferring the queues, we may be shutting down.
637       // This sleep may not be enough in some cases.
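          // The random extra delay also spreads out concurrent failover attempts from
          // different region servers so they do not all race for the same queues at once.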
638       try {
639         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
640       } catch (InterruptedException e) {
641         LOG.warn("Interrupted while waiting before transferring a queue.");
642         Thread.currentThread().interrupt();
643       }
644       // We try to lock that rs' queue directory
645       if (server.isStopped()) {
646         LOG.info("Not transferring queue since we are shutting down");
647         return;
648       }
649       Map<String, Set<String>> newQueues = null;
650
651       newQueues = this.rq.claimQueues(rsZnode);
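          // newQueues maps each claimed queue id to its wal file names. A claimed queue id
          // embeds the dead server's name, which is why ReplicationQueueInfo is used below
          // to recover the actual peer id.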
652
653       // Copying over the failed queue is completed.
654       if (newQueues.isEmpty()) {
655         // We either didn't get the lock or the failed region server didn't have any outstanding
656         // WALs to replicate, so we are done.
657         return;
658       }
659
660       for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
661         String peerId = entry.getKey();
662         Set<String> walsSet = entry.getValue();
663         try {
664           // the recovered queue id may not match a defined peer id, so parse out the actual peer id
665           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
666           String actualPeerId = replicationQueueInfo.getPeerId();
667           ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
668           ReplicationPeerConfig peerConfig = null;
669           try {
670             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
671           } catch (ReplicationException ex) {
672             LOG.warn("Received exception while getting replication peer config, skipping replay"
673                 + ex);
674           }
675           if (peer == null || peerConfig == null) {
676             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
677             replicationQueues.removeQueue(peerId);
678             continue;
679           }
680           // track sources in walsByIdRecoveredQueues
681           Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
682           walsByIdRecoveredQueues.put(peerId, walsByGroup);
683           for (String wal : walsSet) {
684             String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
685             SortedSet<String> wals = walsByGroup.get(walPrefix);
686             if (wals == null) {
687               wals = new TreeSet<String>();
688               walsByGroup.put(walPrefix, wals);
689             }
690             wals.add(wal);
691           }
692
693           // enqueue sources
694           ReplicationSourceInterface src =
695               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
696                 server, peerId, this.clusterId, peerConfig, peer);
697           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
698           // see removePeer
699           synchronized (oldsources) {
700             if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
701               src.terminate("Recovered queue doesn't belong to any current peer");
702               closeRecoveredQueue(src);
703               continue;
704             }
705             oldsources.add(src);
706             for (String wal : walsSet) {
707               src.enqueueLog(new Path(oldLogDir, wal));
708             }
709             src.startup();
710           }
711         } catch (IOException e) {
712           // TODO manage it
713           LOG.error("Failed creating a source", e);
714         }
715       }
716     }
717   }
718
719   class AdoptAbandonedQueuesWorker extends Thread {
720
721     public AdoptAbandonedQueuesWorker() {}
722
723     @Override
724     public void run() {
725       List<String> currentReplicators = replicationQueues.getListOfReplicators();
726       if (currentReplicators == null || currentReplicators.size() == 0) {
727         return;
728       }
729       List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
730       LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
731         + otherRegionServers);
732
733       // Look for anything to process after a restart
734       for (String rs : currentReplicators) {
735         if (!otherRegionServers.contains(rs)) {
736           transferQueues(rs);
737         }
738       }
739     }
740   }
741
742
743
744   /**
745    * Get the directory where wals are archived
746    * @return the directory where wals are archived
747    */
748   public Path getOldLogDir() {
749     return this.oldLogDir;
750   }
751
752   /**
753    * Get the directory where wals are stored by their RSs
754    * @return the directory where wals are stored by their RSs
755    */
756   public Path getLogDir() {
757     return this.logDir;
758   }
759
760   /**
761    * Get the handle on the file system used by this manager
762    * @return Handle on the file system
763    */
764   public FileSystem getFs() {
765     return this.fs;
766   }
767
768   /**
769    * Get a string representation of all the sources' metrics
770    */
771   public String getStats() {
772     StringBuffer stats = new StringBuffer();
773     for (ReplicationSourceInterface source : sources) {
774       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
775       stats.append(source.getStats() + "\n");
776     }
777     for (ReplicationSourceInterface oldSource : oldsources) {
778       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
779       stats.append(oldSource.getStats()+ "\n");
780     }
781     return stats.toString();
782   }
783
784   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
785       throws ReplicationException {
786     for (ReplicationSourceInterface source : this.sources) {
787       source.addHFileRefs(tableName, family, files);
788     }
789   }
790
791   public void cleanUpHFileRefs(String peerId, List<String> files) {
792     this.replicationQueues.removeHFileRefs(peerId, files);
793   }
794 }