001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
021
022import com.google.errorprone.annotations.RestrictedApi;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.UUID;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.PriorityBlockingQueue;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.AtomicLong;
039import java.util.function.Predicate;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableDescriptors;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.regionserver.HRegionServer;
051import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
052import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
053import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
054import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
055import org.apache.hadoop.hbase.replication.ReplicationException;
056import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
057import org.apache.hadoop.hbase.replication.ReplicationPeer;
058import org.apache.hadoop.hbase.replication.ReplicationQueueData;
059import org.apache.hadoop.hbase.replication.ReplicationQueueId;
060import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
061import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
062import org.apache.hadoop.hbase.replication.WALEntryFilter;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.OOMEChecker;
065import org.apache.hadoop.hbase.util.Pair;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
068import org.apache.hadoop.hbase.wal.WAL.Entry;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076/**
077 * Class that handles the source of a replication stream. Currently does not handle more than 1
078 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if replication ratio = 0.1 and slave cluster has 100 region servers, 10 will
080 * be selected.
081 * <p>
082 * A stream is considered down when we cannot contact a region server on the peer cluster for more
083 * than 55 seconds by default.
084 * </p>
085 */
086@InterfaceAudience.Private
087public class ReplicationSource implements ReplicationSourceInterface {
088
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  // (read from "hbase.regionserver.maxlogs" in init)
  protected int queueSizePerGroup;
  // Queues of WALs waiting to be replicated, keyed by wal group id
  protected ReplicationSourceLogQueue logQueue;
  // Storage used by this class to record HFile references for the peer
  protected ReplicationQueueStorage queueStorage;
  // The peer this source replicates to
  protected ReplicationPeer replicationPeer;

  // Our own Configuration instance, created in init and adjusted by decorateConf
  protected Configuration conf;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // The hosting server; consulted for its stopped/stopping/aborted state and asked to abort on
  // unexpected errors when abortOnError is set
  protected Server server;
  // How long should we sleep for each retry, in milliseconds (multiplied by a sleep multiplier)
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of the replication queue
  protected ReplicationQueueId queueId;
  // The start offsets. Usually only recovered replication queue needs this, but probably when we
  // update the peer config and restart the replication peer, we also need this?
  protected ImmutableMap<String, ReplicationGroupOffset> startOffsets;
  // Maximum number of retries before taking bold actions; also the cap for the sleep multiplier
  // used by sleepForRetries
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  // Whether an unexpected error should abort the server
  // (read from "replication.source.regionserver.abort" in init)
  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try submit another initialize thread.
  // NOTE: this should only be set to false at the end of initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signalizes uncaught error happening while starting up the source
  // and a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  // Bandwidth used when the peer does not specify one (peer bandwidth of 0); 0 means no throttling
  private long defaultBandwidth;
  // Bandwidth currently applied to the throttler
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  // Shipper workers, keyed by wal group id
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  // How long to wait for the endpoint to be running after start; -1 until init reads the config
  private int waitOnEndpointSeconds = -1;

  // Thread that runs initialize(); created in startup()
  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we
   * do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set is placed at the start of the WALEntry filter chain; the
   * filter picked up from the configured endpoint and the cluster marking filter are appended after
   * it to create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;
168
169  ReplicationSource() {
170    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
171    this(p -> !AbstractFSWALProvider.isMetaFile(p),
172      Lists.newArrayList(new SystemTableWALEntryFilter()));
173  }
174
175  /**
176   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
177   *                                Replicate; i.e. return 'true' if you want to replicate the
178   *                                content of the WAL.
179   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
180   *                                WALEntries so they never make it out of this ReplicationSource.
181   */
182  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
183    this.filterInWALs = replicateWAL;
184    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
185  }
186
187  /**
188   * Instantiation method used by region servers
189   * @param conf      configuration to use
190   * @param fs        file system to use
191   * @param manager   replication manager to ping to
192   * @param server    the server for this region server
193   * @param queueData the id and offsets of our replication queue
194   * @param clusterId unique UUID for the cluster
195   * @param metrics   metrics for replication source
196   */
197  @Override
198  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
199    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
200    ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
201    MetricsSource metrics) throws IOException {
202    this.server = server;
203    this.conf = HBaseConfiguration.create(conf);
204    this.waitOnEndpointSeconds =
205      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
206    decorateConf();
207    // 1 second
208    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
209    // 5 minutes @ 1 sec per
210    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
211    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
212    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
213    this.queueStorage = queueStorage;
214    this.replicationPeer = replicationPeer;
215    this.manager = manager;
216    this.fs = fs;
217    this.metrics = metrics;
218    this.clusterId = clusterId;
219
220    this.queueId = queueData.getId();
221    this.startOffsets = queueData.getOffsets();
222
223    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
224    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
225    currentBandwidth = getCurrentBandwidth();
226    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
227    this.walFileLengthProvider = walFileLengthProvider;
228
229    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
230
231    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
232      replicationPeer.getId(), this.currentBandwidth);
233  }
234
235  private void decorateConf() {
236    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
237    if (StringUtils.isNotEmpty(replicationCodec)) {
238      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
239    }
240  }
241
242  @Override
243  public void enqueueLog(Path wal) {
244    if (!this.filterInWALs.test(wal)) {
245      LOG.trace("NOT replicating {}", wal);
246      return;
247    }
248    // Use WAL prefix as the WALGroupId for this peer.
249    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
250    boolean queueExists = logQueue.enqueueLog(wal, walGroupId);
251
252    if (!queueExists) {
253      if (this.isSourceActive() && this.walEntryFilter != null) {
254        // new wal group observed after source startup, start a new worker thread to track it
255        // notice: it's possible that wal enqueued when this.running is set but worker thread
256        // still not launched, so it's necessary to check workerThreads before start the worker
257        tryStartNewShipper(walGroupId);
258      }
259    }
260    if (LOG.isTraceEnabled()) {
261      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walGroupId, queueId);
262    }
263  }
264
265  @RestrictedApi(explanation = "Should only be called in tests", link = "",
266      allowedOnPath = ".*/src/test/.*")
267  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
268    return logQueue.getQueues();
269  }
270
271  @Override
272  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
273    throws ReplicationException {
274    String peerId = replicationPeer.getId();
275    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
276      this.queueStorage.addHFileRefs(peerId, pairs);
277      metrics.incrSizeOfHFileRefsQueue(pairs.size());
278    } else {
279      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
280        tableName, Bytes.toString(family), peerId);
281    }
282  }
283
284  private ReplicationEndpoint createReplicationEndpoint()
285    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
286    RegionServerCoprocessorHost rsServerHost = null;
287    if (server instanceof HRegionServer) {
288      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
289    }
290    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
291
292    ReplicationEndpoint replicationEndpoint;
293    if (replicationEndpointImpl == null) {
294      // Default to HBase inter-cluster replication endpoint; skip reflection
295      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
296    } else {
297      try {
298        replicationEndpoint = Class.forName(replicationEndpointImpl)
299          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
300      } catch (NoSuchMethodException | InvocationTargetException e) {
301        throw new IllegalArgumentException(e);
302      }
303    }
304    if (rsServerHost != null) {
305      ReplicationEndpoint newReplicationEndPoint =
306        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
307      if (newReplicationEndPoint != null) {
308        // Override the newly created endpoint from the hook with configured end point
309        replicationEndpoint = newReplicationEndPoint;
310      }
311    }
312    return replicationEndpoint;
313  }
314
315  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
316    throws IOException, TimeoutException {
317    TableDescriptors tableDescriptors = null;
318    if (server instanceof HRegionServer) {
319      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
320    }
321    replicationEndpoint
322      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
323        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
324    replicationEndpoint.start();
325    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
326  }
327
328  private void initializeWALEntryFilter(UUID peerClusterId) {
329    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
330    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
331    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
332    if (filterFromEndpoint != null) {
333      filters.add(filterFromEndpoint);
334    }
335    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
336    this.walEntryFilter = new ChainWALEntryFilter(filters);
337    this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
338  }
339
340  private long getStartOffset(String walGroupId) {
341    ReplicationGroupOffset startOffset = startOffsets.get(walGroupId);
342    if (startOffset == null || startOffset == ReplicationGroupOffset.BEGIN) {
343      return 0L;
344    }
345    // this method will only be called when start new shipper, and we will only start new shipper
346    // when there is a new queue, so here the queue for walGroupId will never be null.
347    Path first = logQueue.getQueue(walGroupId).peek();
348    if (!startOffset.getWal().equals(first.getName())) {
349      return 0L;
350    }
351    // Usually, if we arrive here, the start offset should never be -1, as it means this file has
352    // been fully replicated so we should have filtered it out in upper layer, usually in
353    // ReplicationSourceManager. Add a warn message for safety, as usually replicate more data will
354    // not cause big problems.
355    if (startOffset.getOffset() < 0) {
356      LOG.warn("Should have already replicated wal {}, return start offset as 0",
357        startOffset.getWal());
358      return 0L;
359    } else {
360      return startOffset.getOffset();
361    }
362  }
363
364  protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
365    ReplicationSourceWALReader walReader =
366      createNewWALReader(walGroupId, getStartOffset(walGroupId));
367    ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
368    Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
369      + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
370    return worker;
371  }
372
  /**
   * Starts the given shipper, registering {@code retryRefreshing} as the handler passed to its
   * startup.
   * @param worker the shipper to start
   */
  protected final void startShipper(ReplicationSourceShipper worker) {
    worker.startup(this::retryRefreshing);
  }
