View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Random;
35  import java.util.Set;
36  import java.util.SortedSet;
37  import java.util.TreeSet;
38  import java.util.UUID;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.CopyOnWriteArrayList;
41  import java.util.concurrent.LinkedBlockingQueue;
42  import java.util.concurrent.RejectedExecutionException;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.Server;
54  import org.apache.hadoop.hbase.TableDescriptors;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.classification.InterfaceAudience;
57  import org.apache.hadoop.hbase.client.Connection;
58  import org.apache.hadoop.hbase.client.ConnectionFactory;
59  import org.apache.hadoop.hbase.regionserver.HRegionServer;
60  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
61  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
62  import org.apache.hadoop.hbase.replication.ReplicationException;
63  import org.apache.hadoop.hbase.replication.ReplicationListener;
64  import org.apache.hadoop.hbase.replication.ReplicationPeer;
65  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
66  import org.apache.hadoop.hbase.replication.ReplicationPeers;
67  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
68  import org.apache.hadoop.hbase.replication.ReplicationQueues;
69  import org.apache.hadoop.hbase.replication.ReplicationTracker;
70  import org.apache.hadoop.hbase.util.Pair;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
73
74  /**
75   * This class is responsible to manage all the replication
76   * sources. There are two classes of sources:
77   * <ul>
78   * <li> Normal sources are persistent and one per peer cluster</li>
79   * <li> Old sources are recovered from a failed region server and our
80   * only goal is to finish replicating the WAL queue it had up in ZK</li>
81   * </ul>
82   *
83   * When a region server dies, this class uses a watcher to get notified and it
84   * tries to grab a lock in order to transfer all the queues in a local
85   * old source.
86   *
87   * This class implements the ReplicationListener interface so that it can track changes in
88   * replication state.
89   */
90  @InterfaceAudience.Private
91  public class ReplicationSourceManager implements ReplicationListener {
92    private static final Log LOG =
93        LogFactory.getLog(ReplicationSourceManager.class);
94    // List of all the sources that read this RS's logs
95    private final List<ReplicationSourceInterface> sources;
96    // List of all the sources we got from died RSs
97    private final List<ReplicationSourceInterface> oldsources;
98    private final ReplicationQueues replicationQueues;
99    private final ReplicationTracker replicationTracker;
100   private final ReplicationPeers replicationPeers;
101   // UUID for this cluster
102   private final UUID clusterId;
103   // All about stopping
104   private final Server server;
105   // All logs we are currently tracking
106   // Index structure of the map is: peer_id->logPrefix/logGroup->logs
107   private final Map<String, Map<String, SortedSet<String>>> walsById;
108   // Logs for recovered sources we are currently tracking
109   private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
110   private final Configuration conf;
111   private final FileSystem fs;
112   // The paths to the latest log of each wal group, for new coming peers
113   private Set<Path> latestPaths;
114   // Path to the wals directories
115   private final Path logDir;
116   // Path to the wal archive
117   private final Path oldLogDir;
118   // The number of ms that we wait before moving znodes, HBASE-3596
119   private final long sleepBeforeFailover;
120   // Homemade executor service for replication
121   private final ThreadPoolExecutor executor;
122
123   private final Random rand;
124   private final boolean replicationForBulkLoadDataEnabled;
125
126   private Connection connection;
127   private long replicationWaitTime;
128
  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for accessing the registered peer clusters
   * @param replicationTracker the tracker that delivers replication state change notifications
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId UUID of the local cluster
   * @throws IOException if a connection to the local cluster cannot be created
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) throws IOException {
    //CopyOnWriteArrayList is thread-safe.
    //Generally, reading is more than modifying.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    // walsById is guarded by synchronized(walsById) at its use sites;
    // walsByIdRecoveredQueues is touched from failover workers, hence concurrent.
    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    // NOTE(review): registering the listener publishes `this` before the constructor
    // completes; callbacks may observe partially initialized state — confirm the
    // tracker only fires after startup.
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
    replicationForBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
          HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
    connection = ConnectionFactory.createConnection(conf);
  }
