/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and their only goal is to finish
 * replicating the WAL queues they had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues into a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}. So there is no race for
 * peer operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
 * The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
 * will add the wals to {@link #walsByIdRecoveredQueues} first, then start up a
 * {@link ReplicationSourceInterface}. So there is no race here. For
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, they
 * already synchronize on {@link #oldsources}, so there is no need to synchronize on
 * {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // All the sources that read this RS's logs; every peer has exactly one normal replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueueStorage queueStorage;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
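  // (Hypothetical example: queue id "2" -> wal group "rs1%2C16020%2C1546300800000" ->
  // { "rs1%2C16020%2C1546300800000.1546301234567", ... })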
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFileLengthProvider walFileLengthProvider;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

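  // Tracks the total size in bytes of WAL entries currently buffered by all the replication
  // sources on this region server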
  private AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker that notifies this manager of region server changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    // ConcurrentHashMap is thread-safe.
    // Generally, reading is more frequent than modifying.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds by default
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFileLengthProvider = walFileLengthProvider;
    this.replicationTracker.registerListener(this);
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<Path>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all the old region
   * server wal queues
   * <p>
   * The returned future is for the adoptAbandonedQueues task.
   */
  Future<?> init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
    return this.executor.submit(this::adoptAbandonedQueues);
  }

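  /**
   * Claim the replication queues of region servers that are registered in the queue storage but
   * are no longer among the live region servers, so that their abandoned WAL queues get replicated
   * by this region server.
   */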
  private void adoptAbandonedQueues() {
    List<ServerName> currentReplicators = null;
    try {
      currentReplicators = queueStorage.getListOfReplicators();
    } catch (ReplicationException e) {
      server.abort("Failed to get all replicators", e);
      return;
    }
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
        .map(ServerName::valueOf).collect(Collectors.toList());
    LOG.info(
      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);

    // Look if there's anything to process after a restart
    for (ServerName rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info(
      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * Factory method to create a replication source
   * @param queueId the id of the replication queue
   * @return the created source
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
      throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

    MetricsSource metrics = new MetricsSource(queueId);
    // init replication source
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, metrics);
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
   * group and do replication
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  @VisibleForTesting
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
          NavigableSet<String> logs = new TreeSet<>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          // Abort the RS and throw an exception so that the add peer operation fails
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
          src.enqueueLog(logPath);
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication source of this peer id and open a new one, in order to apply
   * new replication state or config changes. The replication queue storage does not need to
   * change; we only enqueue all logs to the new replication source
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId +
      " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          // remove via the iterator to avoid a ConcurrentModificationException
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
        this.oldsources.add(replicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          // enqueue the wals to the newly created recovered source, not the normal source
          walsByGroup.forEach(wal -> replicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(replicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified normal source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

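  /**
   * A single operation against the replication queue storage, wrapped so that the caller can
   * choose how a {@link ReplicationException} is handled (abort the server, rethrow as an
   * IOException, etc.). A typical use, as in {@link #deleteQueue(String)}, looks like:
   *
   * <pre>
   * abortWhenFail(() -&gt; this.queueStorage.removeQueue(server.getServerName(), queueId));
   * </pre>
   */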
  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source first terminates the old source, so its thread may be
   * interrupted while operating on the queue storage. Handle that interruption here instead of
   * aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null && e.getCause()
          .getCause() instanceof InterruptedException) {
        throw new RuntimeException(
            "Thread is interrupted, the replication source may be terminated");
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param queueId id of the replication queue
   * @param queueRecovered indicates if this queue comes from another region server
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
      WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    interruptOrAbortWhenFail(() -> this.queueStorage
        .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
            entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered Whether this is a recovered queue
   */
  @VisibleForTesting
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
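    // headSet returns a view backed by wals, so clearing walSet at the end of this method also
    // removes these entries from the in-memory tracking set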
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log to queue storage fails, abort the RS and throw an exception so
        // that the log roll fails
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, don't keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
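      // Replace the previous latest path of this wal group (if any) so that latestPaths always
      // holds exactly one path per wal group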
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(ServerName.valueOf(regionserver));
  }

  /**
   * Transfer all the queues of the specified dead region server to this region server. First it
   * tries to grab a lock and if it works it will move the old queues and finally will delete the
   * old queues.
   * <p>
   * It creates one old source for any type of source of the old rs.
   */
  private void transferQueues(ServerName deadRS) {
    if (server.getServerName().equals(deadRS)) {
      // it's just us, give up
      return;
    }
    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
          .getGlobalSource().incrFailedRecoveryQueue();
      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
   * region servers.
   */
  class NodeFailoverWorker extends Thread {

    private final ServerName deadRS;
    // After claiming the queues from the dead region server, the NodeFailoverWorker will skip
    // starting the RecoveredReplicationSource if the peer has been removed. But it is possible
    // that a peer with peerId = 2 is removed and a peer with peerId = 2 is added again while the
    // NodeFailoverWorker is running. So we need a copied <peerId, peer> map to decide whether we
    // should start the RecoveredReplicationSource. If the latest peer is not the one that was
    // present when the NodeFailoverWorker began, we should skip starting the
    // RecoveredReplicationSource, otherwise the rs will abort (see HBASE-20475).
    private final Map<String, ReplicationPeerImpl> peersSnapshot;

    @VisibleForTesting
    public NodeFailoverWorker(ServerName deadRS) {
      super("Failover-for-" + deadRS);
      this.deadRS = deadRS;
      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
    }

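    /**
     * Returns true only if the given peer is still the exact instance that was cached when this
     * worker was created, i.e. the peer has not been removed and re-added in the meantime.
     */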
    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
      return oldPeerRef != null && oldPeerRef == newPeerRef;
    }

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover +
          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      Map<String, Set<String>> newQueues = new HashMap<>();
      try {
        List<String> queues = queueStorage.getAllQueues(deadRS);
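        // claimQueue atomically moves one queue (picked at random) from the dead RS to this RS
        // and returns the claimed queue id together with the set of WALs in it; keep claiming
        // until the dead RS has no queues left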
        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
            queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
          long sleep = sleepBeforeFailover / 2;
          if (!peer.getSecond().isEmpty()) {
            newQueues.put(peer.getFirst(), peer.getSecond());
            sleep = sleepBeforeFailover;
          }
          try {
            Thread.sleep(sleep);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
          }
          queues = queueStorage.getAllQueues(deadRS);
        }
        if (queues.isEmpty()) {
          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
        }
      } catch (ReplicationException e) {
        LOG.error(String.format("ReplicationException: cannot claim dead region server (%s)'s " +
            "replication queue. Znode : (%s)" +
            " Possible solution: check if znode size exceeds jute.maxBuffer value. " +
            " If so, increase it for both client and server side.", deadRS,
            queueStorage.getRsNode(deadRS)), e);
        server.abort("Failed to claim queue from dead regionserver.", e);
        return;
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String queueId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
          String actualPeerId = replicationQueueInfo.getPeerId();

          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
          if (peer == null || !isOldPeer(actualPeerId, peer)) {
            LOG.warn("Skipping failover for peer {} of node {}, peer is null or was recreated",
              actualPeerId, deadRS);
            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.",
              actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(queueId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            NavigableSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          synchronized (oldsources) {
            peer = replicationPeers.getPeer(src.getPeerId());
            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              removeRecoveredSource(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get an unmodifiable view of the wals of the normal sources on this rs
   * @return a map of peer id to wal group to the current set of wal names
   */
  @VisibleForTesting
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get an unmodifiable view of the wals of the recovered sources on this rs
   * @return a map of queue id to wal group to the current set of wal names
   */
  @VisibleForTesting
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null
   */
  @VisibleForTesting
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  @VisibleForTesting
  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  @VisibleForTesting
  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  @VisibleForTesting
  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the file system used by this manager
   * @return Handle on the file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }
}