376
377  private void tryStartNewShipper(String walGroupId) {
378    workerThreads.compute(walGroupId, (key, value) -> {
379      if (value != null) {
380        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
381        return value;
382      } else {
383        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
384        ReplicationSourceShipper worker = createNewShipper(walGroupId);
385        startShipper(worker);
386        return worker;
387      }
388    });
389  }
390
391  @Override
392  public Map<String, ReplicationStatus> getWalGroupStatus() {
393    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
394    long ageOfLastShippedOp, replicationDelay, fileSize;
395    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
396      String walGroupId = walGroupShipper.getKey();
397      ReplicationSourceShipper shipper = walGroupShipper.getValue();
398      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
399      int queueSize = logQueue.getQueueSize(walGroupId);
400      replicationDelay = metrics.getReplicationDelay();
401      Path currentPath = shipper.getCurrentPath();
402      fileSize = -1;
403      if (currentPath != null) {
404        try {
405          fileSize = getFileSize(currentPath);
406        } catch (IOException e) {
407          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
408        }
409      } else {
410        currentPath = new Path("NO_LOGS_IN_QUEUE");
411        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
412      }
413      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
414      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
415        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
416        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
417        .withReplicationDelay(replicationDelay);
418      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
419    }
420    return sourceReplicationStatus;
421  }
422
423  private long getFileSize(Path currentPath) throws IOException {
424    long fileSize;
425    try {
426      fileSize = fs.getContentSummary(currentPath).getLength();
427    } catch (FileNotFoundException e) {
428      Path archivedLogPath = findArchivedLog(currentPath, conf);
429      // archivedLogPath can be null if unable to locate in archiveDir.
430      if (archivedLogPath == null) {
431        throw new FileNotFoundException("Couldn't find path: " + currentPath);
432      }
433      fileSize = fs.getContentSummary(archivedLogPath).getLength();
434    }
435    return fileSize;
436  }
437
  /**
   * Factory for the shipper of a wal group. Protected and non-final, so a subclass can supply its
   * own shipper.
   * @param walGroupId the wal group the shipper works on
   * @param walReader  the reader the shipper is constructed with
   * @return a new {@link ReplicationSourceShipper}
   */
  protected ReplicationSourceShipper createNewShipper(String walGroupId,
    ReplicationSourceWALReader walReader) {
    return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
  }