185
186   /**
187    * Provide the id of the peer and a log key and this method will figure which
188    * wal it belongs to and will log, for this region server, the current
189    * position. It will also clean old logs from the queue.
190    * @param log Path to the log currently being replicated from
191    * replication status in zookeeper. It will also delete older entries.
192    * @param id id of the peer cluster
193    * @param position current location in the log
194    * @param queueRecovered indicates if this queue comes from another region server
195    * @param holdLogInZK if true then the log is retained in ZK
196    */
197   public void logPositionAndCleanOldLogs(Path log, String id, long position,
198       boolean queueRecovered, boolean holdLogInZK) {
199     String fileName = log.getName();
200     this.replicationQueues.setLogPosition(id, fileName, position);
201     if (holdLogInZK) {
202      return;
203     }
204     cleanOldLogs(fileName, id, queueRecovered);
205   }
206
207   /**
208    * Cleans a log file and all older files from ZK. Called when we are sure that a
209    * log file is closed and has no more entries.
210    * @param key Path to the log
211    * @param id id of the peer cluster
212    * @param queueRecovered Whether this is a recovered queue
213    */
214   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
215     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
216     if (queueRecovered) {
217       SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
218       if (wals != null && !wals.first().equals(key)) {
219         cleanOldLogs(wals, key, id);
220       }
221     } else {
222       synchronized (this.walsById) {
223         SortedSet<String> wals = walsById.get(id).get(logPrefix);
224         if (wals != null && !wals.first().equals(key)) {
225           cleanOldLogs(wals, key, id);
226         }
227       }
228     }
229  }
230
231   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
232     SortedSet<String> walSet = wals.headSet(key);
233     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
234     for (String wal : walSet) {
235       this.replicationQueues.removeLog(id, wal);
236     }
237     walSet.clear();
238   }
239
240   /**
241    * Adds a normal source per registered peer cluster and tries to process all
242    * old region server wal queues
243    */
244   protected void init() throws IOException, ReplicationException {
245     for (String id : this.replicationPeers.getConnectedPeerIds()) {
246       addSource(id);
247       if (replicationForBulkLoadDataEnabled) {
248         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
249         // when a peer was added before replication for bulk loaded data was enabled.
250         this.replicationQueues.addPeerToHFileRefs(id);
251       }
252     }
253     AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
254     try {
255       this.executor.execute(adoptionWorker);
256     } catch (RejectedExecutionException ex) {
257       LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
258     }
259   }
260
  /**
   * Add sources for the given peer cluster on this region server. For the newly added peer, we only
   * need to enqueue the latest log of each wal group and do replication
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    // Lock ordering: walsById first, then latestPaths. Holding both blocks a
    // concurrent log roll from mutating the tracked wal sets while we seed the
    // new peer's queue.
    synchronized (this.walsById) {
      this.sources.add(src);
      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
      this.walsById.put(id, walsByGroup);
      // Add the latest wal to that source's queue
      synchronized (latestPaths) {
        if (this.latestPaths.size() > 0) {
          for (Path logPath : latestPaths) {
            String name = logPath.getName();
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
            SortedSet<String> logs = new TreeSet<String>();
            logs.add(name);
            walsByGroup.put(walPrefix, logs);
            // Persist the log in ZK before enqueueing so the queue survives a crash.
            try {
              this.replicationQueues.addLog(id, name);
            } catch (ReplicationException e) {
              String message =
                  "Cannot add log to queue when creating a new source, queueId=" + id
                      + ", filename=" + name;
              // Failing to record the queue in ZK is fatal: abort the server
              // rather than silently losing replication data.
              server.stop(message);
              throw e;
            }
            src.enqueueLog(logPath);
          }
        }
      }
    }
    // Start the source thread only after its queue is fully seeded.
    src.startup();
    return src;
  }
305
306   /**
307    * Delete a complete queue of wals associated with a peer cluster
308    * @param peerId Id of the peer cluster queue of wals to delete
309    */
310   public void deleteSource(String peerId, boolean closeConnection) {
311     this.replicationQueues.removeQueue(peerId);
312     if (closeConnection) {
313       this.replicationPeers.peerDisconnected(peerId);
314     }
315   }
316
317   /**
318    * Terminate the replication on this region server
319    */
320   public void join() {
321     this.executor.shutdown();
322     for (ReplicationSourceInterface source : this.sources) {
323       source.terminate("Region server is closing");
324     }
325   }
326
  /**
   * Get an unmodifiable view (not a copy) of the wals tracked for the normal
   * sources on this rs, indexed as peer_id -> wal group -> sorted wal names.
   * @return map of peer id to its tracked wal groups
   */
  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }
334
  /**
   * Get an unmodifiable view (not a copy) of the wals tracked for the recovered
   * sources on this rs, indexed as queue_id -> wal group -> sorted wal names.
   * @return map of recovered queue id to its tracked wal groups
   */
  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }
342
  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }
350
  /**
   * Get a list of all the recovered (old) sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }
358
  // Exposes the full list of replication queue ids; test-only accessor.
  @VisibleForTesting
  List<String> getAllQueues() {
    return replicationQueues.getAllQueues();
  }
363
364   void preLogRoll(Path newLog) throws IOException {
365     recordLog(newLog);
366     String logName = newLog.getName();
367     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
368     synchronized (latestPaths) {
369       Iterator<Path> iterator = latestPaths.iterator();
370       while (iterator.hasNext()) {
371         Path path = iterator.next();
372         if (path.getName().contains(logPrefix)) {
373           iterator.remove();
374           break;
375         }
376       }
377       this.latestPaths.add(newLog);
378     }
379   }
380
  /**
   * Check and enqueue the given log to the correct source. If there's still no source for the
   * group to which the given log belongs, create one
   * @param logPath the log path to check and enqueue
   * @throws IOException
   */
  private void recordLog(Path logPath) throws IOException {
    String logName = logPath.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // update replication queues on ZK
    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
    synchronized (replicationPeers) {
      for (String id : replicationPeers.getConnectedPeerIds()) {
        try {
          this.replicationQueues.addLog(id, logName);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue"
              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
        }
      }
    }
    // update walsById map
    synchronized (walsById) {
      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
        String peerId = entry.getKey();
        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
        boolean existingPrefix = false;
        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
          SortedSet<String> wals = walsEntry.getValue();
          if (this.sources.isEmpty()) {
            // If there's no slaves, don't need to keep the old wals since
            // we only consider the last one when a new slave comes in
            wals.clear();
          }
          if (logPrefix.equals(walsEntry.getKey())) {
            // The group already exists; just track the newly rolled log in it.
            wals.add(logName);
            existingPrefix = true;
          }
        }
        if (!existingPrefix) {
          // The new log belongs to a new group, add it into this peer
          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
          SortedSet<String> wals = new TreeSet<String>();
          wals.add(logName);
          walsByPrefix.put(logPrefix, wals);
        }
      }
    }
  }
430
431   void postLogRoll(Path newLog) throws IOException {
432     // This only updates the sources we own, not the recovered ones
433     for (ReplicationSourceInterface source : this.sources) {
434       source.enqueueLog(newLog);
435     }
436   }
437
  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @return the created source
   * @throws IOException if the replication endpoint cannot be instantiated or initialized
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
      throws IOException {
    // Coprocessor host and table descriptors are only available when running
    // inside a real region server (not in standalone/test contexts).
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    // Reflectively instantiate the configured source implementation, falling
    // back to the default ReplicationSource if the configured class is broken.
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    // Instantiate the endpoint for this peer; unlike the source above, a broken
    // endpoint class is fatal for this peer (we rethrow as IOException).
    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if(rsServerHost != null) {
        // Give coprocessors a chance to wrap or replace the endpoint.
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if(newReplicationEndPoint != null) {
          // Override the newly created endpoint from the hook with configured end point
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors"
          + " while initializing ReplicationSource for peer: " + peerId, e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));

    return src;
  }
