/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Class that handles the source of a replication stream. Currently does not handle more than one
 * slave cluster. For each slave cluster it selects a random subset of region servers, sized by a
 * replication ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100
 * region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of the replication queue
  protected ReplicationQueueId queueId;
  // The start offsets. Usually only a recovered replication queue needs this, but we probably also
  // need it when the peer config is updated and the replication peer is restarted.
  protected ImmutableMap<String, ReplicationGroupOffset> startOffsets;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happened while starting up the source
  // and that a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
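  // Provides the current length of a WAL file that is still being written to, so readers do not
  // read past the data that is actually available.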
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
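  // How long initAndStartReplicationEndpoint() waits for the endpoint to report that it is
  // running; -1 until the configured value is read in init().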
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we
   * do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set forms the head of the WALEntry filter chain; the filter we
   * pick up from the configured endpoint and the cluster marking filter are appended after it to
   * create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                Replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          storage for this source's replication queue data
   * @param replicationPeer       the peer we are replicating to
   * @param server                the server for this region server
   * @param queueData             the id and offsets of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider provides the length of WAL files that are still being written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueData.getId();
    this.startOffsets = queueData.getOffsets();

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
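    // The throttler hands out its quota in (roughly) 100ms cycles, hence one tenth of the
    // per-second bandwidth here.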
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

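  // Copy the replication codec (if one is configured) over the RPC codec in our private conf, so
  // that connections to the peer use a codec that can carry cell tags (typically
  // KeyValueCodecWithTags).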
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
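    // For example, a WAL named something like "host%2C16020%2C1690000000000.1700000000000" would
    // typically map to the prefix "host%2C16020%2C1690000000000" (illustrative name only; the
    // exact file name pattern depends on the WAL provider).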
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walGroupId);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a wal is enqueued after this.running is set but the worker
        // thread is not yet launched, so check workerThreads before starting the worker
        tryStartNewShipper(walGroupId);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walGroupId, queueId);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook instead of the one we just created
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

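  // Resolves where to start reading for a wal group: a positive value is a byte offset into the
  // WAL at the head of the group's queue, while 0 means read that WAL from its beginning. The
  // recorded offset is only honoured if it refers to the WAL currently at the head of the queue.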
  private long getStartOffset(String walGroupId) {
    ReplicationGroupOffset startOffset = startOffsets.get(walGroupId);
    if (startOffset == null || startOffset == ReplicationGroupOffset.BEGIN) {
      return 0L;
    }
    // this method will only be called when starting a new shipper, and we will only start a new
    // shipper when there is a new queue, so here the queue for walGroupId will never be null.
    Path first = logQueue.getQueue(walGroupId).peek();
    if (!startOffset.getWal().equals(first.getName())) {
      return 0L;
    }
    // Usually, if we arrive here, the start offset should never be -1, as -1 means this file has
    // been fully replicated so we should have filtered it out in an upper layer, usually in
    // ReplicationSourceManager. Add a warn message for safety, as replicating extra data usually
    // does not cause big problems.
    if (startOffset.getOffset() < 0) {
      LOG.warn("Should have already replicated wal {}, return start offset as 0",
        startOffset.getWal());
      return 0L;
    } else {
      return startOffset.getOffset();
    }
  }

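  // Builds the WAL reader thread for a wal group and wraps it in a shipper. If the reader dies
  // with an uncaught exception, retryRefreshing() asks the manager to tear down and re-create
  // this source.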
  protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
    ReplicationSourceWALReader walReader =
      createNewWALReader(walGroupId, getStartOffset(walGroupId));
    ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
    Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
      + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
    return worker;
  }

  protected final void startShipper(ReplicationSourceShipper worker) {
    worker.startup(this::retryRefreshing);
  }

  private void tryStartNewShipper(String walGroupId) {
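    // compute() is atomic per key, so concurrent callers for the same walGroupId cannot both
    // create a shipper.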
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        startShipper(worker);
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
    ReplicationSourceWALReader walReader) {
    return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
  }

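  // Serial peers get a reader that only hands out a region's entries once the preceding ranges
  // for that region have been fully replicated (ordering is tracked via replication barriers);
  // non-serial peers read straight through.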
  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  // Log the error, exit the process if the error is an OOME, and abort the server if abortOnError
  // is set
  private void checkError(Thread t, Throwable error) {
    OOMEChecker.exitIfOOME(error, getClass().getSimpleName());
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), error);
    }
  }

  private void retryRefreshing(Thread t, Throwable error) {
    checkError(t, error);
    while (true) {
      if (server.isAborted() || server.isStopped() || server.isStopping()) {
        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
        return;
      }
      try {
        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
          t.getName());
        manager.refreshSources(getPeerId());
        break;
      } catch (Exception e) {
        LOG.error("Replication sources refresh failed.", e);
        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

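  // The peer's bandwidth limit can be changed at runtime (e.g. via the set_peer_bandwidth shell
  // command); pick up a new value lazily, before throttling the next batch.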
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
        replicationPeer.getId(), currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

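  // Runs on the init thread: create and start the replication endpoint (retrying on failure),
  // look up the peer cluster's UUID, build the WAL entry filter chain, then start one shipper
  // per wal group.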
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    startShippers();
    setSourceStartupStatus(false);
  }

  protected void startShippers() {
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

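  // Kicks off initialize() on a daemon thread. If initialization fails and abortOnError is false,
  // the handler installed below keeps retrying so that a misbehaving peer does not block the
  // region server from starting.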
  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server's main startup thread can go on with its work.
        sourceRunning = false;
        checkError(t, e);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              checkError(t, error);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.setReaderRunning(false);
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If worker still is alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If entry reader is alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker is already stopped but there were still entries
        // batched, we need to clear the buffer used for unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }

    // Can be null in test context.
    if (this.metrics != null) {
      if (clearMetrics) {
        this.metrics.clear();
      } else {
        this.metrics.terminate();
      }
    }
  }

  @Override
  public ReplicationQueueId getQueueId() {
    return this.queueId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log").append("\n");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, long batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    this.manager.releaseBufferQuota(batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return queueId.getServerWALsBelongTo();
  }

  @Override
  public ReplicationPeer getPeer() {
    return replicationPeer;
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }

  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }

  // Visible for testing purposes
  public long getTotalReplicatedEdits() {
    return totalReplicatedEdits.get();
  }
}