442
443  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
444    return replicationPeer.getPeerConfig().isSerial()
445      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
446        this, walGroupId)
447      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
448        walGroupId);
449  }
450
  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null. That method is run at
   * the end of the asynchronous initialization, so the filter stays null until the replication
   * endpoint has been started and the peer cluster id is known.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }
458
459  // log the error, check if the error is OOME, or whether we should abort the server
460  private void checkError(Thread t, Throwable error) {
461    OOMEChecker.exitIfOOME(error, getClass().getSimpleName());
462    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
463    if (abortOnError) {
464      server.abort("Unexpected exception in " + t.getName(), error);
465    }
466  }
467
468  private void retryRefreshing(Thread t, Throwable error) {
469    checkError(t, error);
470    while (true) {
471      if (server.isAborted() || server.isStopped() || server.isStopping()) {
472        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
473        return;
474      }
475      try {
476        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
477          t.getName());
478        manager.refreshSources(getPeerId());
479        break;
480      } catch (Exception e) {
481        LOG.error("Replication sources refresh failed.", e);
482        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
483      }
484    }
485  }
486
487  @Override
488  public ReplicationEndpoint getReplicationEndpoint() {
489    return this.replicationEndpoint;
490  }
491
492  @Override
493  public ReplicationSourceManager getSourceManager() {
494    return this.manager;
495  }
496
497  @Override
498  public void tryThrottle(int batchSize) throws InterruptedException {
499    checkBandwidthChangeAndResetThrottler();
500    if (throttler.isEnabled()) {
501      long sleepTicks = throttler.getNextSleepInterval(batchSize);
502      if (sleepTicks > 0) {
503        if (LOG.isTraceEnabled()) {
504          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
505        }
506        Thread.sleep(sleepTicks);
507        // reset throttler's cycle start tick when sleep for throttling occurs
508        throttler.resetStartTick();
509      }
510    }
511  }
512
513  private void checkBandwidthChangeAndResetThrottler() {
514    long peerBandwidth = getCurrentBandwidth();
515    if (peerBandwidth != currentBandwidth) {
516      currentBandwidth = peerBandwidth;
517      throttler.setBandwidth((double) currentBandwidth / 10.0);
518      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
519        replicationPeer.getId(), currentBandwidth);
520    }
521  }
522
523  private long getCurrentBandwidth() {
524    long peerBandwidth = replicationPeer.getPeerBandwidth();
525    // User can set peer bandwidth to 0 to use default bandwidth.
526    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
527  }
528
529  /**
530   * Do the sleeping logic
531   * @param msg             Why we sleep
532   * @param sleepMultiplier by how many times the default sleeping time is augmented
533   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
534   */
535  private boolean sleepForRetries(String msg, int sleepMultiplier) {
536    try {
537      if (LOG.isTraceEnabled()) {
538        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
539          sleepMultiplier);
540      }
541      Thread.sleep(this.sleepForRetries * sleepMultiplier);
542    } catch (InterruptedException e) {
543      if (LOG.isDebugEnabled()) {
544        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
545      }
546      Thread.currentThread().interrupt();
547    }
548    return sleepMultiplier < maxRetriesMultiplier;
549  }
550
551  private void initialize() {
552    int sleepMultiplier = 1;
553    while (this.isSourceActive()) {
554      ReplicationEndpoint replicationEndpoint;
555      try {
556        replicationEndpoint = createReplicationEndpoint();
557      } catch (Exception e) {
558        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
559        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
560          sleepMultiplier++;
561        }
562        continue;
563      }
564
565      try {
566        initAndStartReplicationEndpoint(replicationEndpoint);
567        this.replicationEndpoint = replicationEndpoint;
568        break;
569      } catch (Exception e) {
570        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
571        replicationEndpoint.stop();
572        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
573          sleepMultiplier++;
574        } else {
575          retryStartup.set(!this.abortOnError);
576          setSourceStartupStatus(false);
577          throw new RuntimeException("Exhausted retries to start replication endpoint.");
578        }
579      }
580    }
581
582    if (!this.isSourceActive()) {
583      // this means the server is shutting down or the source is terminated, just give up
584      // initializing
585      setSourceStartupStatus(false);
586      return;
587    }
588
589    sleepMultiplier = 1;
590    UUID peerClusterId;
591    // delay this until we are in an asynchronous thread
592    for (;;) {
593      peerClusterId = replicationEndpoint.getPeerUUID();
594      if (this.isSourceActive() && peerClusterId == null) {
595        if (LOG.isDebugEnabled()) {
596          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
597            (this.sleepForRetries * sleepMultiplier));
598        }
599        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
600          sleepMultiplier++;
601        }
602      } else {
603        break;
604      }
605    }
606
607    if (!this.isSourceActive()) {
608      // this means the server is shutting down or the source is terminated, just give up
609      // initializing
610      setSourceStartupStatus(false);
611      return;
612    }
613
614    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
615      queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
616    initializeWALEntryFilter(peerClusterId);
617    // Start workers
618    startShippers();
619    setSourceStartupStatus(false);
620  }
621
622  protected void startShippers() {
623    for (String walGroupId : logQueue.getQueues().keySet()) {
624      tryStartNewShipper(walGroupId);
625    }
626  }
627
628  private synchronized void setSourceStartupStatus(boolean initializing) {
629    startupOngoing.set(initializing);
630    if (initializing) {
631      metrics.incrSourceInitializing();
632    } else {
633      metrics.decrSourceInitializing();
634    }
635  }
636
  /**
   * Starts this source by running {@code initialize()} on a new daemon thread. Does nothing if the
   * source is already marked as running.
   * <p>
   * The handler registered for the init thread deals with a failed initialization: it marks the
   * source as not running, runs the usual error check (which aborts the server when
   * {@code abortOnError} is set) and, when {@code abortOnError} is false, re-runs
   * {@code initialize()} in a loop until no startup is ongoing and no retry is requested.
   * </p>
   * @return this source
   */
  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server main startup one can go on with its work.
        sourceRunning = false;
        checkError(t, e);
        // A retry is only requested when the error does not abort the server.
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            // Same preparation as the first attempt above, then run initialize() inline.
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              checkError(t, error);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }
670
  /**
   * Terminates this source without an error cause; delegates to
   * {@link #terminate(String, Exception)} with a null cause.
   * @param reason why the source is terminated
   */
  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }
