/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and their only goal is to finish
 * replicating the WAL queues it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all of its queues to local old sources.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
 * The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
 * will add the wals to {@link #walsByIdRecoveredQueues} first, then start up a
 * {@link ReplicationSourceInterface}. So there is no race here. For
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, they
 * already synchronize on {@link #oldsources}, so there is no need to synchronize on
 * {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
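 * <p>
 * As an illustrative sketch only (not part of the contract of this class), the WAL roll path is
 * expected to drive this manager roughly as below, where {@code manager} is the
 * ReplicationSourceManager owned by the region server and {@code newLog} is the path of the wal
 * being rolled in:
 *
 * <pre>
 * // record the new wal in replication queue storage, walsById and latestPaths
 * manager.preLogRoll(newLog);
 * // ... the WAL provider switches its writer to newLog ...
 * // then hand the new wal to every normal source so shipping continues from it
 * manager.postLogRoll(newLog);
 * </pre>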
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // All the sources that read this RS's logs; every peer has exactly one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueueStorage queueStorage;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // The map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
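  // e.g. an illustrative (made up) recovered queue id for peer "2" claimed from a dead server
  // could look like "2-rs1.example.org,16020,1546300800000"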
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFileLengthProvider walFileLengthProvider;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

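  // Shared counter for the total size, in bytes, of WAL entries currently buffered in memory by
  // the sources on this region server; this class only exposes the counter (see
  // getTotalBufferUsed()), the sources themselves are expected to update and check it.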
  private AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param replicationPeers the replication peers of this region server
   * @param replicationTracker the tracker used to watch the other region servers of the cluster
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    // ConcurrentHashMap is thread-safe; generally there are far more reads than modifications.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds by default
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFileLengthProvider = walFileLengthProvider;
    this.replicationTracker.registerListener(this);
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all old region server
   * wal queues
   * <p>
   * The returned future is for the adoptAbandonedQueues task.
   */
  Future<?> init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
    return this.executor.submit(this::adoptAbandonedQueues);
  }

  private void adoptAbandonedQueues() {
    List<ServerName> currentReplicators = null;
    try {
      currentReplicators = queueStorage.getListOfReplicators();
    } catch (ReplicationException e) {
      server.abort("Failed to get all replicators", e);
      return;
    }
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
        .map(ServerName::valueOf).collect(Collectors.toList());
    LOG.info(
      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);

    // Look if there's anything to process after a restart
    for (ServerName rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info(
      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * Factory method to create a replication source
   * @param queueId the id of the replication queue
   * @return the created source
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
      throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

    MetricsSource metrics = new MetricsSource(queueId);
    // init replication source
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, metrics);
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add the new
   * replication queue to storage. For the newly added peer, we only need to enqueue the latest
   * log of each wal group and start replication from there
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  @VisibleForTesting
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
          NavigableSet<String> logs = new TreeSet<>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          // Abort the RS and throw an exception to make the add peer operation fail
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
          src.enqueueLog(logPath);
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to pick up the
   * new replication state or config changes. Here we don't need to change the replication queue
   * storage, we only need to enqueue all logs to the new replication sources
   * @param peerId the id of the replication peer
   * @throws IOException if creating a replication source fails
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId +
      " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> {
          Path walPath = new Path(this.logDir, wal);
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.",
            walPath, src.getQueueId());
        });
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
          .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
        this.oldsources.add(replicationSource);
        LOG.trace("Added source for recovered queue: " + replicationSource.getQueueId());
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> {
            LOG.trace("Enqueueing log from recovered queue for source: {}",
              replicationSource.getQueueId());
            replicationSource.enqueueLog(new Path(wal));
          });
        }
        toStartup.add(replicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified normal source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

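  /**
   * A single operation against the replication queue storage, so the error-handling helpers below
   * can wrap such calls uniformly. A typical use, mirroring what this class already does
   * elsewhere, is
   * {@code abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId))}.
   */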
  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, then the source thread
   * will be interrupted. We need to handle this case instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null && e.getCause()
          .getCause() instanceof InterruptedException) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread was interrupted deep down in the stack, so we should skip the following
        // processing logic and propagate the exception to the topmost layer which can handle it
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param queueId id of the replication queue
   * @param queueRecovered indicates if this queue comes from another region server
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
      WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    interruptOrAbortWhenFail(() -> this.queueStorage
        .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
            entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log the name of the wal file
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered whether this is a recovered queue
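   * <p>
   * For illustration only (actual file names depend on the WAL provider): a wal named something
   * like {@code rs1.example.org%2C16020%2C1546300800000.1546300812345} would belong to the wal
   * group whose prefix is {@code rs1.example.org%2C16020%2C1546300800000}; only the wals of that
   * group that sort before the given log (or including it, when {@code inclusive} is true) are
   * removed from the queue.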
   */
  @VisibleForTesting
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
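  /**
   * Called before a new wal is taken into use. Records the new wal in the replication queue
   * storage for every normal source, tracks it in {@link #walsById} and remembers it in
   * {@link #latestPaths} so that a peer added later can start from the latest wal of each group.
   * @param newLog the path of the wal that is about to be used
   */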
  @VisibleForTesting
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log to queue storage fails, abort the RS and throw an exception to
        // make the log roll fail
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, there is no need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.",
          newLog, source.getQueueId());
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(ServerName.valueOf(regionserver));
  }

  /**
   * Transfer all the queues of the specified dead region server to this region server. First it
   * tries to grab a lock and, if that succeeds, it moves the old queues and finally deletes them.
   * <p>
   * It creates one old source for every type of source of the old rs.
   */
  private void transferQueues(ServerName deadRS) {
    if (server.getServerName().equals(deadRS)) {
      // it's just us, give up
      return;
    }
    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
          .getGlobalSource().incrFailedRecoveryQueue();
      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
   * region servers.
   */
  class NodeFailoverWorker extends Thread {

    private final ServerName deadRS;
    // After claiming the queues from a dead region server, the NodeFailoverWorker will skip
    // starting the RecoveredReplicationSource if the peer has been removed. But it is possible
    // that a peer with peerId = 2 is removed and then a peer with peerId = 2 is added again while
    // the NodeFailoverWorker is running. So we need a snapshot copy of the <peerId, peer> map to
    // decide whether we should start the RecoveredReplicationSource. If the latest peer is not
    // the same object as the one snapshotted when the NodeFailoverWorker began, we skip starting
    // the RecoveredReplicationSource, otherwise the rs will abort (see HBASE-20475).
    private final Map<String, ReplicationPeerImpl> peersSnapshot;

    @VisibleForTesting
    public NodeFailoverWorker(ServerName deadRS) {
      super("Failover-for-" + deadRS);
      this.deadRS = deadRS;
      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
    }

    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
      return oldPeerRef != null && oldPeerRef == newPeerRef;
    }

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover +
          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
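      // Claim the dead RS's queues one at a time, picking a queue at random each round, until
      // none are left; the sleep between claims is meant to give other live region servers a
      // chance to claim part of the work as well.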
      Map<String, Set<String>> newQueues = new HashMap<>();
      try {
        List<String> queues = queueStorage.getAllQueues(deadRS);
        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
            queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
          long sleep = sleepBeforeFailover / 2;
          if (!peer.getSecond().isEmpty()) {
            newQueues.put(peer.getFirst(), peer.getSecond());
            sleep = sleepBeforeFailover;
          }
          try {
            Thread.sleep(sleep);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
          }
          queues = queueStorage.getAllQueues(deadRS);
        }
        if (queues.isEmpty()) {
          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
        }
      } catch (ReplicationException e) {
        LOG.error(String.format("ReplicationException: cannot claim dead region server (%s)'s " +
            "replication queue. Znode: (%s). " +
            "Possible solution: check if znode size exceeds jute.maxBuffer value. " +
            "If so, increase it for both client and server side.", deadRS,
            queueStorage.getRsNode(deadRS)), e);
        server.abort("Failed to claim queue from dead regionserver.", e);
        return;
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String queueId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
          String actualPeerId = replicationQueueInfo.getPeerId();

          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
          if (peer == null || !isOldPeer(actualPeerId, peer)) {
            LOG.warn("Skipping failover for peer {} of node {}, peer is null or has changed",
              actualPeerId, deadRS);
            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.",
              actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(queueId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            NavigableSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          synchronized (oldsources) {
            peer = replicationPeers.getPeer(src.getPeerId());
            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              removeRecoveredSource(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the wals of the normal sources on this rs
   * @return a map of peer id to wal group to the sorted set of wal names
   */
  @VisibleForTesting
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get the wals of the recovered sources on this rs
   * @return a map of queue id to wal group to the sorted set of wal names
   */
  @VisibleForTesting
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  @VisibleForTesting
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  @VisibleForTesting
  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  @VisibleForTesting
  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  @VisibleForTesting
  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }
}