507
508   /**
509    * Transfer all the queues of the specified to this region server.
510    * First it tries to grab a lock and if it works it will move the
511    * znodes and finally will delete the old znodes.
512    *
513    * It creates one old source for any type of source of the old rs.
514    * @param rsZnode
515    */
516   private void transferQueues(String rsZnode) {
517     NodeFailoverWorker transfer =
518         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
519             this.clusterId);
520     try {
521       this.executor.execute(transfer);
522     } catch (RejectedExecutionException ex) {
523       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
524     }
525   }
526
527   /**
528    * Clear the references to the specified old source
529    * @param src source to clear
530    */
531   public void closeRecoveredQueue(ReplicationSourceInterface src) {
532     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
533     this.oldsources.remove(src);
534     deleteSource(src.getPeerClusterZnode(), false);
535     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
536   }
537
  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerClusterId())) {
          oldSourcesToDelete.add(src);
        }
      }
      // Terminate outside the collection loop: closeRecoveredQueue mutates oldsources,
      // which must not happen while iterating it above.
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
    synchronized (this.replicationPeers) {
      for (ReplicationSourceInterface src : this.sources) {
        if (id.equals(src.getPeerClusterId())) {
          srcToRemove.add(src);
        }
      }
      if (srcToRemove.isEmpty()) {
        LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
            "This could mean that ReplicationSourceInterface initialization failed for this peer " +
            "and that replication on this peer may not be caught up. peerId=" + id);
      }
      for (ReplicationSourceInterface toRemove : srcToRemove) {
        toRemove.terminate(terminateMessage);
        this.sources.remove(toRemove);
      }
      // Finally drop the peer's queue and disconnect from it.
      deleteSource(id, true);
    }
  }
587
  // ReplicationListener callback: a region server znode disappeared,
  // try to adopt its replication queues.
  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }
592
  // ReplicationListener callback: a peer was deleted — tear down all its
  // sources, then drop its hfile-refs queue.
  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
    this.replicationQueues.removePeerFromHFileRefs(peerId);
  }
