001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableSet;
029import java.util.OptionalLong;
030import java.util.Set;
031import java.util.SortedSet;
032import java.util.TreeSet;
033import java.util.UUID;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.Future;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.RejectedExecutionException;
039import java.util.concurrent.ThreadLocalRandom;
040import java.util.concurrent.ThreadPoolExecutor;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.AtomicReference;
044import java.util.stream.Collectors;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.Server;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
055import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
056import org.apache.hadoop.hbase.replication.ReplicationException;
057import org.apache.hadoop.hbase.replication.ReplicationListener;
058import org.apache.hadoop.hbase.replication.ReplicationPeer;
059import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
060import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
061import org.apache.hadoop.hbase.replication.ReplicationPeers;
062import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
063import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
064import org.apache.hadoop.hbase.replication.ReplicationTracker;
065import org.apache.hadoop.hbase.util.Pair;
066import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
067import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
068import org.apache.hadoop.hbase.wal.WAL;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.apache.hadoop.hbase.wal.WALProvider;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.apache.zookeeper.KeeperException;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
077
078/**
079 * This class is responsible to manage all the replication sources. There are two classes of
080 * sources:
081 * <ul>
082 * <li>Normal sources are persistent and one per peer cluster</li>
083 * <li>Old sources are recovered from a failed region server and our only goal is to finish
084 * replicating the WAL queue it had</li>
085 * </ul>
086 * <p>
087 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
088 * in order to transfer all the queues in a local old source.
089 * <p>
090 * Synchronization specification:
091 * <ul>
092 * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
093 * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
094 * operations.</li>
095 * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
096 * {@link #addPeer(String)}, {@link #removePeer(String)},
097 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
098 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
099 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
100 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
101 * called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
102 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
103 * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
104 * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
105 * {@link #preLogRoll(Path)}.</li>
106 * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
107 * modify it, {@link #removePeer(String)} ,
108 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
109 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
110 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
111 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
112 * {@link ReplicationSourceInterface} firstly, then remove the wals from
113 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
114 * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
115 * {@link ReplicationSourceInterface}. So there is no race here. For
116 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
117 * is already synchronized on {@link #oldsources}. So no need synchronized on
118 * {@link #walsByIdRecoveredQueues}.</li>
119 * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
120 * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
121 * to-be-removed peer.</li>
122 * </ul>
123 */
124@InterfaceAudience.Private
125public class ReplicationSourceManager implements ReplicationListener {
126  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
127  // all the sources that read this RS's logs and every peer only has one replication source
128  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
129  // List of all the sources we got from died RSs
130  private final List<ReplicationSourceInterface> oldsources;
131
132  /**
133   * Storage for queues that need persistance; e.g. Replication state so can be recovered
134   * after a crash. queueStorage upkeep is spread about this class and passed
135   * to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
136   * instances keep state.
137   */
138  private final ReplicationQueueStorage queueStorage;
139
140  private final ReplicationTracker replicationTracker;
141  private final ReplicationPeers replicationPeers;
142  // UUID for this cluster
143  private final UUID clusterId;
144  // All about stopping
145  private final Server server;
146
147  // All logs we are currently tracking
148  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
149  // For normal replication source, the peer id is same with the queue id
150  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
151  // Logs for recovered sources we are currently tracking
152  // the map is: queue_id->logPrefix/logGroup->logs
153  // For recovered source, the queue id's format is peer_id-servername-*
154  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
155
156  private final Configuration conf;
157  private final FileSystem fs;
158  // The paths to the latest log of each wal group, for new coming peers
159  private final Set<Path> latestPaths;
160  // Path to the wals directories
161  private final Path logDir;
162  // Path to the wal archive
163  private final Path oldLogDir;
164  private final WALFactory walFactory;
165  // The number of ms that we wait before moving znodes, HBASE-3596
166  private final long sleepBeforeFailover;
167  // Homemade executer service for replication
168  private final ThreadPoolExecutor executor;
169
170  private final boolean replicationForBulkLoadDataEnabled;
171
172
173  private AtomicLong totalBufferUsed = new AtomicLong();
174  // Total buffer size on this RegionServer for holding batched edits to be shipped.
175  private final long totalBufferLimit;
176  private final MetricsReplicationGlobalSourceSource globalMetrics;
177
178  /**
179   * A special ReplicationSource for hbase:meta Region Read Replicas.
180   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
181   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
182   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
183   * this server (in case it later gets moved back). We synchronize on this instance testing for
184   * presence and if absent, while creating so only created and started once.
185   */
186  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
187
188  /**
189   * Creates a replication manager and sets the watch on all the other registered region servers
190   * @param queueStorage the interface for manipulating replication queues
191   * @param conf the configuration to use
192   * @param server the server for this region server
193   * @param fs the file system to use
194   * @param logDir the directory that contains all wal directories of live RSs
195   * @param oldLogDir the directory where old logs are archived
196   */
197  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
198      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
199      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
200      WALFactory walFactory,
201      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
202    // CopyOnWriteArrayList is thread-safe.
203    // Generally, reading is more than modifying.
204    this.sources = new ConcurrentHashMap<>();
205    this.queueStorage = queueStorage;
206    this.replicationPeers = replicationPeers;
207    this.replicationTracker = replicationTracker;
208    this.server = server;
209    this.walsById = new ConcurrentHashMap<>();
210    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
211    this.oldsources = new ArrayList<>();
212    this.conf = conf;
213    this.fs = fs;
214    this.logDir = logDir;
215    this.oldLogDir = oldLogDir;
216    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
217                                                                                         // seconds
218    this.clusterId = clusterId;
219    this.walFactory = walFactory;
220    this.replicationTracker.registerListener(this);
221    // It's preferable to failover 1 RS at a time, but with good zk servers
222    // more could be processed at the same time.
223    int nbWorkers = conf.getInt("replication.executor.workers", 1);
224    // use a short 100ms sleep since this could be done inline with a RS startup
225    // even if we fail, other region servers can take care of it
226    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
227        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
228    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
229    tfb.setNameFormat("ReplicationExecutor-%d");
230    tfb.setDaemon(true);
231    this.executor.setThreadFactory(tfb.build());
232    this.latestPaths = new HashSet<Path>();
233    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
234      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
235    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
236        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
237    this.globalMetrics = globalMetrics;
238  }
239
240  /**
241   * Adds a normal source per registered peer cluster and tries to process all old region server wal
242   * queues
243   * <p>
244   * The returned future is for adoptAbandonedQueues task.
245   */
246  Future<?> init() throws IOException {
247    for (String id : this.replicationPeers.getAllPeerIds()) {
248      addSource(id);
249      if (replicationForBulkLoadDataEnabled) {
250        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
251        // when a peer was added before replication for bulk loaded data was enabled.
252        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
253      }
254    }
255    return this.executor.submit(this::adoptAbandonedQueues);
256  }
257
258  private void adoptAbandonedQueues() {
259    List<ServerName> currentReplicators = null;
260    try {
261      currentReplicators = queueStorage.getListOfReplicators();
262    } catch (ReplicationException e) {
263      server.abort("Failed to get all replicators", e);
264      return;
265    }
266    if (currentReplicators == null || currentReplicators.isEmpty()) {
267      return;
268    }
269    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
270        .map(ServerName::valueOf).collect(Collectors.toList());
271    LOG.info(
272      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
273
274    // Look if there's anything to process after a restart
275    for (ServerName rs : currentReplicators) {
276      if (!otherRegionServers.contains(rs)) {
277        transferQueues(rs);
278      }
279    }
280  }
281
282  /**
283   * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
284   * HFile Refs
285   * @param peerId the id of replication peer
286   */
287  public void addPeer(String peerId) throws IOException {
288    boolean added = false;
289    try {
290      added = this.replicationPeers.addPeer(peerId);
291    } catch (ReplicationException e) {
292      throw new IOException(e);
293    }
294    if (added) {
295      addSource(peerId);
296      if (replicationForBulkLoadDataEnabled) {
297        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
298      }
299    }
300  }
301
302  /**
303   * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id
304   * and related replication queues 3. Remove the normal source and related replication queue 4.
305   * Remove HFile Refs
306   * @param peerId the id of the replication peer
307   */
308  public void removePeer(String peerId) {
309    replicationPeers.removePeer(peerId);
310    String terminateMessage = "Replication stream was removed by a user";
311    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
312    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
313    // see NodeFailoverWorker.run
314    synchronized (this.oldsources) {
315      // First close all the recovered sources for this peer
316      for (ReplicationSourceInterface src : oldsources) {
317        if (peerId.equals(src.getPeerId())) {
318          oldSourcesToDelete.add(src);
319        }
320      }
321      for (ReplicationSourceInterface src : oldSourcesToDelete) {
322        src.terminate(terminateMessage);
323        removeRecoveredSource(src);
324      }
325    }
326    LOG.info(
327      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
328    // Now close the normal source for this peer
329    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
330    if (srcToRemove != null) {
331      srcToRemove.terminate(terminateMessage);
332      removeSource(srcToRemove);
333    } else {
334      // This only happened in unit test TestReplicationSourceManager#testPeerRemovalCleanup
335      // Delete queue from storage and memory and queue id is same with peer id for normal
336      // source
337      deleteQueue(peerId);
338      this.walsById.remove(peerId);
339    }
340
341    // Remove HFile Refs
342    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
343  }
344
345  /**
346   * @return a new 'classic' user-space replication source.
347   * @param queueId the id of the replication queue to associate the ReplicationSource with.
348   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
349   */
350  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
351      throws IOException {
352    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
353    // Init the just created replication source. Pass the default walProvider's wal file length
354    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
355    // replication, see #createCatalogReplicationSource().
356    WALFileLengthProvider walFileLengthProvider =
357      this.walFactory.getWALProvider() != null?
358        this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
359    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
360      walFileLengthProvider, new MetricsSource(queueId));
361    return src;
362  }
363
364  /**
365   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
366   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
367   * group and do replication
368   * @param peerId the id of the replication peer
369   * @return the source that was created
370   */
371  ReplicationSourceInterface addSource(String peerId) throws IOException {
372    ReplicationPeer peer = replicationPeers.getPeer(peerId);
373    ReplicationSourceInterface src = createSource(peerId, peer);
374    // synchronized on latestPaths to avoid missing the new log
375    synchronized (this.latestPaths) {
376      this.sources.put(peerId, src);
377      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
378      this.walsById.put(peerId, walsByGroup);
379      // Add the latest wal to that source's queue
380      if (this.latestPaths.size() > 0) {
381        for (Path logPath : latestPaths) {
382          String name = logPath.getName();
383          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
384          NavigableSet<String> logs = new TreeSet<>();
385          logs.add(name);
386          walsByGroup.put(walPrefix, logs);
387          // Abort RS and throw exception to make add peer failed
388          abortAndThrowIOExceptionWhenFail(
389            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
390          src.enqueueLog(logPath);
391        }
392      }
393    }
394    src.startup();
395    return src;
396  }
397
398  /**
399   * Close the previous replication sources of this peer id and open new sources to trigger the new
400   * replication state changes or new replication config changes. Here we don't need to change
401   * replication queue storage and only to enqueue all logs to the new replication source
402   * @param peerId the id of the replication peer
403   * @throws IOException
404   */
405  public void refreshSources(String peerId) throws IOException {
406    String terminateMessage = "Peer " + peerId +
407      " state or config changed. Will close the previous replication source and open a new one";
408    ReplicationPeer peer = replicationPeers.getPeer(peerId);
409    ReplicationSourceInterface src = createSource(peerId, peer);
410    // synchronized on latestPaths to avoid missing the new log
411    synchronized (this.latestPaths) {
412      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
413      if (toRemove != null) {
414        LOG.info("Terminate replication source for " + toRemove.getPeerId());
415        // Do not clear metrics
416        toRemove.terminate(terminateMessage, null, false);
417      }
418      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
419        walsByGroup.forEach(wal -> {
420          Path walPath = new Path(this.logDir, wal);
421          src.enqueueLog(walPath);
422          LOG.trace("Enqueued {} to source {} during source creation.",
423            walPath, src.getQueueId());
424        });
425
426      }
427    }
428    LOG.info("Startup replication source for " + src.getPeerId());
429    src.startup();
430
431    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
432    // synchronized on oldsources to avoid race with NodeFailoverWorker
433    synchronized (this.oldsources) {
434      List<String> previousQueueIds = new ArrayList<>();
435      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
436          .hasNext();) {
437        ReplicationSourceInterface oldSource = iter.next();
438        if (oldSource.getPeerId().equals(peerId)) {
439          previousQueueIds.add(oldSource.getQueueId());
440          oldSource.terminate(terminateMessage);
441          iter.remove();
442        }
443      }
444      for (String queueId : previousQueueIds) {
445        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
446        this.oldsources.add(recoveredReplicationSource);
447        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
448          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
449        }
450        toStartup.add(recoveredReplicationSource);
451      }
452    }
453    for (ReplicationSourceInterface replicationSource : toStartup) {
454      replicationSource.startup();
455    }
456  }
457
458  /**
459   * Clear the metrics and related replication queue of the specified old source
460   * @param src source to clear
461   */
462  void removeRecoveredSource(ReplicationSourceInterface src) {
463    LOG.info("Done with the recovered queue " + src.getQueueId());
464    this.oldsources.remove(src);
465    // Delete queue from storage and memory
466    deleteQueue(src.getQueueId());
467    this.walsByIdRecoveredQueues.remove(src.getQueueId());
468  }
469
470  /**
471   * Clear the metrics and related replication queue of the specified old source
472   * @param src source to clear
473   */
474  void removeSource(ReplicationSourceInterface src) {
475    LOG.info("Done with the queue " + src.getQueueId());
476    this.sources.remove(src.getPeerId());
477    // Delete queue from storage and memory
478    deleteQueue(src.getQueueId());
479    this.walsById.remove(src.getQueueId());
480  }
481
482  /**
483   * Delete a complete queue of wals associated with a replication source
484   * @param queueId the id of replication queue to delete
485   */
486  private void deleteQueue(String queueId) {
487    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
488  }
489
490  @FunctionalInterface
491  private interface ReplicationQueueOperation {
492    void exec() throws ReplicationException;
493  }
494
495  /**
496   * Refresh replication source will terminate the old source first, then the source thread will be
497   * interrupted. Need to handle it instead of abort the region server.
498   */
499  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
500    try {
501      op.exec();
502    } catch (ReplicationException e) {
503      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
504          && e.getCause().getCause() != null && e.getCause()
505          .getCause() instanceof InterruptedException) {
506        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
507        // that thread is interrupted deep down in the stack, it should pass the following
508        // processing logic and propagate to the most top layer which can handle this exception
509        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
510        throw new ReplicationRuntimeException(
511          "Thread is interrupted, the replication source may be terminated",
512          e.getCause().getCause());
513      }
514      server.abort("Failed to operate on replication queue", e);
515    }
516  }
517
518  private void abortWhenFail(ReplicationQueueOperation op) {
519    try {
520      op.exec();
521    } catch (ReplicationException e) {
522      server.abort("Failed to operate on replication queue", e);
523    }
524  }
525
526  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
527    try {
528      op.exec();
529    } catch (ReplicationException e) {
530      throw new IOException(e);
531    }
532  }
533
534  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
535    try {
536      op.exec();
537    } catch (ReplicationException e) {
538      server.abort("Failed to operate on replication queue", e);
539      throw new IOException(e);
540    }
541  }
542
543  /**
544   * This method will log the current position to storage. And also clean old logs from the
545   * replication queue.
546   * @param entryBatch the wal entry batch we just shipped
547   */
548  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
549      WALEntryBatch entryBatch) {
550    String fileName = entryBatch.getLastWalPath().getName();
551    String queueId = source.getQueueId();
552    interruptOrAbortWhenFail(() -> this.queueStorage
553        .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
554            entryBatch.getLastSeqIds()));
555    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
556  }
557
558  /**
559   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
560   * file is closed and has no more entries.
561   * @param log Path to the log
562   * @param inclusive whether we should also remove the given log file
563   * @param queueId id of the replication queue
564   * @param queueRecovered Whether this is a recovered queue
565   */
566  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
567    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
568    if (queueRecovered) {
569      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
570      if (wals != null) {
571        cleanOldLogs(wals, log, inclusive, queueId);
572      }
573    } else {
574      // synchronized on walsById to avoid race with preLogRoll
575      synchronized (this.walsById) {
576        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
577        if (wals != null) {
578          cleanOldLogs(wals, log, inclusive, queueId);
579        }
580      }
581    }
582  }
583
584  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
585    NavigableSet<String> walSet = wals.headSet(key, inclusive);
586    if (walSet.isEmpty()) {
587      return;
588    }
589    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
590    for (String wal : walSet) {
591      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
592    }
593    walSet.clear();
594  }
595
596  // public because of we call it in TestReplicationEmptyWALRecovery
597  public void preLogRoll(Path newLog) throws IOException {
598    String logName = newLog.getName();
599    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
600    // synchronized on latestPaths to avoid the new open source miss the new log
601    synchronized (this.latestPaths) {
602      // Add log to queue storage
603      for (ReplicationSourceInterface source : this.sources.values()) {
604        // If record log to queue storage failed, abort RS and throw exception to make log roll
605        // failed
606        abortAndThrowIOExceptionWhenFail(
607          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
608      }
609
610      // synchronized on walsById to avoid race with cleanOldLogs
611      synchronized (this.walsById) {
612        // Update walsById map
613        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
614          .entrySet()) {
615          String peerId = entry.getKey();
616          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
617          boolean existingPrefix = false;
618          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
619            SortedSet<String> wals = walsEntry.getValue();
620            if (this.sources.isEmpty()) {
621              // If there's no slaves, don't need to keep the old wals since
622              // we only consider the last one when a new slave comes in
623              wals.clear();
624            }
625            if (logPrefix.equals(walsEntry.getKey())) {
626              wals.add(logName);
627              existingPrefix = true;
628            }
629          }
630          if (!existingPrefix) {
631            // The new log belongs to a new group, add it into this peer
632            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
633            NavigableSet<String> wals = new TreeSet<>();
634            wals.add(logName);
635            walsByPrefix.put(logPrefix, wals);
636          }
637        }
638      }
639
640      // Add to latestPaths
641      Iterator<Path> iterator = latestPaths.iterator();
642      while (iterator.hasNext()) {
643        Path path = iterator.next();
644        if (path.getName().contains(logPrefix)) {
645          iterator.remove();
646          break;
647        }
648      }
649      this.latestPaths.add(newLog);
650    }
651  }
652
653  // public because of we call it in TestReplicationEmptyWALRecovery
654  public void postLogRoll(Path newLog) throws IOException {
655    // This only updates the sources we own, not the recovered ones
656    for (ReplicationSourceInterface source : this.sources.values()) {
657      source.enqueueLog(newLog);
658      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.",
659          newLog, source.getQueueId());
660    }
661  }
662
663  @Override
664  public void regionServerRemoved(String regionserver) {
665    transferQueues(ServerName.valueOf(regionserver));
666  }
667
668  /**
669   * Transfer all the queues of the specified to this region server. First it tries to grab a lock
670   * and if it works it will move the old queues and finally will delete the old queues.
671   * <p>
672   * It creates one old source for any type of source of the old rs.
673   */
674  private void transferQueues(ServerName deadRS) {
675    if (server.getServerName().equals(deadRS)) {
676      // it's just us, give up
677      return;
678    }
679    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
680    try {
681      this.executor.execute(transfer);
682    } catch (RejectedExecutionException ex) {
683      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
684          .getGlobalSource().incrFailedRecoveryQueue();
685      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
686    }
687  }
688
689  /**
690   * Class responsible to setup new ReplicationSources to take care of the queues from dead region
691   * servers.
692   */
693  class NodeFailoverWorker extends Thread {
694
695    private final ServerName deadRS;
696    // After claim the queues from dead region server, the NodeFailoverWorker will skip to start
697    // the RecoveredReplicationSource if the peer has been removed. but there's possible that
698    // remove a peer with peerId = 2 and add a peer with peerId = 2 again during the
699    // NodeFailoverWorker. So we need a deep copied <peerId, peer> map to decide whether we
700    // should start the RecoveredReplicationSource. If the latest peer is not the old peer when
701    // NodeFailoverWorker begin, we should skip to start the RecoveredReplicationSource, Otherwise
702    // the rs will abort (See HBASE-20475).
703    private final Map<String, ReplicationPeerImpl> peersSnapshot;
704
705    public NodeFailoverWorker(ServerName deadRS) {
706      super("Failover-for-" + deadRS);
707      this.deadRS = deadRS;
708      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
709    }
710
711    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
712      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
713      return oldPeerRef != null && oldPeerRef == newPeerRef;
714    }
715
716    @Override
717    public void run() {
718      // Wait a bit before transferring the queues, we may be shutting down.
719      // This sleep may not be enough in some cases.
720      try {
721        Thread.sleep(sleepBeforeFailover +
722          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
723      } catch (InterruptedException e) {
724        LOG.warn("Interrupted while waiting before transferring a queue.");
725        Thread.currentThread().interrupt();
726      }
727      // We try to lock that rs' queue directory
728      if (server.isStopped()) {
729        LOG.info("Not transferring queue since we are shutting down");
730        return;
731      }
732      Map<String, Set<String>> newQueues = new HashMap<>();
733      try {
734        List<String> queues = queueStorage.getAllQueues(deadRS);
735        while (!queues.isEmpty()) {
736          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
737            queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
738          long sleep = sleepBeforeFailover / 2;
739          if (!peer.getSecond().isEmpty()) {
740            newQueues.put(peer.getFirst(), peer.getSecond());
741            sleep = sleepBeforeFailover;
742          }
743          try {
744            Thread.sleep(sleep);
745          } catch (InterruptedException e) {
746            LOG.warn("Interrupted while waiting before transferring a queue.");
747            Thread.currentThread().interrupt();
748          }
749          queues = queueStorage.getAllQueues(deadRS);
750        }
751        if (queues.isEmpty()) {
752          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
753        }
754      } catch (ReplicationException e) {
755        LOG.error(String.format("ReplicationException: cannot claim dead region (%s)'s " +
756            "replication queue. Znode : (%s)" +
757            " Possible solution: check if znode size exceeds jute.maxBuffer value. " +
758            " If so, increase it for both client and server side." + e),  deadRS,
759            queueStorage.getRsNode(deadRS));
760        server.abort("Failed to claim queue from dead regionserver.", e);
761        return;
762      }
763      // Copying over the failed queue is completed.
764      if (newQueues.isEmpty()) {
765        // We either didn't get the lock or the failed region server didn't have any outstanding
766        // WALs to replicate, so we are done.
767        return;
768      }
769
770      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
771        String queueId = entry.getKey();
772        Set<String> walsSet = entry.getValue();
773        try {
774          // there is not an actual peer defined corresponding to peerId for the failover.
775          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
776          String actualPeerId = replicationQueueInfo.getPeerId();
777
778          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
779          if (peer == null || !isOldPeer(actualPeerId, peer)) {
780            LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
781              deadRS);
782            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
783            continue;
784          }
785          if (server instanceof ReplicationSyncUp.DummyServer
786              && peer.getPeerState().equals(PeerState.DISABLED)) {
787            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
788                + "replicating data to this peer.",
789              actualPeerId);
790            continue;
791          }
792          // track sources in walsByIdRecoveredQueues
793          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
794          walsByIdRecoveredQueues.put(queueId, walsByGroup);
795          for (String wal : walsSet) {
796            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
797            NavigableSet<String> wals = walsByGroup.get(walPrefix);
798            if (wals == null) {
799              wals = new TreeSet<>();
800              walsByGroup.put(walPrefix, wals);
801            }
802            wals.add(wal);
803          }
804
805          ReplicationSourceInterface src = createSource(queueId, peer);
806          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
807          synchronized (oldsources) {
808            peer = replicationPeers.getPeer(src.getPeerId());
809            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
810              src.terminate("Recovered queue doesn't belong to any current peer");
811              removeRecoveredSource(src);
812              continue;
813            }
814            oldsources.add(src);
815            LOG.info("Added recovered source {}", src.getQueueId());
816            for (String wal : walsSet) {
817              src.enqueueLog(new Path(oldLogDir, wal));
818            }
819            src.startup();
820          }
821        } catch (IOException e) {
822          // TODO manage it
823          LOG.error("Failed creating a source", e);
824        }
825      }
826    }
827  }
828
829  /**
830   * Terminate the replication on this region server
831   */
832  public void join() {
833    this.executor.shutdown();
834    for (ReplicationSourceInterface source : this.sources.values()) {
835      source.terminate("Region server is closing");
836    }
837  }
838
839  /**
840   * Get a copy of the wals of the normal sources on this rs
841   * @return a sorted set of wal names
842   */
843  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
844    return Collections.unmodifiableMap(walsById);
845  }
846
847  /**
848   * Get a copy of the wals of the recovered sources on this rs
849   * @return a sorted set of wal names
850   */
851  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
852    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
853  }
854
855  /**
856   * Get a list of all the normal sources of this rs
857   * @return list of all normal sources
858   */
859  public List<ReplicationSourceInterface> getSources() {
860    return new ArrayList<>(this.sources.values());
861  }
862
863  /**
864   * Get a list of all the recovered sources of this rs
865   * @return list of all recovered sources
866   */
867  public List<ReplicationSourceInterface> getOldSources() {
868    return this.oldsources;
869  }
870
871  /**
872   * Get the normal source for a given peer
873   * @return the normal source for the give peer if it exists, otherwise null.
874   */
875  public ReplicationSourceInterface getSource(String peerId) {
876    return this.sources.get(peerId);
877  }
878
879  List<String> getAllQueues() throws IOException {
880    List<String> allQueues = Collections.emptyList();
881    try {
882      allQueues = queueStorage.getAllQueues(server.getServerName());
883    } catch (ReplicationException e) {
884      throw new IOException(e);
885    }
886    return allQueues;
887  }
888
889  int getSizeOfLatestPath() {
890    synchronized (latestPaths) {
891      return latestPaths.size();
892    }
893  }
894
895  public AtomicLong getTotalBufferUsed() {
896    return totalBufferUsed;
897  }
898
899  /**
900   * Returns the maximum size in bytes of edits held in memory which are pending replication
901   * across all sources inside this RegionServer.
902   */
903  public long getTotalBufferLimit() {
904    return totalBufferLimit;
905  }
906
907  /**
908   * Get the directory where wals are archived
909   * @return the directory where wals are archived
910   */
911  public Path getOldLogDir() {
912    return this.oldLogDir;
913  }
914
915  /**
916   * Get the directory where wals are stored by their RSs
917   * @return the directory where wals are stored by their RSs
918   */
919  public Path getLogDir() {
920    return this.logDir;
921  }
922
923  /**
924   * Get the handle on the local file system
925   * @return Handle on the local file system
926   */
927  public FileSystem getFs() {
928    return this.fs;
929  }
930
931  /**
932   * Get the ReplicationPeers used by this ReplicationSourceManager
933   * @return the ReplicationPeers used by this ReplicationSourceManager
934   */
935  public ReplicationPeers getReplicationPeers() {
936    return this.replicationPeers;
937  }
938
939  /**
940   * Get a string representation of all the sources' metrics
941   */
942  public String getStats() {
943    StringBuilder stats = new StringBuilder();
944    // Print stats that apply across all Replication Sources
945    stats.append("Global stats: ");
946    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
947        .append(getTotalBufferLimit()).append("B\n");
948    for (ReplicationSourceInterface source : this.sources.values()) {
949      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
950      stats.append(source.getStats() + "\n");
951    }
952    for (ReplicationSourceInterface oldSource : oldsources) {
953      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
954      stats.append(oldSource.getStats() + "\n");
955    }
956    return stats.toString();
957  }
958
959  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
960      throws IOException {
961    for (ReplicationSourceInterface source : this.sources.values()) {
962      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
963    }
964  }
965
966  public void cleanUpHFileRefs(String peerId, List<String> files) {
967    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
968  }
969
970  int activeFailoverTaskCount() {
971    return executor.getActiveCount();
972  }
973
974  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
975    return this.globalMetrics;
976  }
977
978  /**
979   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
980   * Create it once only. If exists already, use the existing one.
981   * @see #removeCatalogReplicationSource(RegionInfo)
982   * @see #addSource(String) This is specialization on the addSource method.
983   */
984  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
985      throws IOException {
986    // Poor-man's putIfAbsent
987    synchronized (this.catalogReplicationSource) {
988      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
989      return rs != null ? rs :
990        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
991    }
992  }
993
994  /**
995   * Remove the hbase:meta Catalog replication source.
996   * Called when we close hbase:meta.
997   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
998   */
999  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
1000    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
1001    // comes back to this server.
1002  }
1003
1004  /**
1005   * Create, initialize, and start the Catalog ReplicationSource.
1006   * Presumes called one-time only (caller must ensure one-time only call).
1007   * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
1008   * @see #addSource(String) This is a specialization of the addSource call.
1009   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
1010   * why the special handling).
1011   */
1012  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
1013      throws IOException {
1014    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
1015    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
1016    WALProvider walProvider = this.walFactory.getMetaWALProvider();
1017    boolean instantiate = walProvider == null;
1018    if (instantiate) {
1019      walProvider = this.walFactory.getMetaProvider();
1020    }
1021    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
1022    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
1023    // read replicas feature that makes use of the source does a reset on a crash of the WAL
1024    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
1025    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
1026    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
1027      this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
1028    final ReplicationSourceInterface crs = new CatalogReplicationSource();
1029    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
1030      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
1031    // Add listener on the provider so we can pick up the WAL to replicate on roll.
1032    WALActionsListener listener = new WALActionsListener() {
1033      @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
1034        crs.enqueueLog(newPath);
1035      }
1036    };
1037    walProvider.addWALActionsListener(listener);
1038    if (!instantiate) {
1039      // If we did not instantiate provider, need to add our listener on already-created WAL
1040      // instance too (listeners are passed by provider to WAL instance on creation but if provider
1041      // created already, our listener add above is missed). And add the current WAL file to the
1042      // Replication Source so it can start replicating it.
1043      WAL wal = walProvider.getWAL(regionInfo);
1044      wal.registerWALActionsListener(listener);
1045      crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
1046    }
1047    return crs.startup();
1048  }
1049}