001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URLEncoder;
024import java.nio.charset.StandardCharsets;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.NavigableSet;
033import java.util.OptionalLong;
034import java.util.PriorityQueue;
035import java.util.Set;
036import java.util.SortedSet;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.concurrent.ThreadLocalRandom;
043import java.util.concurrent.ThreadPoolExecutor;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.stream.Collectors;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.Server;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.replication.ReplicationException;
056import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
057import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
058import org.apache.hadoop.hbase.replication.ReplicationPeer;
059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
060import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
061import org.apache.hadoop.hbase.replication.ReplicationPeers;
062import org.apache.hadoop.hbase.replication.ReplicationQueueData;
063import org.apache.hadoop.hbase.replication.ReplicationQueueId;
064import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
065import org.apache.hadoop.hbase.replication.ReplicationUtils;
066import org.apache.hadoop.hbase.replication.SyncReplicationState;
067import org.apache.hadoop.hbase.util.Pair;
068import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
069import org.apache.hadoop.hbase.wal.AbstractWALProvider;
070import org.apache.hadoop.hbase.wal.WAL.Entry;
071import org.apache.hadoop.hbase.wal.WALFactory;
072import org.apache.yetus.audience.InterfaceAudience;
073import org.apache.zookeeper.KeeperException;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
078import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
079import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
080
081/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent, with one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server; their only goal is to finish
 * replicating the WAL queues that server had</li>
 * </ul>
 * <p>
 * When a region server dies, this class gets notified via a watcher and tries to grab a lock so it
 * can transfer the dead server's queues to local old sources.
092 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No synchronization is needed on {@link #sources}. {@link #sources} is a ConcurrentHashMap
 * and there is a Lock per peer id in {@link PeerProcedureHandlerImpl}, so there is no race for
 * peer operations.</li>
 * <li>Synchronization is needed on {@link #walsById}. There are four methods which modify it:
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}. {@link #walsById} is a ConcurrentHashMap and there is a Lock per
 * peer id in {@link PeerProcedureHandlerImpl}, so there is no race between
 * {@link #addPeer(String)} and {@link #removePeer(String)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} first terminates the {@link ReplicationSourceInterface} and only
 * then removes the wals from {@link #walsById}, so there is no race with
 * {@link #removePeer(String)} either. The only case which needs synchronization is between
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)} (see the sketch after this list).</li>
 * <li>No synchronization is needed on {@link #walsByIdRecoveredQueues}. There are three methods
 * which modify it: {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #claimQueue(ReplicationQueueId)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} first terminates the
 * {@link ReplicationSourceInterface} and only then removes the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link #claimQueue(ReplicationQueueId)} first adds the
 * wals to {@link #walsByIdRecoveredQueues} and only then starts up a
 * {@link ReplicationSourceInterface}. So there is no race here. For
 * {@link #claimQueue(ReplicationQueueId)} and {@link #removePeer(String)}, we already synchronize
 * on {@link #oldsources}, so there is no need to synchronize on
 * {@link #walsByIdRecoveredQueues}.</li>
 * <li>Synchronization is needed on {@link #latestPaths} so that a newly opened source does not
 * miss a new log.</li>
 * <li>Synchronization is needed on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
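 * <p>
 * As a rough, illustrative sketch (not a complete listing), the nesting order of the two locks
 * above, as used by {@link #postLogRoll(Path)}, is:
 *
 * <pre>
 * {@code
 * synchronized (latestPaths) {   // so a newly opened source does not miss the rolled wal
 *   synchronized (walsById) {    // so cleanOldLogs does not mutate the wal sets concurrently
 *     // record the new wal for every tracked queue, then remember it in latestPaths
 *   }
 * }
 * }
 * </pre>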
125 */
126@InterfaceAudience.Private
127public class ReplicationSourceManager {
128
129  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // All the sources that read this RS's logs; every peer has only one normal replication source
131  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
133  private final List<ReplicationSourceInterface> oldsources;
134
135  /**
   * Storage for queues that need persistence, e.g. replication state, so it can be recovered
   * after a crash. Upkeep of queueStorage is spread throughout this class and it is also passed
   * to ReplicationSource instances so they can do updates themselves. Not all ReplicationSource
   * instances keep state.
139   */
140  private final ReplicationQueueStorage queueStorage;
141
142  private final ReplicationPeers replicationPeers;
143  // UUID for this cluster
144  private final UUID clusterId;
  // The server we are running on; used for stop checks and aborting
146  private final Server server;
147
148  // All logs we are currently tracking
149  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
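  // An illustrative (hypothetical) example of the layout, with one queue and two wal groups:
  //   {queueId -> {walGroupPrefix1 -> [walName1, walName2], walGroupPrefix2 -> [walName3]}}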
150  private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
151  // Logs for recovered sources we are currently tracking
152  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the WAL files should already have been moved to oldLogDir, and old
  // WAL files can have different layouts (for example, with or without server name sub
  // directories), so here we record the full path instead of just the name. This way, when
  // refreshing we can enqueue the WAL file again without having to guess its real path.
157  private final ConcurrentMap<ReplicationQueueId,
158    Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;
159
160  private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
161
162  private final Configuration conf;
163  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
165  private final Map<String, Path> latestPaths;
166  // Path to the wals directories
167  private final Path logDir;
168  // Path to the wal archive
169  private final Path oldLogDir;
170  private final WALFactory walFactory;
  // The number of ms we wait before transferring a dead region server's queues, see HBASE-3596
172  private final long sleepBeforeFailover;
  // Homemade executor service for replication
174  private final ThreadPoolExecutor executor;
175
176  private AtomicLong totalBufferUsed = new AtomicLong();
177
  // How long we should sleep between retries when deleting remote wal files for a sync
  // replication peer.
  private final long sleepForRetries;
  // Maximum backoff multiplier applied to the retry sleep time when deleting remote wal files for
  // a sync replication peer.
  private final int maxRetriesMultiplier;
184  // Total buffer size on this RegionServer for holding batched edits to be shipped.
185  private final long totalBufferLimit;
186  private final MetricsReplicationGlobalSourceSource globalMetrics;
187
188  /**
189   * Creates a replication manager and sets the watch on all the other registered region servers
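   * <p>
   * As an illustrative sketch only (the keys below are the ones read in this constructor), the
   * failover timing and worker count can be tuned via the configuration, e.g.:
   *
   * <pre>
   * {@code
   * Configuration conf = HBaseConfiguration.create();
   * // wait 30s (plus some random jitter) before claiming queues of a dead region server
   * conf.setLong("replication.sleep.before.failover", 30000);
   * // number of worker threads used when claiming queues
   * conf.setInt("replication.executor.workers", 1);
   * }
   * </pre>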
190   * @param queueStorage the interface for manipulating replication queues
191   * @param conf         the configuration to use
192   * @param server       the server for this region server
193   * @param fs           the file system to use
194   * @param logDir       the directory that contains all wal directories of live RSs
195   * @param oldLogDir    the directory where old logs are archived
196   */
197  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
198    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
199    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
200    SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
201    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
202    this.sources = new ConcurrentHashMap<>();
203    this.queueStorage = queueStorage;
204    this.replicationPeers = replicationPeers;
205    this.server = server;
206    this.walsById = new ConcurrentHashMap<>();
207    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
208    this.oldsources = new ArrayList<>();
209    this.conf = conf;
210    this.fs = fs;
211    this.logDir = logDir;
212    this.oldLogDir = oldLogDir;
213    // 30 seconds
214    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
215    this.clusterId = clusterId;
216    this.walFactory = walFactory;
217    this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
220    int nbWorkers = conf.getInt("replication.executor.workers", 1);
221    // use a short 100ms sleep since this could be done inline with a RS startup
222    // even if we fail, other region servers can take care of it
223    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
224      new LinkedBlockingQueue<>());
225    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
226    tfb.setNameFormat("ReplicationExecutor-%d");
227    tfb.setDaemon(true);
228    this.executor.setThreadFactory(tfb.build());
229    this.latestPaths = new HashMap<>();
230    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
231    this.maxRetriesMultiplier =
232      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
233    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
234      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
235    this.globalMetrics = globalMetrics;
236  }
237
238  /**
239   * Adds a normal source per registered peer cluster.
240   */
241  void init() throws IOException {
242    for (String id : this.replicationPeers.getAllPeerIds()) {
243      addSource(id, true);
244    }
245  }
246
247  /**
248   * <ol>
249   * <li>Add peer to replicationPeers</li>
250   * <li>Add the normal source and related replication queue</li>
251   * <li>Add HFile Refs</li>
252   * </ol>
253   * @param peerId the id of replication peer
254   */
255  public void addPeer(String peerId) throws IOException {
256    boolean added = false;
257    try {
258      added = this.replicationPeers.addPeer(peerId);
259    } catch (ReplicationException e) {
260      throw new IOException(e);
261    }
262    if (added) {
263      addSource(peerId, false);
264    }
265  }
266
267  /**
268   * <ol>
   * <li>Remove peer from replicationPeers</li>
270   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
271   * <li>Remove the normal source and related replication queue</li>
272   * <li>Remove HFile Refs</li>
273   * </ol>
274   * @param peerId the id of the replication peer
275   */
276  public void removePeer(String peerId) {
277    ReplicationPeer peer = replicationPeers.removePeer(peerId);
278    String terminateMessage = "Replication stream was removed by a user";
279    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
280    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
281    // see NodeFailoverWorker.run
282    synchronized (this.oldsources) {
283      // First close all the recovered sources for this peer
284      for (ReplicationSourceInterface src : oldsources) {
285        if (peerId.equals(src.getPeerId())) {
286          oldSourcesToDelete.add(src);
287        }
288      }
289      for (ReplicationSourceInterface src : oldSourcesToDelete) {
290        src.terminate(terminateMessage);
291        removeRecoveredSource(src);
292      }
293    }
294    LOG.info("Number of deleted recovered sources for {}: {}", peerId, oldSourcesToDelete.size());
295    // Now close the normal source for this peer
296    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
297    if (srcToRemove != null) {
298      srcToRemove.terminate(terminateMessage);
299      removeSource(srcToRemove);
300    }
301    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
302    if (peerConfig.isSyncReplication()) {
303      syncReplicationPeerMappingManager.remove(peerId, peerConfig);
304    }
305  }
306
  /**
   * Create a new 'classic' user-space replication source.
   * @param queueData       the replication queue data; its id is the id of the replication queue
   *                        to associate the ReplicationSource with
   * @param replicationPeer the replication peer to replicate to
   * @return a new 'classic' user-space replication source.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
312  private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
313    ReplicationPeer replicationPeer) throws IOException {
314    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueData.getId());
315    // Init the just created replication source. Pass the default walProvider's wal file length
316    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
317    // replication, see #createCatalogReplicationSource().
318    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
319      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
320      : p -> OptionalLong.empty();
321    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueData, clusterId,
322      walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
323    return src;
324  }
325
326  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage. For a newly added peer, we only need to enqueue the latest log of each wal
   * group and start replicating from there.
   * <p/>
   * We add an {@code init} parameter to indicate whether this is part of the initialization
   * process. If so, we should skip adding the replication queues, as doing that may introduce a
   * deadlock between region server startup and the hbase:replication table coming online.
   * @param peerId the id of the replication peer
   * @param init   whether this call is part of the initialization process
337   */
338  void addSource(String peerId, boolean init) throws IOException {
339    ReplicationPeer peer = replicationPeers.getPeer(peerId);
340    if (
341      ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
342        .equals(peer.getPeerConfig().getReplicationEndpointImpl())
343    ) {
344      // we do not use this endpoint for region replication any more, see HBASE-26233
345      LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
346      return;
347    }
348    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
349    ReplicationSourceInterface src =
350      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
351    // synchronized on latestPaths to avoid missing the new log
352    synchronized (this.latestPaths) {
353      this.sources.put(peerId, src);
354      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
355      this.walsById.put(queueId, walsByGroup);
356      // Add the latest wal to that source's queue
357      if (!latestPaths.isEmpty()) {
358        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
359          Path walPath = walPrefixAndPath.getValue();
360          NavigableSet<String> wals = new TreeSet<>();
361          wals.add(walPath.getName());
362          walsByGroup.put(walPrefixAndPath.getKey(), wals);
363          if (!init) {
            // Abort the RS and throw an exception so the add peer operation fails.
            // Ideally we would use the current file size as the offset, so we could skip
            // replicating data written before the replication peer was added, but the problem is
            // that the file may not end at a valid entry boundary, and the current WAL reader
            // implementation can not start reading from the middle of a WAL entry. Can improve
            // later.
369            abortAndThrowIOExceptionWhenFail(
370              () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
371                new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
372          }
373          src.enqueueLog(walPath);
374          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
375        }
376      }
377    }
378    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
379    if (peerConfig.isSyncReplication()) {
380      syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
381    }
382    src.startup();
383  }
384
385  /**
386   * <p>
   * This is used when we transition a sync replication peer to
   * {@link SyncReplicationState#STANDBY}.
   * </p>
   * <p>
   * When transitioning to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
   * files for a replication peer as we do not need to replicate them any more. And this is
   * necessary, otherwise when we transition back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
   * later, the stale data will be replicated again and cause inconsistency.
394   * </p>
395   * <p>
396   * See HBASE-20426 for more details.
397   * </p>
398   * @param peerId the id of the sync replication peer
399   */
400  public void drainSources(String peerId) throws IOException, ReplicationException {
401    String terminateMessage = "Sync replication peer " + peerId
402      + " is transiting to STANDBY. Will close the previous replication source and open a new one";
403    ReplicationPeer peer = replicationPeers.getPeer(peerId);
404    assert peer.getPeerConfig().isSyncReplication();
405    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    // The new replication source is created inside the synchronized block below, using the
    // remaining wal files as the initial offsets.
    // TODO: revisit the initial offsets when adding support for sync replication.
    ReplicationSourceInterface src;
    // synchronized here to avoid a race with postLogRoll, where we add a new log to the source and
    // also to walsById.
411    ReplicationSourceInterface toRemove;
412    ReplicationQueueData queueData;
413    synchronized (latestPaths) {
      // Here we make a copy of all the remaining wal files and then record their offsets in the
      // replication queue storage after releasing the lock. It is not safe to just remove the old
      // map from walsById, since later we may fail to update the replication queue storage, and
      // when we retry next time, we would not know which wal files need to be set in the
      // replication queue storage.
419      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
420      synchronized (walsById) {
421        walsById.get(queueId).forEach((group, wals) -> {
422          if (!wals.isEmpty()) {
423            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
424          }
425        });
426      }
427      queueData = new ReplicationQueueData(queueId, builder.build());
428      src = createSource(queueData, peer);
429      toRemove = sources.put(peerId, src);
430      if (toRemove != null) {
431        LOG.info("Terminate replication source for " + toRemove.getPeerId());
432        toRemove.terminate(terminateMessage);
433        toRemove.getSourceMetrics().clear();
434      }
435    }
436    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
437      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
438    }
439    LOG.info("Startup replication source for " + src.getPeerId());
440    src.startup();
441    synchronized (walsById) {
442      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
443      queueData.getOffsets().forEach((group, offset) -> {
444        NavigableSet<String> walsByGroup = wals.get(group);
445        if (walsByGroup != null) {
446          walsByGroup.headSet(offset.getWal(), true).clear();
447        }
448      });
449    }
450    // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
451    // a background task, we will delete the file from replication queue storage under the lock to
452    // simplify the logic.
453    synchronized (this.oldsources) {
454      for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
455        ReplicationSourceInterface oldSource = iter.next();
456        if (oldSource.getPeerId().equals(peerId)) {
457          ReplicationQueueId oldSourceQueueId = oldSource.getQueueId();
458          oldSource.terminate(terminateMessage);
459          oldSource.getSourceMetrics().clear();
460          queueStorage.removeQueue(oldSourceQueueId);
461          walsByIdRecoveredQueues.remove(oldSourceQueueId);
462          iter.remove();
463        }
464      }
465    }
466  }
467
468  private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
469    ReplicationPeer peer) throws IOException, ReplicationException {
470    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
471    return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
472  }
473
474  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we do not need to change the
   * replication queue storage; we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
479   */
480  public void refreshSources(String peerId) throws ReplicationException, IOException {
481    String terminateMessage = "Peer " + peerId
482      + " state or config changed. Will close the previous replication source and open a new one";
483    ReplicationPeer peer = replicationPeers.getPeer(peerId);
484    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
485    ReplicationSourceInterface src;
486    // synchronized on latestPaths to avoid missing the new log
487    synchronized (this.latestPaths) {
488      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
489      if (toRemove != null) {
490        LOG.info("Terminate replication source for " + toRemove.getPeerId());
491        // Do not clear metrics
492        toRemove.terminate(terminateMessage, null, false);
493      }
494      src = createRefreshedSource(queueId, peer);
495      this.sources.put(peerId, src);
496      for (NavigableSet<String> walsByGroup : walsById.get(queueId).values()) {
497        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
498      }
499    }
500    LOG.info("Startup replication source for " + src.getPeerId());
501    src.startup();
502
503    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
504    // synchronized on oldsources to avoid race with NodeFailoverWorker
505    synchronized (this.oldsources) {
506      List<ReplicationQueueId> oldSourceQueueIds = new ArrayList<>();
507      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
508        .hasNext();) {
509        ReplicationSourceInterface oldSource = iter.next();
510        if (oldSource.getPeerId().equals(peerId)) {
511          oldSourceQueueIds.add(oldSource.getQueueId());
512          oldSource.terminate(terminateMessage);
513          iter.remove();
514        }
515      }
516      for (ReplicationQueueId oldSourceQueueId : oldSourceQueueIds) {
517        ReplicationSourceInterface recoveredReplicationSource =
518          createRefreshedSource(oldSourceQueueId, peer);
519        this.oldsources.add(recoveredReplicationSource);
520        for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
521          .values()) {
522          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
523        }
524        toStartup.add(recoveredReplicationSource);
525      }
526    }
527    for (ReplicationSourceInterface replicationSource : toStartup) {
528      replicationSource.startup();
529    }
530  }
531
532  /**
   * Remove the specified recovered source from tracking and delete its related replication queue.
   * @param src source to remove
535   */
536  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
537    if (!this.oldsources.remove(src)) {
538      return false;
539    }
540    LOG.info("Done with the recovered queue {}", src.getQueueId());
541    // Delete queue from storage and memory
542    deleteQueue(src.getQueueId());
543    this.walsByIdRecoveredQueues.remove(src.getQueueId());
544    return true;
545  }
546
547  void finishRecoveredSource(ReplicationSourceInterface src) {
548    synchronized (oldsources) {
549      if (!removeRecoveredSource(src)) {
550        return;
551      }
552    }
553    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
554      src.getStats());
555  }
556
557  /**
   * Remove the specified normal source from tracking and delete its related replication queue.
   * @param src source to remove
560   */
561  void removeSource(ReplicationSourceInterface src) {
562    LOG.info("Done with the queue " + src.getQueueId());
563    this.sources.remove(src.getPeerId());
564    // Delete queue from storage and memory
565    deleteQueue(src.getQueueId());
566    this.walsById.remove(src.getQueueId());
568  }
569
570  /**
571   * Delete a complete queue of wals associated with a replication source
572   * @param queueId the id of replication queue to delete
573   */
574  private void deleteQueue(ReplicationQueueId queueId) {
575    abortWhenFail(() -> this.queueStorage.removeQueue(queueId));
576  }
577
578  @FunctionalInterface
579  private interface ReplicationQueueOperation {
580    void exec() throws ReplicationException;
581  }
582
  /**
   * Refreshing a replication source terminates the old source first, which interrupts the source
   * thread. We need to handle that interruption instead of aborting the region server.
   */
587  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
588    try {
589      op.exec();
590    } catch (ReplicationException e) {
591      if (
592        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
593          && e.getCause().getCause() != null
594          && e.getCause().getCause() instanceof InterruptedException
595      ) {
        // A ReplicationRuntimeException (a RuntimeException) is thrown here because the thread was
        // interrupted deep down in the stack. It should skip the following processing logic and
        // propagate to the top-most layer, which can handle this exception properly. In this
        // specific case, the top layer is ReplicationSourceShipper#run().
600        throw new ReplicationRuntimeException(
601          "Thread is interrupted, the replication source may be terminated",
602          e.getCause().getCause());
603      }
604      server.abort("Failed to operate on replication queue", e);
605    }
606  }
607
608  private void abortWhenFail(ReplicationQueueOperation op) {
609    try {
610      op.exec();
611    } catch (ReplicationException e) {
612      server.abort("Failed to operate on replication queue", e);
613    }
614  }
615
616  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
617    try {
618      op.exec();
619    } catch (ReplicationException e) {
620      throw new IOException(e);
621    }
622  }
623
624  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
625    try {
626      op.exec();
627    } catch (ReplicationException e) {
628      server.abort("Failed to operate on replication queue", e);
629      throw new IOException(e);
630    }
631  }
632
633  /**
   * This method will record the current replication position to storage, and also clean old logs
   * from the replication queue.
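   * <p/>
   * For reference, the offset recorded for the wal group is computed as sketched below (this
   * mirrors the code in this method); an offset of {@code -1} means the wal file has been fully
   * replicated:
   *
   * <pre>
   * {@code
   * long position = entryBatch.isEndOfFile() ? -1 : entryBatch.getLastWalPosition();
   * }
   * </pre>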
636   * @param source     the replication source
637   * @param entryBatch the wal entry batch we just shipped
638   */
639  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
640    WALEntryBatch entryBatch) {
641    String walName = entryBatch.getLastWalPath().getName();
642    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
643    // if end of file, we just set the offset to -1 so we know that this file has already been fully
644    // replicated, otherwise we need to compare the file length
645    ReplicationGroupOffset offset = new ReplicationGroupOffset(walName,
646      entryBatch.isEndOfFile() ? -1 : entryBatch.getLastWalPosition());
647    interruptOrAbortWhenFail(() -> this.queueStorage.setOffset(source.getQueueId(), walPrefix,
648      offset, entryBatch.getLastSeqIds()));
649    cleanOldLogs(walName, entryBatch.isEndOfFile(), source);
650  }
651
652  /**
653   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
654   * file is closed and has no more entries.
655   * @param log       Path to the log
656   * @param inclusive whether we should also remove the given log file
657   * @param source    the replication source
658   */
659  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
660    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
661    if (source.isRecovered()) {
662      NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
663      if (wals != null) {
664        // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
665        NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
666          .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
667        if (walsToRemove.isEmpty()) {
668          return;
669        }
670        cleanOldLogs(walsToRemove, source);
671        walsToRemove.clear();
672      }
673    } else {
674      NavigableSet<String> wals;
675      NavigableSet<String> walsToRemove;
676      // synchronized on walsById to avoid race with postLogRoll
677      synchronized (this.walsById) {
678        wals = walsById.get(source.getQueueId()).get(logPrefix);
679        if (wals == null) {
680          return;
681        }
682        walsToRemove = wals.headSet(log, inclusive);
683        if (walsToRemove.isEmpty()) {
684          return;
685        }
686        walsToRemove = new TreeSet<>(walsToRemove);
687      }
      // cleanOldLogs may take some time, especially for sync replication where we may want to
      // remove remote wals and the remote cluster may already be down, so we do it outside the
      // lock to avoid blocking postLogRoll
691      cleanOldLogs(walsToRemove, source);
692      // now let's remove the files in the set
693      synchronized (this.walsById) {
694        wals.removeAll(walsToRemove);
695      }
696    }
697  }
698
699  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
700    throws IOException {
701    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
702    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
703    for (String wal : wals) {
704      Path walFile = new Path(remoteWALDirForPeer, wal);
705      try {
706        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
707          throw new IOException("Can not delete " + walFile);
708        }
709      } catch (FileNotFoundException e) {
710        // Just ignore since this means the file has already been deleted.
        // The javadoc of the FileSystem.delete method does not specify the behavior of deleting a
        // nonexistent file, so here we deal with both possibilities, i.e., check the return value
        // of FileSystem.delete and also catch FNFE.
714        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
715      }
716    }
717  }
718
719  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
720    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
    // The intention here is to delete the remote wal files ASAP, as keeping them may affect the
    // failover time if you want to transition the remote cluster from S to A. And the infinite
    // retry is not a problem: if we can not contact the remote HDFS cluster, then usually we can
    // not contact the remote HBase cluster either, so replication will be blocked anyway.
725    if (source.isSyncReplication()) {
726      String peerId = source.getPeerId();
727      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
      // Filter out the wals that need to be removed from the remote directory. Their names should
      // be in the special sync replication format, and the peer id in each name should match the
      // peer id of the replication source.
731      List<String> remoteWals =
732        wals.stream().filter(w -> AbstractWALProvider.getSyncReplicationPeerIdFromWALName(w)
733          .map(peerId::equals).orElse(false)).collect(Collectors.toList());
734      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
735        remoteWALDir, remoteWals);
736      if (!remoteWals.isEmpty()) {
737        for (int sleepMultiplier = 0;;) {
738          try {
739            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
740            break;
741          } catch (IOException e) {
742            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
743              peerId);
744          }
745          if (!source.isSourceActive()) {
746            // skip the following operations
747            return;
748          }
749          if (
750            ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
751              sleepMultiplier, maxRetriesMultiplier)
752          ) {
753            sleepMultiplier++;
754          }
755        }
756      }
757    }
758  }
759
  // public because we call it in TestReplicationEmptyWALRecovery
761  public void postLogRoll(Path newLog) throws IOException {
762    String logName = newLog.getName();
763    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
765    synchronized (this.latestPaths) {
766      // synchronized on walsById to avoid race with cleanOldLogs
767      synchronized (this.walsById) {
768        // Update walsById map
769        for (Map.Entry<ReplicationQueueId, Map<String, NavigableSet<String>>> entry : this.walsById
770          .entrySet()) {
771          ReplicationQueueId queueId = entry.getKey();
772          String peerId = queueId.getPeerId();
773          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
774          boolean existingPrefix = false;
775          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
776            SortedSet<String> wals = walsEntry.getValue();
777            if (this.sources.isEmpty()) {
              // If there are no sources, we do not need to keep the old wals since we only
              // consider the latest one when a new source is added
780              wals.clear();
781            }
782            if (logPrefix.equals(walsEntry.getKey())) {
783              wals.add(logName);
784              existingPrefix = true;
785            }
786          }
787          if (!existingPrefix) {
788            // The new log belongs to a new group, add it into this peer
789            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
790            NavigableSet<String> wals = new TreeSet<>();
791            wals.add(logName);
792            walsByPrefix.put(logPrefix, wals);
793          }
794        }
795      }
796
797      // Add to latestPaths
798      latestPaths.put(logPrefix, newLog);
799    }
800    // This only updates the sources we own, not the recovered ones
801    for (ReplicationSourceInterface source : this.sources.values()) {
802      source.enqueueLog(newLog);
803      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
804        source.getQueueId());
805    }
806  }
807
808  /**
809   * Check whether we should replicate the given {@code wal}.
810   * @param wal the file name of the wal
811   * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
812   */
813  private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
814    // skip replicating meta wals
815    if (AbstractFSWALProvider.isMetaFile(wal)) {
816      return false;
817    }
818    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
819  }
820
821  void claimQueue(ReplicationQueueId queueId) {
822    claimQueue(queueId, false);
823  }
824
825  // sorted from oldest to newest
826  private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
827    Map<String, ReplicationGroupOffset> offsets) throws IOException {
828    List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
829      URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
830    if (syncUp) {
831      // we also need to list WALs directory for ReplicationSyncUp
832      walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
833    }
834    PriorityQueue<Path> walFilesPQ =
835      new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
836    // sort the wal files and also filter out replicated files
837    for (Path file : walFiles) {
838      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
839      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
840      if (shouldReplicate(groupOffset, file.getName())) {
841        walFilesPQ.add(file);
842      } else {
843        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
844          groupOffset);
845      }
846    }
847    return walFilesPQ;
848  }
849
850  private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
851    ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
852    ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
853    if (peer == null || peer != oldPeer) {
854      src.terminate("Recovered queue doesn't belong to any current peer");
855      deleteQueue(claimedQueueId);
856      return;
857    }
    // Do not set up a recovered queue if a sync replication peer is in STANDBY state, or is
    // transitioning to STANDBY state. The only exception is when we are in STANDBY state and
    // transitioning to DA; in that state we will replay the remote WALs and they need to be
    // replicated back.
862    if (peer.getPeerConfig().isSyncReplication()) {
863      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
864        peer.getSyncReplicationStateAndNewState();
865      if (
866        (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
867          && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
868          || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
869      ) {
870        src.terminate("Sync replication peer is in STANDBY state");
871        deleteQueue(claimedQueueId);
872        return;
873      }
874    }
875    // track sources in walsByIdRecoveredQueues
876    Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
877    walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
878    for (Path wal : walFiles) {
879      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
880      NavigableSet<Path> wals = walsByGroup.get(walPrefix);
881      if (wals == null) {
882        wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
883        walsByGroup.put(walPrefix, wals);
884      }
885      wals.add(wal);
886    }
887    oldsources.add(src);
888    LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId,
889      walFiles.size());
890    for (Path wal : walFiles) {
891      LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
892      src.enqueueLog(wal);
893    }
894    src.startup();
895  }
896
897  /**
898   * Claim a replication queue.
899   * <p/>
   * We add a flag to indicate whether we are called by ReplicationSyncUp. A normal claim queue
   * operation is the last step of an SCP, so we can assume that all the WAL files are under the
   * oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a
   * region server which has not been processed by SCP yet, so we still need to look at its WALs
   * directory.
905   * @param queueId the replication queue id we want to claim
906   * @param syncUp  whether we are called by ReplicationSyncUp
907   */
908  void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
909    // Wait a bit before transferring the queues, we may be shutting down.
910    // This sleep may not be enough in some cases.
911    try {
912      Thread.sleep(sleepBeforeFailover
913        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
914    } catch (InterruptedException e) {
915      LOG.warn("Interrupted while waiting before transferring a queue.");
916      Thread.currentThread().interrupt();
917    }
    // Do not bother claiming the queues if we are shutting down
919    if (server.isStopped()) {
920      LOG.info("Not transferring queue since we are shutting down");
921      return;
922    }
    // After claiming the queues from a dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during failover. So we
    // need to get a reference to the replication peer first, to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource; otherwise the rs will abort (see HBASE-20475).
929    String peerId = queueId.getPeerId();
930    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
931    if (oldPeer == null) {
932      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
933        peerId, queueId);
934      return;
935    }
936    Map<String, ReplicationGroupOffset> offsets;
937    try {
938      offsets = queueStorage.claimQueue(queueId, server.getServerName());
939    } catch (ReplicationException e) {
940      LOG.error("ReplicationException: cannot claim dead region ({})'s replication queue",
941        queueId.getServerName(), e);
942      server.abort("Failed to claim queue from dead regionserver.", e);
943      return;
944    }
945    if (offsets.isEmpty()) {
946      // someone else claimed the queue
947      return;
948    }
949    ServerName sourceRS = queueId.getServerWALsBelongTo();
950    ReplicationQueueId claimedQueueId = queueId.claim(server.getServerName());
951    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
952    if (peer == null || peer != oldPeer) {
953      LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, sourceRS);
954      deleteQueue(claimedQueueId);
955      return;
956    }
957    ReplicationSourceInterface src;
958    try {
959      src =
960        createSource(new ReplicationQueueData(claimedQueueId, ImmutableMap.copyOf(offsets)), peer);
961    } catch (IOException e) {
962      LOG.error("Can not create replication source for peer {} and queue {}", peerId,
963        claimedQueueId, e);
964      server.abort("Failed to create replication source after claiming queue.", e);
965      return;
966    }
967    PriorityQueue<Path> walFiles;
968    try {
969      walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
970    } catch (IOException e) {
971      LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
972      server.abort("Can not list wal files after claiming queue.", e);
973      return;
974    }
975    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
976    synchronized (oldsources) {
977      addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
978    }
979  }
980
981  /**
982   * Terminate the replication on this region server
983   */
984  public void join() {
985    this.executor.shutdown();
986    for (ReplicationSourceInterface source : this.sources.values()) {
987      source.terminate("Region server is closing");
988    }
989    synchronized (oldsources) {
990      for (ReplicationSourceInterface source : this.oldsources) {
991        source.terminate("Region server is closing");
992      }
993    }
994  }
995
  /**
   * Get the wals of the normal sources on this rs
   * @return an unmodifiable view of the map from queue id to wal group to sorted wal names
   */
1000  @RestrictedApi(explanation = "Should only be called in tests", link = "",
1001      allowedOnPath = ".*/src/test/.*")
1002  public Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWALs() {
1003    return Collections.unmodifiableMap(walsById);
1004  }
1005
1006  /**
1007   * Get a list of all the normal sources of this rs
1008   * @return list of all normal sources
1009   */
1010  public List<ReplicationSourceInterface> getSources() {
1011    return new ArrayList<>(this.sources.values());
1012  }
1013
1014  /**
1015   * Get a list of all the recovered sources of this rs
1016   * @return list of all recovered sources
1017   */
1018  public List<ReplicationSourceInterface> getOldSources() {
1019    return this.oldsources;
1020  }
1021
1022  /**
1023   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
1025   */
1026  public ReplicationSourceInterface getSource(String peerId) {
1027    return this.sources.get(peerId);
1028  }
1029
1030  int getSizeOfLatestPath() {
1031    synchronized (latestPaths) {
1032      return latestPaths.size();
1033    }
1034  }
1035
1036  Set<Path> getLastestPath() {
1037    synchronized (latestPaths) {
1038      return Sets.newHashSet(latestPaths.values());
1039    }
1040  }
1041
1042  public long getTotalBufferUsed() {
1043    return totalBufferUsed.get();
1044  }
1045
1046  /**
1047   * Returns the maximum size in bytes of edits held in memory which are pending replication across
1048   * all sources inside this RegionServer.
1049   */
1050  public long getTotalBufferLimit() {
1051    return totalBufferLimit;
1052  }
1053
1054  /**
1055   * Get the directory where wals are archived
1056   * @return the directory where wals are archived
1057   */
1058  public Path getOldLogDir() {
1059    return this.oldLogDir;
1060  }
1061
1062  /**
1063   * Get the directory where wals are stored by their RSs
1064   * @return the directory where wals are stored by their RSs
1065   */
1066  public Path getLogDir() {
1067    return this.logDir;
1068  }
1069
1070  /**
1071   * Get the handle on the local file system
1072   * @return Handle on the local file system
1073   */
1074  public FileSystem getFs() {
1075    return this.fs;
1076  }
1077
1078  /**
1079   * Get the ReplicationPeers used by this ReplicationSourceManager
1080   * @return the ReplicationPeers used by this ReplicationSourceManager
1081   */
1082  public ReplicationPeers getReplicationPeers() {
1083    return this.replicationPeers;
1084  }
1085
1086  /**
1087   * Get a string representation of all the sources' metrics
1088   */
1089  public String getStats() {
1090    StringBuilder stats = new StringBuilder();
1091    // Print stats that apply across all Replication Sources
1092    stats.append("Global stats: ");
1093    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
1094      .append(getTotalBufferLimit()).append("B\n");
1095    for (ReplicationSourceInterface source : this.sources.values()) {
1096      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
1097      stats.append(source.getStats() + "\n");
1098    }
1099    for (ReplicationSourceInterface oldSource : oldsources) {
1100      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
1101      stats.append(oldSource.getStats() + "\n");
1102    }
1103    return stats.toString();
1104  }
1105
1106  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
1107    throws IOException {
1108    for (ReplicationSourceInterface source : this.sources.values()) {
1109      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
1110    }
1111  }
1112
1113  public void cleanUpHFileRefs(String peerId, List<String> files) {
1114    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
1115  }
1116
1117  int activeFailoverTaskCount() {
1118    return executor.getActiveCount();
1119  }
1120
1121  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
1122    return this.globalMetrics;
1123  }
1124
1125  ReplicationQueueStorage getQueueStorage() {
1126    return queueStorage;
1127  }
1128
1129  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
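   * <p/>
   * A minimal usage sketch (illustrative only; the variable names are hypothetical, the actual
   * callers live in the replication source reader and shipper threads):
   *
   * <pre>
   * {@code
   * // while building a batch of wal entries to ship
   * if (manager.acquireWALEntryBufferQuota(batch, entry)) {
   *   // the global buffer limit has been reached, ship what we have now
   * }
   * // ... once the batch has been shipped (or dropped)
   * manager.releaseWALEntryBatchBufferQuota(batch);
   * }
   * </pre>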
   * @param walEntryBatch the batch which the entry is added to
   * @param entry         the wal entry which is added to the {@link WALEntryBatch} and should
   *                      acquire buffer quota
   * @return true if we have reached the total buffer limit and should ship the current batch
1134   */
1135  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
1136    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
1137    return this.acquireBufferQuota(entrySize);
1138  }
1139
1140  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired via
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
1143   * @return the released buffer quota size.
1144   */
1145  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
1146    long usedBufferSize = walEntryBatch.getUsedBufferSize();
1147    if (usedBufferSize > 0) {
1148      this.releaseBufferQuota(usedBufferSize);
1149    }
1150    return usedBufferSize;
1151  }
1152
1153  /**
1154   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
1155   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}, in which case we should stop adding
   *         to the buffer and ship everything we have.
1159   */
1160  boolean acquireBufferQuota(long size) {
1161    if (size < 0) {
1162      throw new IllegalArgumentException("size should not less than 0");
1163    }
1164    long newBufferUsed = addTotalBufferUsed(size);
1165    return newBufferUsed >= totalBufferLimit;
1166  }
1167
1168  /**
   * Release the buffer quota which was acquired via
   * {@link ReplicationSourceManager#acquireBufferQuota}.
1171   */
1172  void releaseBufferQuota(long size) {
1173    if (size < 0) {
1174      throw new IllegalArgumentException("size should not less than 0");
1175    }
1176    addTotalBufferUsed(-size);
1177  }
1178
1179  private long addTotalBufferUsed(long size) {
1180    if (size == 0) {
1181      return totalBufferUsed.get();
1182    }
1183    long newBufferUsed = totalBufferUsed.addAndGet(size);
1184    // Record the new buffer usage
1185    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
1186    return newBufferUsed;
1187  }
1188
1189  /**
   * Check whether {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the given peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
1194   */
1195  boolean checkBufferQuota(String peerId) {
1196    // try not to go over total quota
1197    if (totalBufferUsed.get() > totalBufferLimit) {
1198      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
1199        peerId, totalBufferUsed.get(), totalBufferLimit);
1200      return false;
1201    }
1202    return true;
1203  }
1204}