001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.NavigableSet;
028import java.util.OptionalLong;
029import java.util.Set;
030import java.util.SortedSet;
031import java.util.TreeSet;
032import java.util.UUID;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentMap;
035import java.util.concurrent.LinkedBlockingQueue;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicLong;
040import java.util.concurrent.atomic.AtomicReference;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
050import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
051import org.apache.hadoop.hbase.replication.ReplicationException;
052import org.apache.hadoop.hbase.replication.ReplicationPeer;
053import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
054import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
055import org.apache.hadoop.hbase.replication.ReplicationPeers;
056import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
057import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
060import org.apache.hadoop.hbase.wal.WAL;
061import org.apache.hadoop.hbase.wal.WALFactory;
062import org.apache.hadoop.hbase.wal.WALProvider;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.apache.zookeeper.KeeperException;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
069import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
070
071/**
072 * This class is responsible to manage all the replication sources. There are two classes of
073 * sources:
074 * <ul>
075 * <li>Normal sources are persistent and one per peer cluster</li>
076 * <li>Old sources are recovered from a failed region server and our only goal is to finish
077 * replicating the WAL queue it had</li>
078 * </ul>
079 * <p>
080 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
081 * in order to transfer all the queues in a local old source.
082 * <p>
083 * Synchronization specification:
084 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}. So there is no race for
 * peer operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with
 * {@link #addPeer(String)}. {@link #removePeer(String)} terminates the
 * {@link ReplicationSourceInterface} first and then removes the wals from {@link #walsById}, so
 * there is no race with {@link #removePeer(String)} either. The only case that needs
 * synchronization is between {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link #preLogRoll(Path)} (see the sketch below).</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} terminates the
 * {@link ReplicationSourceInterface} first and then removes the wals from
 * {@link #walsByIdRecoveredQueues}. And
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)} adds the wals to
 * {@link #walsByIdRecoveredQueues} first and then starts up a {@link ReplicationSourceInterface},
 * so there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)}
 * and {@link #removePeer(String)}, there is already synchronization on {@link #oldsources}, so
 * there is no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} so that a newly added source does not miss a
 * new log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
119 * </ul>
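 * <p>
 * A simplified sketch of the locking order described above; the bodies here are illustrative
 * comments only, not the actual implementation:
 *
 * <pre>
 * {@code
 * // preLogRoll(Path): take latestPaths first, then walsById
 * synchronized (latestPaths) {
 *   // record the new wal in queue storage for every normal source ...
 *   synchronized (walsById) {
 *     // ... then add the new wal name to the per wal-group NavigableSet
 *   }
 *   latestPaths.put(logPrefix, newLog);
 * }
 *
 * // cleanOldLogs(...) for a normal queue: only walsById is needed
 * synchronized (walsById) {
 *   // remove every wal up to (and optionally including) the given one from the NavigableSet
 * }
 * }
 * </pre>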
120 */
121@InterfaceAudience.Private
122public class ReplicationSourceManager {
123  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
124  // all the sources that read this RS's logs and every peer only has one replication source
125  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
127  private final List<ReplicationSourceInterface> oldsources;
128
129  /**
   * Storage for queues that need persistence; e.g. replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread throughout this class and is also passed to
   * ReplicationSource instances so they can do updates themselves. Not all ReplicationSource
   * instances keep state.
133   */
134  private final ReplicationQueueStorage queueStorage;
135
136  private final ReplicationPeers replicationPeers;
137  // UUID for this cluster
138  private final UUID clusterId;
139  // All about stopping
140  private final Server server;
141
142  // All logs we are currently tracking
143  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
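  // e.g. (hypothetical names):
  //   "1" -> "rs1%2C16020%2C1234" -> { "rs1%2C16020%2C1234.5678", ... }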
145  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
146  // Logs for recovered sources we are currently tracking
147  // the map is: queue_id->logPrefix/logGroup->logs
148  // For recovered source, the queue id's format is peer_id-servername-*
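  // e.g. (hypothetical names):
  //   "1-rs1,16020,1234" -> "rs1%2C16020%2C1234" -> { "rs1%2C16020%2C1234.5678", ... }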
149  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
150
151  private final Configuration conf;
152  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
154  private final Map<String, Path> latestPaths;
155  // Path to the wals directories
156  private final Path logDir;
157  // Path to the wal archive
158  private final Path oldLogDir;
159  private final WALFactory walFactory;
160  // The number of ms that we wait before moving znodes, HBASE-3596
161  private final long sleepBeforeFailover;
  // Homemade executor service for replication
163  private final ThreadPoolExecutor executor;
164
165  private final boolean replicationForBulkLoadDataEnabled;
166
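  // Bytes of WAL edits currently buffered across all sources on this RegionServer, pending
  // shipment; checked against totalBufferLimit below.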
167  private AtomicLong totalBufferUsed = new AtomicLong();
168  // Total buffer size on this RegionServer for holding batched edits to be shipped.
169  private final long totalBufferLimit;
170  private final MetricsReplicationGlobalSourceSource globalMetrics;
171
172  /**
173   * A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference remains
174   * empty. If an hbase:meta Region is opened on this server, we will create an instance of a
175   * hbase:meta CatalogReplicationSource and it will live the life of the Server thereafter; i.e. we
176   * will not shut it down even if the hbase:meta moves away from this server (in case it later gets
   * moved back). We synchronize on this instance when testing for presence and while creating it
   * if absent, so it is only created and started once.
179   */
180  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
181
182  /**
183   * Creates a replication manager and sets the watch on all the other registered region servers
184   * @param queueStorage the interface for manipulating replication queues
185   * @param conf         the configuration to use
186   * @param server       the server for this region server
187   * @param fs           the file system to use
188   * @param logDir       the directory that contains all wal directories of live RSs
189   * @param oldLogDir    the directory where old logs are archived
   * @param oldLogDir    the directory where old logs are archived
   */
191  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
192    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
193    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
194    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    // ConcurrentHashMap is thread-safe.
    // Generally, reads outnumber writes here.
197    this.sources = new ConcurrentHashMap<>();
198    this.queueStorage = queueStorage;
199    this.replicationPeers = replicationPeers;
200    this.server = server;
201    this.walsById = new ConcurrentHashMap<>();
202    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
203    this.oldsources = new ArrayList<>();
204    this.conf = conf;
205    this.fs = fs;
206    this.logDir = logDir;
207    this.oldLogDir = oldLogDir;
208    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
209                                                                                         // seconds
210    this.clusterId = clusterId;
211    this.walFactory = walFactory;
212    // It's preferable to failover 1 RS at a time, but with good zk servers
213    // more could be processed at the same time.
214    int nbWorkers = conf.getInt("replication.executor.workers", 1);
215    // use a short 100ms sleep since this could be done inline with a RS startup
216    // even if we fail, other region servers can take care of it
217    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
218      new LinkedBlockingQueue<>());
219    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
220    tfb.setNameFormat("ReplicationExecutor-%d");
221    tfb.setDaemon(true);
222    this.executor.setThreadFactory(tfb.build());
223    this.latestPaths = new HashMap<>();
224    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
225      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
226    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
227      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
228    this.globalMetrics = globalMetrics;
229  }
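
  /*
   * A minimal tuning sketch for the Configuration handed to this constructor. The first two values
   * are the defaults read above; the buffer size is a hypothetical example.
   *
   *   conf.setLong("replication.sleep.before.failover", 30000); // ms to wait before claiming queues
   *   conf.setInt("replication.executor.workers", 1); // concurrent queue-claim workers
   *   conf.setLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 256 * 1024 * 1024); // hypothetical
   */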
230
231  /**
232   * Adds a normal source per registered peer cluster.
233   */
234  void init() throws IOException {
235    for (String id : this.replicationPeers.getAllPeerIds()) {
236      addSource(id);
237      if (replicationForBulkLoadDataEnabled) {
238        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
239        // when a peer was added before replication for bulk loaded data was enabled.
240        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
241      }
242    }
243  }
244
245  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
248   * @param peerId the id of replication peer
249   */
250  public void addPeer(String peerId) throws IOException {
251    boolean added = false;
252    try {
253      added = this.replicationPeers.addPeer(peerId);
254    } catch (ReplicationException e) {
255      throw new IOException(e);
256    }
257    if (added) {
258      addSource(peerId);
259      if (replicationForBulkLoadDataEnabled) {
260        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
261      }
262    }
263  }
264
265  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
269   * @param peerId the id of the replication peer
270   */
271  public void removePeer(String peerId) {
272    replicationPeers.removePeer(peerId);
273    String terminateMessage = "Replication stream was removed by a user";
274    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
275    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
276    // see NodeFailoverWorker.run
277    synchronized (this.oldsources) {
278      // First close all the recovered sources for this peer
279      for (ReplicationSourceInterface src : oldsources) {
280        if (peerId.equals(src.getPeerId())) {
281          oldSourcesToDelete.add(src);
282        }
283      }
284      for (ReplicationSourceInterface src : oldSourcesToDelete) {
285        src.terminate(terminateMessage);
286        removeRecoveredSource(src);
287      }
288    }
289    LOG
290      .info("Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
291    // Now close the normal source for this peer
292    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
293    if (srcToRemove != null) {
294      srcToRemove.terminate(terminateMessage);
295      removeSource(srcToRemove);
296    } else {
297      // This only happened in unit test TestReplicationSourceManager#testPeerRemovalCleanup
298      // Delete queue from storage and memory and queue id is same with peer id for normal
299      // source
300      deleteQueue(peerId);
301      this.walsById.remove(peerId);
302    }
303
304    // Remove HFile Refs
305    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
306  }
307
308  /**
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @param replicationPeer the replication peer to replicate to
   * @return a new 'classic' user-space replication source.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
312   */
313  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
314    throws IOException {
315    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
316    // Init the just created replication source. Pass the default walProvider's wal file length
317    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
318    // replication, see #createCatalogReplicationSource().
319    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
320      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
321      : p -> OptionalLong.empty();
322    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
323      walFileLengthProvider, new MetricsSource(queueId));
324    return src;
325  }
326
327  /**
328   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
329   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
330   * group and do replication
331   * @param peerId the id of the replication peer
332   * @return the source that was created
333   */
334  ReplicationSourceInterface addSource(String peerId) throws IOException {
335    ReplicationPeer peer = replicationPeers.getPeer(peerId);
336    ReplicationSourceInterface src = createSource(peerId, peer);
337    // synchronized on latestPaths to avoid missing the new log
338    synchronized (this.latestPaths) {
339      this.sources.put(peerId, src);
340      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
341      this.walsById.put(peerId, walsByGroup);
342      // Add the latest wal to that source's queue
343      if (!latestPaths.isEmpty()) {
344        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
345          Path walPath = walPrefixAndPath.getValue();
346          NavigableSet<String> wals = new TreeSet<>();
347          wals.add(walPath.getName());
348          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          // Abort the RS and throw an exception so that adding the peer fails
350          abortAndThrowIOExceptionWhenFail(
351            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
352          src.enqueueLog(walPath);
353          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
354        }
355      }
356    }
357    src.startup();
358    return src;
359  }
360
361  /**
362   * Close the previous replication sources of this peer id and open new sources to trigger the new
363   * replication state changes or new replication config changes. Here we don't need to change
   * replication queue storage; we only need to enqueue all the logs to the new replication source.
   * @param peerId the id of the replication peer
366   */
367  public void refreshSources(String peerId) throws IOException {
368    String terminateMessage = "Peer " + peerId
369      + " state or config changed. Will close the previous replication source and open a new one";
370    ReplicationPeer peer = replicationPeers.getPeer(peerId);
371    ReplicationSourceInterface src = createSource(peerId, peer);
372    // synchronized on latestPaths to avoid missing the new log
373    synchronized (this.latestPaths) {
374      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
375      if (toRemove != null) {
376        LOG.info("Terminate replication source for " + toRemove.getPeerId());
377        // Do not clear metrics
378        toRemove.terminate(terminateMessage, null, false);
379      }
380      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
381        walsByGroup.forEach(wal -> {
382          Path walPath = new Path(this.logDir, wal);
383          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source refresh.", walPath, src.getQueueId());
385        });
386
387      }
388    }
389    LOG.info("Startup replication source for " + src.getPeerId());
390    src.startup();
391
392    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
393    // synchronized on oldsources to avoid race with NodeFailoverWorker
394    synchronized (this.oldsources) {
395      List<String> previousQueueIds = new ArrayList<>();
396      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
397        .hasNext();) {
398        ReplicationSourceInterface oldSource = iter.next();
399        if (oldSource.getPeerId().equals(peerId)) {
400          previousQueueIds.add(oldSource.getQueueId());
401          oldSource.terminate(terminateMessage);
402          iter.remove();
403        }
404      }
405      for (String queueId : previousQueueIds) {
406        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
407        this.oldsources.add(recoveredReplicationSource);
408        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
409          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
410        }
411        toStartup.add(recoveredReplicationSource);
412      }
413    }
414    for (ReplicationSourceInterface replicationSource : toStartup) {
415      replicationSource.startup();
416    }
417  }
418
419  /**
420   * Clear the metrics and related replication queue of the specified old source
421   * @param src source to clear
422   */
423  void removeRecoveredSource(ReplicationSourceInterface src) {
424    LOG.info("Done with the recovered queue " + src.getQueueId());
425    this.oldsources.remove(src);
426    // Delete queue from storage and memory
427    deleteQueue(src.getQueueId());
428    this.walsByIdRecoveredQueues.remove(src.getQueueId());
429  }
430
431  /**
   * Clear the metrics and related replication queue of the specified normal source
433   * @param src source to clear
434   */
435  void removeSource(ReplicationSourceInterface src) {
436    LOG.info("Done with the queue " + src.getQueueId());
437    this.sources.remove(src.getPeerId());
438    // Delete queue from storage and memory
439    deleteQueue(src.getQueueId());
440    this.walsById.remove(src.getQueueId());
441  }
442
443  /**
444   * Delete a complete queue of wals associated with a replication source
445   * @param queueId the id of replication queue to delete
446   */
447  private void deleteQueue(String queueId) {
448    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
449  }
450
451  @FunctionalInterface
452  private interface ReplicationQueueOperation {
453    void exec() throws ReplicationException;
454  }
455
456  /**
   * Refreshing a replication source terminates the old source first, which means the source thread
   * will be interrupted. We need to handle this instead of aborting the region server.
459   */
460  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
461    try {
462      op.exec();
463    } catch (ReplicationException e) {
464      if (
465        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
466          && e.getCause().getCause() != null
467          && e.getCause().getCause() instanceof InterruptedException
468      ) {
        // A ReplicationRuntimeException (a RuntimeException) is thrown out here because the thread
        // was interrupted deep down in the stack. It should bypass the following processing logic
        // and propagate to the topmost layer, which can handle this exception properly. In this
        // specific case, the top layer is ReplicationSourceShipper#run().
473        throw new ReplicationRuntimeException(
474          "Thread is interrupted, the replication source may be terminated",
475          e.getCause().getCause());
476      }
477      server.abort("Failed to operate on replication queue", e);
478    }
479  }
480
481  private void abortWhenFail(ReplicationQueueOperation op) {
482    try {
483      op.exec();
484    } catch (ReplicationException e) {
485      server.abort("Failed to operate on replication queue", e);
486    }
487  }
488
489  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
490    try {
491      op.exec();
492    } catch (ReplicationException e) {
493      throw new IOException(e);
494    }
495  }
496
497  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
498    try {
499      op.exec();
500    } catch (ReplicationException e) {
501      server.abort("Failed to operate on replication queue", e);
502      throw new IOException(e);
503    }
504  }
505
506  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
509   * @param entryBatch the wal entry batch we just shipped
510   */
511  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
512    WALEntryBatch entryBatch) {
513    String fileName = entryBatch.getLastWalPath().getName();
514    String queueId = source.getQueueId();
515    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId,
516      fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
517    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
518  }
519
520  /**
521   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
522   * file is closed and has no more entries.
523   * @param log            Path to the log
524   * @param inclusive      whether we should also remove the given log file
525   * @param queueId        id of the replication queue
526   * @param queueRecovered Whether this is a recovered queue
527   */
528  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
529    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
530    if (queueRecovered) {
531      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
532      if (wals != null) {
533        cleanOldLogs(wals, log, inclusive, queueId);
534      }
535    } else {
536      // synchronized on walsById to avoid race with preLogRoll
537      synchronized (this.walsById) {
538        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
539        if (wals != null) {
540          cleanOldLogs(wals, log, inclusive, queueId);
541        }
542      }
543    }
544  }
545
546  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
547    NavigableSet<String> walSet = wals.headSet(key, inclusive);
548    if (walSet.isEmpty()) {
549      return;
550    }
551    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
552    for (String wal : walSet) {
553      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
554    }
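    // walSet is a view backed by the underlying set, so clearing it also removes these wals from
    // walsById / walsByIdRecoveredQueues.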
555    walSet.clear();
556  }
557
  // public because we call it in TestReplicationEmptyWALRecovery
559  public void preLogRoll(Path newLog) throws IOException {
560    String logName = newLog.getName();
561    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
563    synchronized (this.latestPaths) {
564      // Add log to queue storage
565      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception to make
        // the log roll fail
568        abortAndThrowIOExceptionWhenFail(
569          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
570      }
571
572      // synchronized on walsById to avoid race with cleanOldLogs
573      synchronized (this.walsById) {
574        // Update walsById map
575        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
576          .entrySet()) {
577          String peerId = entry.getKey();
578          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
579          boolean existingPrefix = false;
580          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
581            SortedSet<String> wals = walsEntry.getValue();
582            if (this.sources.isEmpty()) {
              // If there are no peers, we don't need to keep the old wals since
              // we only consider the latest one when a new peer comes in
585              wals.clear();
586            }
587            if (logPrefix.equals(walsEntry.getKey())) {
588              wals.add(logName);
589              existingPrefix = true;
590            }
591          }
592          if (!existingPrefix) {
593            // The new log belongs to a new group, add it into this peer
594            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
595            NavigableSet<String> wals = new TreeSet<>();
596            wals.add(logName);
597            walsByPrefix.put(logPrefix, wals);
598          }
599        }
600      }
601
602      // Add to latestPaths
603      latestPaths.put(logPrefix, newLog);
604    }
605  }
606
  // public because we call it in TestReplicationEmptyWALRecovery
608  public void postLogRoll(Path newLog) throws IOException {
609    // This only updates the sources we own, not the recovered ones
610    for (ReplicationSourceInterface source : this.sources.values()) {
611      source.enqueueLog(newLog);
612      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
613        source.getQueueId());
614    }
615  }
616
617  void claimQueue(ServerName deadRS, String queue) {
618    // Wait a bit before transferring the queues, we may be shutting down.
619    // This sleep may not be enough in some cases.
620    try {
621      Thread.sleep(sleepBeforeFailover
622        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
623    } catch (InterruptedException e) {
624      LOG.warn("Interrupted while waiting before transferring a queue.");
625      Thread.currentThread().interrupt();
626    }
627    // We try to lock that rs' queue directory
628    if (server.isStopped()) {
629      LOG.info("Not transferring queue since we are shutting down");
630      return;
631    }
    // After claiming the queues from the dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and a peer with peerId = 2 is added again during the failover, so we
    // need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the rs will abort (see HBASE-20475).
638    String peerId = new ReplicationQueueInfo(queue).getPeerId();
639    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
640    if (oldPeer == null) {
641      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
642        peerId, queue);
643      return;
644    }
645    Pair<String, SortedSet<String>> claimedQueue;
646    try {
647      claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName());
648    } catch (ReplicationException e) {
649      LOG.error(
650        "ReplicationException: cannot claim dead region ({})'s " + "replication queue. Znode : ({})"
651          + " Possible solution: check if znode size exceeds jute.maxBuffer value. "
652          + " If so, increase it for both client and server side.",
653        deadRS, queueStorage.getRsNode(deadRS), e);
654      server.abort("Failed to claim queue from dead regionserver.", e);
655      return;
656    }
657    if (claimedQueue.getSecond().isEmpty()) {
658      return;
659    }
660    String queueId = claimedQueue.getFirst();
661    Set<String> walsSet = claimedQueue.getSecond();
662    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
663    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer is null or has changed", peerId,
        deadRS);
665      abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
666      return;
667    }
668    if (
669      server instanceof ReplicationSyncUp.DummyServer
670        && peer.getPeerState().equals(PeerState.DISABLED)
671    ) {
672      LOG.warn(
673        "Peer {} is disabled. ReplicationSyncUp tool will skip " + "replicating data to this peer.",
674        peerId);
675      return;
676    }
677
678    ReplicationSourceInterface src;
679    try {
680      src = createSource(queueId, peer);
681    } catch (IOException e) {
682      LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e);
683      server.abort("Failed to create replication source after claiming queue.", e);
684      return;
685    }
686    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
687    synchronized (oldsources) {
688      peer = replicationPeers.getPeer(src.getPeerId());
689      if (peer == null || peer != oldPeer) {
690        src.terminate("Recovered queue doesn't belong to any current peer");
691        deleteQueue(queueId);
692        return;
693      }
694      // track sources in walsByIdRecoveredQueues
695      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
696      walsByIdRecoveredQueues.put(queueId, walsByGroup);
697      for (String wal : walsSet) {
698        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
699        NavigableSet<String> wals = walsByGroup.get(walPrefix);
700        if (wals == null) {
701          wals = new TreeSet<>();
702          walsByGroup.put(walPrefix, wals);
703        }
704        wals.add(wal);
705      }
706      oldsources.add(src);
707      LOG.info("Added source for recovered queue {}", src.getQueueId());
708      for (String wal : walsSet) {
709        LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
710        src.enqueueLog(new Path(oldLogDir, wal));
711      }
712      src.startup();
713    }
714  }
715
716  /**
717   * Terminate the replication on this region server
718   */
719  public void join() {
720    this.executor.shutdown();
721    for (ReplicationSourceInterface source : this.sources.values()) {
722      source.terminate("Region server is closing");
723    }
724    for (ReplicationSourceInterface source : this.oldsources) {
725      source.terminate("Region server is closing");
726    }
727  }
728
729  /**
   * Get the wals of the normal sources on this rs
   * @return an unmodifiable map from queue id to wal group to a sorted set of wal names
732   */
733  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
734    return Collections.unmodifiableMap(walsById);
735  }
736
737  /**
   * Get the wals of the recovered sources on this rs
   * @return an unmodifiable map from queue id to wal group to a sorted set of wal names
740   */
741  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
742    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
743  }
744
745  /**
746   * Get a list of all the normal sources of this rs
747   * @return list of all normal sources
748   */
749  public List<ReplicationSourceInterface> getSources() {
750    return new ArrayList<>(this.sources.values());
751  }
752
753  /**
754   * Get a list of all the recovered sources of this rs
755   * @return list of all recovered sources
756   */
757  public List<ReplicationSourceInterface> getOldSources() {
758    return this.oldsources;
759  }
760
761  /**
762   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
764   */
765  public ReplicationSourceInterface getSource(String peerId) {
766    return this.sources.get(peerId);
767  }
768
769  List<String> getAllQueues() throws IOException {
770    List<String> allQueues = Collections.emptyList();
771    try {
772      allQueues = queueStorage.getAllQueues(server.getServerName());
773    } catch (ReplicationException e) {
774      throw new IOException(e);
775    }
776    return allQueues;
777  }
778
779  int getSizeOfLatestPath() {
780    synchronized (latestPaths) {
781      return latestPaths.size();
782    }
783  }
784
785  Set<Path> getLastestPath() {
786    synchronized (latestPaths) {
787      return Sets.newHashSet(latestPaths.values());
788    }
789  }
790
791  public AtomicLong getTotalBufferUsed() {
792    return totalBufferUsed;
793  }
794
795  /**
796   * Returns the maximum size in bytes of edits held in memory which are pending replication across
797   * all sources inside this RegionServer.
798   */
799  public long getTotalBufferLimit() {
800    return totalBufferLimit;
801  }
802
803  /**
804   * Get the directory where wals are archived
805   * @return the directory where wals are archived
806   */
807  public Path getOldLogDir() {
808    return this.oldLogDir;
809  }
810
811  /**
812   * Get the directory where wals are stored by their RSs
813   * @return the directory where wals are stored by their RSs
814   */
815  public Path getLogDir() {
816    return this.logDir;
817  }
818
819  /**
820   * Get the handle on the local file system
821   * @return Handle on the local file system
822   */
823  public FileSystem getFs() {
824    return this.fs;
825  }
826
827  /**
828   * Get the ReplicationPeers used by this ReplicationSourceManager
829   * @return the ReplicationPeers used by this ReplicationSourceManager
830   */
831  public ReplicationPeers getReplicationPeers() {
832    return this.replicationPeers;
833  }
834
835  /**
836   * Get a string representation of all the sources' metrics
837   */
838  public String getStats() {
839    StringBuilder stats = new StringBuilder();
840    // Print stats that apply across all Replication Sources
841    stats.append("Global stats: ");
842    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
843      .append(getTotalBufferLimit()).append("B\n");
844    for (ReplicationSourceInterface source : this.sources.values()) {
845      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
846      stats.append(source.getStats() + "\n");
847    }
848    for (ReplicationSourceInterface oldSource : oldsources) {
849      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
850      stats.append(oldSource.getStats() + "\n");
851    }
852    return stats.toString();
853  }
854
855  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
856    throws IOException {
857    for (ReplicationSourceInterface source : this.sources.values()) {
858      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
859    }
860  }
861
862  public void cleanUpHFileRefs(String peerId, List<String> files) {
863    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
864  }
865
866  int activeFailoverTaskCount() {
867    return executor.getActiveCount();
868  }
869
870  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
871    return this.globalMetrics;
872  }
873
874  /**
875   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. Create it
   * once only. If it exists already, use the existing one.
877   * @see #removeCatalogReplicationSource(RegionInfo)
   * @see #addSource(String) This is a specialization of the addSource method.
879   */
880  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
881    throws IOException {
882    // Poor-man's putIfAbsent
883    synchronized (this.catalogReplicationSource) {
      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
      if (rs == null) {
        // Create and publish the catalog source, then return the new instance (getAndSet would
        // have returned the previous, null, value).
        rs = createCatalogReplicationSource(regionInfo);
        this.catalogReplicationSource.set(rs);
      }
      return rs;
888    }
889  }
890
891  /**
892   * Remove the hbase:meta Catalog replication source. Called when we close hbase:meta.
893   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
894   */
895  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
896    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
897    // comes back to this server.
898  }
899
900  /**
901   * Create, initialize, and start the Catalog ReplicationSource. Presumes called one-time only
902   * (caller must ensure one-time only call). This ReplicationSource is NOT created via
903   * {@link ReplicationSourceFactory}.
904   * @see #addSource(String) This is a specialization of the addSource call.
905   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
906   *      why the special handling).
907   */
908  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
909    throws IOException {
910    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
911    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
912    WALProvider walProvider = this.walFactory.getMetaWALProvider();
913    boolean instantiate = walProvider == null;
914    if (instantiate) {
915      walProvider = this.walFactory.getMetaProvider();
916    }
917    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
918    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
919    // read replicas feature that makes use of the source does a reset on a crash of the WAL
920    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
921    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
922    CatalogReplicationSourcePeer peer =
923      new CatalogReplicationSourcePeer(this.conf, this.clusterId.toString());
924    final ReplicationSourceInterface crs = new CatalogReplicationSource();
925    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
926      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
927    // Add listener on the provider so we can pick up the WAL to replicate on roll.
928    WALActionsListener listener = new WALActionsListener() {
929      @Override
930      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
931        crs.enqueueLog(newPath);
932      }
933    };
934    walProvider.addWALActionsListener(listener);
935    if (!instantiate) {
936      // If we did not instantiate provider, need to add our listener on already-created WAL
937      // instance too (listeners are passed by provider to WAL instance on creation but if provider
938      // created already, our listener add above is missed). And add the current WAL file to the
939      // Replication Source so it can start replicating it.
940      WAL wal = walProvider.getWAL(regionInfo);
941      wal.registerWALActionsListener(listener);
      crs.enqueueLog(((AbstractFSWAL<?>) wal).getCurrentFileName());
943    }
944    return crs.startup();
945  }
946
947  ReplicationQueueStorage getQueueStorage() {
948    return queueStorage;
949  }
950}