598
599   @Override
600   public void peerListChanged(List<String> peerIds) {
601     for (String id : peerIds) {
602       try {
603         boolean added = this.replicationPeers.peerConnected(id);
604         if (added) {
605           addSource(id);
606           if (replicationForBulkLoadDataEnabled) {
607             this.replicationQueues.addPeerToHFileRefs(id);
608           }
609         }
610       } catch (Exception e) {
611         LOG.error("Error while adding a new peer", e);
612       }
613     }
614   }
615
  /**
   * Class responsible to setup new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    // Queue and peer accessors are injected through the constructor; the
    // single-arg constructor wires in the manager's own instances.
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues should be claimed
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-"+rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      // Never try to adopt our own queues.
      if (this.rq.isThisOurRegionServer(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      Map<String, Set<String>> newQueues = new HashMap<>();
      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
      // Claim queues one at a time in random order, pausing between claims so
      // several surviving region servers can share the failover load.
      while (peers != null && !peers.isEmpty()) {
        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
            peers.get(rand.nextInt(peers.size())));
        long sleep = sleepBeforeFailover/2;
        if (peer != null) {
          newQueues.put(peer.getFirst(), peer.getSecond());
          sleep = sleepBeforeFailover;
        }
        try {
          Thread.sleep(sleep);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting before transferring a queue.");
          Thread.currentThread().interrupt();
        }
        peers = rq.getUnClaimedQueueIds(rsZnode);
      }
      if (peers != null) {
        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay"
                + ex);
          }
          // The underlying peer may have been removed since the queue was
          // claimed; drop the claimed queue instead of replaying it.
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
          walsByIdRecoveredQueues.put(peerId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            SortedSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<String>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          // enqueue sources
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          // see removePeer
          synchronized (oldsources) {
            if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }
747
748   class AdoptAbandonedQueuesWorker extends Thread{
749
750     public AdoptAbandonedQueuesWorker() {}
751
752     @Override
753     public void run() {
754       List<String> currentReplicators = replicationQueues.getListOfReplicators();
755       if (currentReplicators == null || currentReplicators.size() == 0) {
756         return;
757       }
758       List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
759       LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
760         + otherRegionServers);
761
762       // Look if there's anything to process after a restart
763       for (String rs : currentReplicators) {
764         if (!otherRegionServers.contains(rs)) {
765           transferQueues(rs);
766         }
767       }
768     }
769   }
770
771   /**
772    * Get the directory where wals are archived
773    * @return the directory where wals are archived
774    */
775   public Path getOldLogDir() {
776     return this.oldLogDir;
777   }
778
779   /**
780    * Get the directory where wals are stored by their RSs
781    * @return the directory where wals are stored by their RSs
782    */
783   public Path getLogDir() {
784     return this.logDir;
785   }
786
787   /**
788    * Get the handle on the local file system
789    * @return Handle on the local file system
790    */
791   public FileSystem getFs() {
792     return this.fs;
793   }
794
795   public Connection getConnection() {
796     return this.connection;
797   }
798
799   /**
800    * Get the ReplicationPeers used by this ReplicationSourceManager
801    * @return the ReplicationPeers used by this ReplicationSourceManager
802    */
803   public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
804
805   /**
806    * Get a string representation of all the sources' metrics
807    */
808   public String getStats() {
809     StringBuffer stats = new StringBuffer();
810     for (ReplicationSourceInterface source : sources) {
811       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
812       stats.append(source.getStats() + "\n");
813     }
814     for (ReplicationSourceInterface oldSource : oldsources) {
815       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
816       stats.append(oldSource.getStats()+ "\n");
817     }
818     return stats.toString();
819   }
820
821   public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
822       throws ReplicationException {
823     for (ReplicationSourceInterface source : this.sources) {
824       source.addHFileRefs(tableName, family, files);
825     }
826   }
827
828   public void cleanUpHFileRefs(String peerId, List<String> files) {
829     this.replicationQueues.removeHFileRefs(peerId, files);
830   }
831
832   /**
833    * Whether an entry can be pushed to the peer or not right now.
834    * If we enable serial replication, we can not push the entry until all entries in its region
835    * whose sequence numbers are smaller than this entry have been pushed.
836    * For each ReplicationSource, we need only check the first entry in each region, as long as it
837    * can be pushed, we can push all in this ReplicationSource.
838    * This method will be blocked until we can push.
839    * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
840    *         prevent saving positions in the region of no barrier.
841    */
842   void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
843       throws IOException, InterruptedException {
844
845     /**
846      * There are barriers for this region and position for this peer. N barriers form N intervals,
847      * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
848      * the first barrier and the last interval is start from the last barrier.
849      *
850      * There are several conditions that we can push now, otherwise we should block:
851      * 1) "Serial replication" is not enabled, we can push all logs just like before. This case
852      *    should not call this method.
853      * 2) There is no barriers for this region, or the seq id is smaller than the first barrier.
854      *    It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
855      *    order of logs that is written before altering.
856      * 3) This entry is in the first interval of barriers. We can push them because it is the
857      *    start of a region. Splitting/merging regions are also ok because the first section of
858      *    daughter region is in same region of parents and the order in one RS is guaranteed.
859      * 4) If the entry's seq id and the position are in same section, or the pos is the last
860      *    number of previous section. Because when open a region we put a barrier the number
861      *    is the last log's id + 1.
862      * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes
863      *    after save replication meta and before save zk offset.
864      */
865     List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
866     if (barriers.isEmpty() || seq <= barriers.get(0)) {
867       // Case 2
868       return;
869     }
870     int interval = Collections.binarySearch(barriers, seq);
871     if (interval < 0) {
872       interval = -interval - 1;// get the insert position if negative
873     }
874     if (interval == 1) {
875       // Case 3
876       return;
877     }
878
879     while (true) {
880       long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
881       if (seq <= pos) {
882         // Case 5
883       }
884       if (pos >= 0) {
885         // Case 4
886         int posInterval = Collections.binarySearch(barriers, pos);
887         if (posInterval < 0) {
888           posInterval = -posInterval - 1;// get the insert position if negative
889         }
890         if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
891           return;
892         }
893       }
894
895       LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
896           + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
897           + " barriers=" + Arrays.toString(barriers.toArray()));
898       Thread.sleep(replicationWaitTime);
899     }
900   }
901
902 }