675
  /**
   * Terminates this source and clears its metrics; delegates to
   * {@link #terminate(String, Exception, boolean)} with {@code clearMetrics} set to true.
   * @param reason why the source is terminated
   * @param cause  the error that led to the termination, or null
   */
  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }
680
  /**
   * Terminates this source and waits for its workers and endpoint; delegates to the private
   * overload with {@code join} set to true.
   * @param reason       why the source is terminated
   * @param cause        the error that led to the termination, or null
   * @param clearMetrics true to clear the metrics, false to only terminate them
   */
  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }
685
  /**
   * Shuts this source down. In order: marks the source as not running, interrupts and shuts down
   * the initialization thread (unless we are running on it), asks every shipper and its reader to
   * stop, stops the replication endpoint, gives shippers/readers that are still alive one sleep
   * interval before interrupting them, optionally waits for the shippers and the endpoint to
   * finish, and finally clears or terminates the metrics.
   * @param reason       why the source is terminated; only used for logging
   * @param cause        the error that led to the termination, or null for a normal close
   * @param clearMetrics true to clear the metrics, false to call {@code terminate()} on them
   * @param join         true to wait for the shippers and for the endpoint to terminate
   */
  private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    // From now on isSourceActive() returns false, which lets the loops in initialize() exit.
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    // values() is a view of the map, not a snapshot.
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    // First pass: only signal the shippers and their readers to stop, without waiting.
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.setReaderRunning(false);
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    // Second pass: give whatever is still alive one sleep interval, then interrupt it.
    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If worker still is alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If entry reader is alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If server is running and worker is already stopped but there was still entries batched,
        // we need to clear buffer used for non processed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          // Wait at most the maximum retry interval for the endpoint to terminate.
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }

    // Can be null in test context.
    if (this.metrics != null) {
      if (clearMetrics) {
        this.metrics.clear();
      } else {
        this.metrics.terminate();
      }
    }
  }
