/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and the only goal is to finish
 * replicating the WAL queues they had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues to local old sources.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}. {@link #walsById} is a ConcurrentHashMap and there is a Lock for
 * peer id in {@link PeerProcedureHandlerImpl}, so there is no race between
 * {@link #addPeer(String)} and {@link #removePeer(String)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}
 * either. The only case that needs synchronization is between
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #claimQueue(ReplicationQueueId)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link #claimQueue(ReplicationQueueId)} will add the wals
 * to {@link #walsByIdRecoveredQueues} first, then start up a {@link ReplicationSourceInterface}.
 * So there is no race here. For {@link #claimQueue(ReplicationQueueId)} and
 * {@link #removePeer(String)}, there is already synchronization on {@link #oldsources}, so there
 * is no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs; every peer has only one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread about this class and passed to ReplicationSource
   * instances for these to do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // for recovered sources, the WAL files should already have been moved to oldLogDir, and there
  // are different layouts of old WAL files, for example, with server name sub directories or not,
  // so here we record the full path instead of just the name, so when refreshing we can enqueue
  // the WAL file again, without trying to guess the real path of the WAL files.
  private final ConcurrentMap<ReplicationQueueId,
    Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;

  private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Map<String, Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private AtomicLong totalBufferUsed = new AtomicLong();

  // How long we should sleep for each retry when deleting remote wal files for a sync replication
  // peer.
  private final long sleepForRetries;
  // Maximum number of retries before taking bold actions when deleting remote wal files for a sync
  // replication peer.
  private final int maxRetriesMultiplier;
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf         the configuration to use
   * @param server       the server for this region server
   * @param fs           the file system to use
   * @param logDir       the directory that contains all wal directories of live RSs
   * @param oldLogDir    the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
    // It's preferable to fail over 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
    this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }
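
  // An illustrative sketch (hypothetical values) of how the knobs read above can be tuned on the
  // Configuration handed to this constructor, e.g. from a test or hbase-site.xml:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("replication.executor.workers", 2); // threads used to claim dead RS queues
  //   conf.setLong("replication.sleep.before.failover", 30000); // ms to wait before claiming
  //   conf.setLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 256 * 1024 * 1024L);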

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id, true);
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId, false);
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    ReplicationPeer peer = replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info("Number of deleted recovered sources for {}: {}", peerId, oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.remove(peerId, peerConfig);
    }
  }

  /**
   * @return a new 'classic' user-space replication source.
   * @param queueData the replication queue data to associate the ReplicationSource with.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
    ReplicationPeer replicationPeer) throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueData.getId());
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. The presumption is we replicate user-space tables only. For hbase:meta region
    // replica replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();

    // Create merged configuration with peer overrides as higher priority and
    // global config as lower priority
    Configuration mergedConf = conf;
    if (!replicationPeer.getPeerConfig().getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(conf);
      compound.addStringMap(replicationPeer.getPeerConfig().getConfiguration());
      mergedConf = compound;
    }

    src.init(mergedConf, fs, this, queueStorage, replicationPeer, server, queueData, clusterId,
      walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
    return src;
  }
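
  // A minimal sketch (illustrative, not executed here) of the precedence the merged configuration
  // above gives to peer-level overrides: entries added last via addStringMap() win over the base
  // configuration. The key and values below are hypothetical.
  //
  //   Configuration base = HBaseConfiguration.create();
  //   base.setInt("replication.source.nb.capacity", 25000);
  //   CompoundConfiguration merged = new CompoundConfiguration();
  //   merged.add(base);
  //   merged.addStringMap(ImmutableMap.of("replication.source.nb.capacity", "1000"));
  //   assert merged.getInt("replication.source.nb.capacity", 0) == 1000; // peer override wins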

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add the new
   * replication queue to storage. For the newly added peer, we only need to enqueue the latest log
   * of each wal group and do replication.
   * <p/>
   * We add an {@code init} parameter to indicate whether this is part of the initialization
   * process. If so, we should skip adding the replication queues as this may introduce a deadlock
   * between region server start up and the hbase:replication table coming online.
   * @param peerId the id of the replication peer
   * @param init   whether this call is part of the initialization process
   */
  void addSource(String peerId, boolean init) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    if (
      ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
        .equals(peer.getPeerConfig().getReplicationEndpointImpl())
    ) {
      // we do not use this endpoint for region replication any more, see HBASE-26233
      LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
      return;
    }
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(queueId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          if (!init) {
            // Abort the RS and throw an exception to make the add peer operation fail.
            // Ideally we would use the current file size as the offset so we can skip replicating
            // the data written before adding the replication peer, but the problem is that the
            // file may not end at a valid entry's ending, and the current WAL Reader
            // implementation can not deal with reading from the middle of a WAL entry. Can improve
            // later.
            abortAndThrowIOExceptionWhenFail(
              () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
                new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
          }
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
    }
    src.startup();
  }

  /**
   * <p>
   * This is used when we transition a sync replication peer to
   * {@link SyncReplicationState#STANDBY}.
   * </p>
   * <p>
   * When transitioning to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
   * files for a replication peer as we do not need to replicate them any more. And this is
   * necessary, otherwise when we transition back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
   * later, the stale data will be replicated again and cause inconsistency.
   * </p>
   * <p>
   * See HBASE-20426 for more details.
   * </p>
   * @param peerId the id of the sync replication peer
   */
  public void drainSources(String peerId) throws IOException, ReplicationException {
    String terminateMessage = "Sync replication peer " + peerId
      + " is transiting to STANDBY. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    assert peer.getPeerConfig().isSyncReplication();
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    // TODO: use empty initial offsets for now, revisit when adding support for sync replication
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized here to avoid race with postLogRoll where we add a new log to the source and
    // also to walsById.
    ReplicationSourceInterface toRemove;
    ReplicationQueueData queueData;
    synchronized (latestPaths) {
      // Here we make a copy of all the remaining wal files and then delete them from the
      // replication queue storage after releasing the lock. It is not safe to just remove the old
      // map from walsById since later we may fail to update the replication queue storage, and
      // when we retry next time, we can not know the wal files that need to be set to the
      // replication queue storage
      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
      synchronized (walsById) {
        walsById.get(queueId).forEach((group, wals) -> {
          if (!wals.isEmpty()) {
            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
          }
        });
      }
      queueData = new ReplicationQueueData(queueId, builder.build());
      src = createSource(queueData, peer);
      toRemove = sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        toRemove.terminate(terminateMessage);
        toRemove.getSourceMetrics().clear();
      }
    }
    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();
    synchronized (walsById) {
      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
      queueData.getOffsets().forEach((group, offset) -> {
        NavigableSet<String> walsByGroup = wals.get(group);
        if (walsByGroup != null) {
          walsByGroup.headSet(offset.getWal(), true).clear();
        }
      });
    }
    // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
    // a background task, we will delete the file from replication queue storage under the lock to
    // simplify the logic.
    synchronized (this.oldsources) {
      for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          ReplicationQueueId oldSourceQueueId = oldSource.getQueueId();
          oldSource.terminate(terminateMessage);
          oldSource.getSourceMetrics().clear();
          queueStorage.removeQueue(oldSourceQueueId);
          walsByIdRecoveredQueues.remove(oldSourceQueueId);
          iter.remove();
        }
      }
    }
  }

  private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
    ReplicationPeer peer) throws IOException, ReplicationException {
    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
    return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we don't need to change the
   * replication queue storage, we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws ReplicationException, IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src;
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      src = createRefreshedSource(queueId, peer);
      this.sources.put(peerId, src);
      for (NavigableSet<String> walsByGroup : walsById.get(queueId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<ReplicationQueueId> oldSourceQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          oldSourceQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (ReplicationQueueId oldSourceQueueId : oldSourceQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource =
          createRefreshedSource(oldSourceQueueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
          .values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
    if (!this.oldsources.remove(src)) {
      return false;
    }
    LOG.info("Done with the recovered queue {}", src.getQueueId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
    return true;
  }

  void finishRecoveredSource(ReplicationSourceInterface src) {
    synchronized (oldsources) {
      if (!removeRecoveredSource(src)) {
        return;
      }
    }
    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
      src.getStats());
  }

  /**
   * Clear the metrics and related replication queue of the specified normal source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(ReplicationQueueId queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, then the source thread
   * will be interrupted. We need to handle this case instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (
        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException
      ) {
        // A ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread was interrupted deep down in the stack, so it should bypass the
        // following processing logic and propagate to the top-most layer which can handle this
        // exception properly. In this specific case, the top layer is
        // ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param source     the replication source
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
    WALEntryBatch entryBatch) {
    String walName = entryBatch.getLastWalPath().getName();
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
    // if we have reached the end of the file, just set the offset to -1 so we know that this file
    // has already been fully replicated; otherwise we need to compare against the file length
    ReplicationGroupOffset offset = new ReplicationGroupOffset(walName,
      entryBatch.isEndOfFile() ? -1 : entryBatch.getLastWalPosition());
    interruptOrAbortWhenFail(() -> this.queueStorage.setOffset(source.getQueueId(), walPrefix,
      offset, entryBatch.getLastSeqIds()));
    cleanOldLogs(walName, entryBatch.isEndOfFile(), source);
  }
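
  // A small illustrative sketch (hypothetical WAL names) of the offset convention used above: an
  // offset of -1 marks the named WAL as fully replicated, while a non-negative value is the byte
  // position of the last shipped entry within that WAL.
  //
  //   ReplicationGroupOffset done = new ReplicationGroupOffset("rs1%2C16020%2C1.1700000000000", -1);
  //   ReplicationGroupOffset partial = new ReplicationGroupOffset("rs1%2C16020%2C1.1700000000123", 4096);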

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log       Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param source    the replication source
   */
  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (source.isRecovered()) {
      NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
      if (wals != null) {
        // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
        NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
          .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
        if (walsToRemove.isEmpty()) {
          return;
        }
        cleanOldLogs(walsToRemove, source);
        walsToRemove.clear();
      }
    } else {
      NavigableSet<String> wals;
      NavigableSet<String> walsToRemove;
      // synchronized on walsById to avoid race with postLogRoll
      synchronized (this.walsById) {
        wals = walsById.get(source.getQueueId()).get(logPrefix);
        if (wals == null) {
          return;
        }
        walsToRemove = wals.headSet(log, inclusive);
        if (walsToRemove.isEmpty()) {
          return;
        }
        walsToRemove = new TreeSet<>(walsToRemove);
      }
      // cleanOldLogs may spend some time, especially for sync replication where we may want to
      // remove remote wals as the remote cluster may already be down, so we do it outside the lock
      // to avoid blocking postLogRoll
      cleanOldLogs(walsToRemove, source);
      // now let's remove the files in the set
      synchronized (this.walsById) {
        wals.removeAll(walsToRemove);
      }
    }
  }

  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
    throws IOException {
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
    for (String wal : wals) {
      Path walFile = new Path(remoteWALDirForPeer, wal);
      try {
        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
          throw new IOException("Can not delete " + walFile);
        }
      } catch (FileNotFoundException e) {
        // Just ignore since this means the file has already been deleted.
        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
        // nonexistent file, so here we deal with both, i.e., check the return value of
        // FileSystem.delete, and also catch FNFE.
        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
    // The intention here is that we want to delete the remote wal files ASAP as they may affect
    // the failover time if you want to transition the remote cluster from S to A. And the infinite
    // retry is not a problem, as if we can not contact the remote HDFS cluster, then usually we
    // can not contact the HBase cluster either, so the replication will be blocked anyway.
    if (source.isSyncReplication()) {
      String peerId = source.getPeerId();
      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
      // Filter out the wals that need to be removed from the remote directory. Their names should
      // be in the special sync replication format, and the peer id in the name should match the
      // peer id of the replication source.
      List<String> remoteWals =
        wals.stream().filter(w -> AbstractWALProvider.getSyncReplicationPeerIdFromWALName(w)
          .map(peerId::equals).orElse(false)).collect(Collectors.toList());
      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
        remoteWALDir, remoteWals);
      if (!remoteWals.isEmpty()) {
        for (int sleepMultiplier = 0;;) {
          try {
            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
            break;
          } catch (IOException e) {
            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
              peerId);
          }
          if (!source.isSourceActive()) {
            // skip the following operations
            return;
          }
          if (
            ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
              sleepMultiplier, maxRetriesMultiplier)
          ) {
            sleepMultiplier++;
          }
        }
      }
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<ReplicationQueueId, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          ReplicationQueueId queueId = entry.getKey();
          String peerId = queueId.getPeerId();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  /**
   * Check whether we should replicate the given {@code wal}.
   * @param offset the current replication group offset for the wal group of the given wal
   * @param wal    the file name of the wal
   * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
   */
  private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
    // skip replicating meta wals
    if (AbstractFSWALProvider.isMetaFile(wal)) {
      return false;
    }
    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
  }

  void claimQueue(ReplicationQueueId queueId) {
    claimQueue(queueId, false);
  }

  // sorted from oldest to newest
  private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
    Map<String, ReplicationGroupOffset> offsets) throws IOException {
    List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
      URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
    if (syncUp) {
      // we also need to list the WALs directory for ReplicationSyncUp
      walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
    }
    PriorityQueue<Path> walFilesPQ =
      new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
    // sort the wal files and also filter out replicated files
    for (Path file : walFiles) {
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
      if (shouldReplicate(groupOffset, file.getName())) {
        walFilesPQ.add(file);
      } else {
        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
          groupOffset);
      }
    }
    return walFilesPQ;
  }

  private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
    ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
    ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
    if (peer == null || peer != oldPeer) {
      src.terminate("Recovered queue doesn't belong to any current peer");
      deleteQueue(claimedQueueId);
      return;
    }
    // Do not set up a recovered queue if a sync replication peer is in STANDBY state, or is
    // transitioning to STANDBY state. The only exception is that we are in STANDBY state and
    // transitioning to DA, in which case we will replay the remote WALs and they need to be
    // replicated back.
    if (peer.getPeerConfig().isSyncReplication()) {
      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
        peer.getSyncReplicationStateAndNewState();
      if (
        (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
          && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
          || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
      ) {
        src.terminate("Sync replication peer is in STANDBY state");
        deleteQueue(claimedQueueId);
        return;
      }
    }
    // track sources in walsByIdRecoveredQueues
    Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
    walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
    for (Path wal : walFiles) {
      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
      NavigableSet<Path> wals = walsByGroup.get(walPrefix);
      if (wals == null) {
        wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
        walsByGroup.put(walPrefix, wals);
      }
      wals.add(wal);
    }
    oldsources.add(src);
    LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId,
      walFiles.size());
    for (Path wal : walFiles) {
      LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
      src.enqueueLog(wal);
    }
    src.startup();
  }

  /**
   * Claim a replication queue.
   * <p/>
   * We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal claim queue
   * operation, we are the last step of a SCP, so we can assume that all the WAL files are under
   * the oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue
   * for a region server which has not been processed by SCP yet, so we still need to look at its
   * WALs directory.
   * @param queueId the replication queue id we want to claim
   * @param syncUp  whether we are called by ReplicationSyncUp
   */
  void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover
        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // After claiming the queues from the dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during failover. So we
    // need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the rs will abort (see HBASE-20475).
    String peerId = queueId.getPeerId();
    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
    if (oldPeer == null) {
      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
        peerId, queueId);
      return;
    }
    Map<String, ReplicationGroupOffset> offsets;
    try {
      offsets = queueStorage.claimQueue(queueId, server.getServerName());
    } catch (ReplicationException e) {
      LOG.error("ReplicationException: cannot claim dead region ({})'s replication queue",
        queueId.getServerName(), e);
      server.abort("Failed to claim queue from dead regionserver.", e);
      return;
    }
    if (offsets.isEmpty()) {
      // someone else claimed the queue
      return;
    }
    ServerName sourceRS = queueId.getServerWALsBelongTo();
    ReplicationQueueId claimedQueueId = queueId.claim(server.getServerName());
    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, sourceRS);
      deleteQueue(claimedQueueId);
      return;
    }
    ReplicationSourceInterface src;
    try {
      src =
        createSource(new ReplicationQueueData(claimedQueueId, ImmutableMap.copyOf(offsets)), peer);
    } catch (IOException e) {
      LOG.error("Can not create replication source for peer {} and queue {}", peerId,
        claimedQueueId, e);
      server.abort("Failed to create replication source after claiming queue.", e);
      return;
    }
    PriorityQueue<Path> walFiles;
    try {
      walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
    } catch (IOException e) {
      LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
      server.abort("Can not list wal files after claiming queue.", e);
      return;
    }
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    synchronized (oldsources) {
      addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
    synchronized (oldsources) {
      for (ReplicationSourceInterface source : this.oldsources) {
        source.terminate("Region server is closing");
      }
    }
  }

  /**
   * Get a view of the wals of the normal sources on this rs
   * @return an unmodifiable map of queue id to the wal groups and their wal names
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  Set<Path> getLastestPath() {
    synchronized (latestPaths) {
      return Sets.newHashSet(latestPaths.values());
    }
  }

  public long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication across
   * all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
   * @param entry the wal entry which is added to the {@link WALEntryBatch} and should acquire
   *              buffer quota.
   * @return true if we should clear the buffer and push all buffered entries
   */
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
    return this.acquireBufferQuota(entrySize);
  }

  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired by
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
   * @return the released buffer quota size.
   */
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
    long usedBufferSize = walEntryBatch.getUsedBufferSize();
    if (usedBufferSize > 0) {
      this.releaseBufferQuota(usedBufferSize);
    }
    return usedBufferSize;
  }

  /**
   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}, in which case we should stop
   *         increasing the buffer and ship all pending entries.
   */
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    long newBufferUsed = addTotalBufferUsed(size);
    return newBufferUsed >= totalBufferLimit;
  }

  /**
   * Release the buffer quota which was acquired by
   * {@link ReplicationSourceManager#acquireBufferQuota}.
   */
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    addTotalBufferUsed(-size);
  }

  private long addTotalBufferUsed(long size) {
    if (size == 0) {
      return totalBufferUsed.get();
    }
    long newBufferUsed = totalBufferUsed.addAndGet(size);
    // Record the new buffer usage
    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }
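
  // A small illustrative sketch (hypothetical numbers) of the quota accounting above: with
  // totalBufferLimit = 100, acquiring 60 and then 50 returns false and then true, telling the
  // caller to ship what it has buffered and release the quota afterwards.
  //
  //   manager.acquireBufferQuota(60);   // usage 60  < 100  -> false, keep batching
  //   manager.acquireBufferQuota(50);   // usage 110 >= 100 -> true, ship the batch
  //   manager.releaseBufferQuota(110);  // usage drops back to 0 once the batch is shipped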

  /**
   * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
   */
  boolean checkBufferQuota(String peerId) {
    // try not to go over the total quota
    if (totalBufferUsed.get() > totalBufferLimit) {
      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
        peerId, totalBufferUsed.get(), totalBufferLimit);
      return false;
    }
    return true;
  }
}