762
763  @Override
764  public ReplicationQueueId getQueueId() {
765    return this.queueId;
766  }
767
768  @Override
769  public Path getCurrentPath() {
770    // only for testing
771    for (ReplicationSourceShipper worker : workerThreads.values()) {
772      if (worker.getCurrentPath() != null) {
773        return worker.getCurrentPath();
774      }
775    }
776    return null;
777  }
778
779  @Override
780  public boolean isSourceActive() {
781    return !this.server.isStopped() && this.sourceRunning;
782  }
783
784  public boolean isWorkerRunning() {
785    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
786      if (worker.isActive()) {
787        return worker.isActive();
788      }
789    }
790    return false;
791  }
792
793  @Override
794  public String getStats() {
795    StringBuilder sb = new StringBuilder();
796    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
797      .append(", current progress: \n");
798    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
799      String walGroupId = entry.getKey();
800      ReplicationSourceShipper worker = entry.getValue();
801      long position = worker.getCurrentPosition();
802      Path currentPath = worker.getCurrentPath();
803      sb.append("walGroup [").append(walGroupId).append("]: ");
804      if (currentPath != null) {
805        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
806          .append(position).append("\n");
807      } else {
808        sb.append("no replication ongoing, waiting for new log").append("\n");
809      }
810    }
811    return sb.toString();
812  }
813
814  @Override
815  public MetricsSource getSourceMetrics() {
816    return this.metrics;
817  }
818
819  @Override
820  // offsets totalBufferUsed by deducting shipped batchSize.
821  public void postShipEdits(List<Entry> entries, long batchSize) {
822    if (throttler.isEnabled()) {
823      throttler.addPushSize(batchSize);
824    }
825    totalReplicatedEdits.addAndGet(entries.size());
826    this.manager.releaseBufferQuota(batchSize);
827  }
828
829  @Override
830  public WALFileLengthProvider getWALFileLengthProvider() {
831    return walFileLengthProvider;
832  }
833
834  @Override
835  public ServerName getServerWALsBelongTo() {
836    return queueId.getServerWALsBelongTo();
837  }
838
839  @Override
840  public ReplicationPeer getPeer() {
841    return replicationPeer;
842  }
843
844  Server getServer() {
845    return server;
846  }
847
848  @Override
849  public ReplicationQueueStorage getReplicationQueueStorage() {
850    return queueStorage;
851  }
852
853  void removeWorker(ReplicationSourceShipper worker) {
854    workerThreads.remove(worker.walGroupId, worker);
855  }
856
857  public String logPeerId() {
858    return "peerId=" + this.getPeerId() + ",";
859  }
860
861  // Visible for testing purpose
862  public long getTotalReplicatedEdits() {
863    return totalReplicatedEdits.get();
864  }
865}