/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region
 * servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happened while starting up the source,
  // meaning a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits
   * we do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set forms the head of the WALEntry filter chain; the filter
   * picked up from the configured endpoint and the cluster-marking filter are appended after it
   * to create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                Replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          the replication queue storage
   * @param replicationPeer       the replication peer this source replicates to
   * @param server                the server for this region server
   * @param queueId               the id of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL currently being written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();

    // A defaultBandwidth of '0' means no bandwidth limit; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }
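
  // A minimal usage sketch, illustrative only: the collaborators named below (manager,
  // queueStorage, peer, walFileLengthProvider, metrics) are assumptions standing in for the real
  // objects a region server would supply. A caller typically constructs a source, initializes it,
  // then starts it.
  //
  //   ReplicationSourceInterface source = new ReplicationSource();
  //   source.init(conf, fs, manager, queueStorage, peer, server, queueId, clusterId,
  //     walFileLengthProvider, metrics);
  //   source.startup(); // spawns the init thread that creates the endpoint and starts the shippers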

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
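
  // Example of the effect of decorateConf() above (the configuration value is a hypothetical
  // choice, not something this class mandates): if a replication codec is configured, it is
  // copied over the generic RPC codec key so replication RPCs use the same codec.
  //
  //   conf.set(HConstants.REPLICATION_CODEC_CONF_KEY,
  //     "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");
  //   // after decorateConf(), conf.get(HConstants.RPC_CODEC_CONF_KEY) returns that same class name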

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // note: it's possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(walPrefix);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
  }
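
  // Sketch of how enqueueLog() groups WALs (the file names below assume the usual
  // "<prefix>.<timestamp>" form and are purely illustrative): successive rolls of the same WAL
  // share a prefix, so they land in the same group queue and are drained by one shipper, while a
  // name with a new prefix makes tryStartNewShipper() spin up another worker.
  //
  //   source.enqueueLog(new Path(walDir, "rs1%2C16020%2C1690000000000.1690000001111"));
  //   source.enqueueLog(new Path(walDir, "rs1%2C16020%2C1690000000000.1690000002222"));
  //   // both go to wal group "rs1%2C16020%2C1690000000000" and the same shipper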

  @InterfaceAudience.Private
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }
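
  // A hedged sketch of how a custom endpoint gets selected (the peer registration below is done by
  // an operator or an assumed Admin client instance, not by this class, and the endpoint class
  // name is hypothetical): when the peer config names an implementation,
  // createReplicationEndpoint() instantiates it via reflection, otherwise it falls back to
  // HBaseInterClusterReplicationEndpoint.
  //
  //   ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
  //     .setClusterKey("zk1,zk2,zk3:2181:/hbase")
  //     .setReplicationEndpointImpl("org.example.MyReplicationEndpoint")
  //     .build();
  //   admin.addReplicationPeer("peer_1", peerConfig);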

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

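  // For readers of initializeWALEntryFilter() below: the resulting chain is ordered as the base
  // filter-out filters supplied at construction (by default SystemTableWALEntryFilter), then any
  // filter provided by the replication endpoint, and finally ClusterMarkingEntryFilter, which
  // (unless the endpoint allows replicating back to the same cluster) drops entries that have
  // already passed through the peer cluster and tags the remaining entries with this cluster's id.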
  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        ReplicationSourceWALReader walReader =
          createNewWALReader(walGroupId, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
          walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
            + walGroupId + "," + queueId,
          (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        worker.setWALReader(walReader);
        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the WAL file size only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  protected final void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
    String peerId) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), e);
    }
    if (manager != null) {
      while (true) {
        try {
          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
            t.getName());
          manager.refreshSources(peerId);
          break;
        } catch (IOException e1) {
          LOG.error("Replication sources refresh failed.", e1);
          sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
        }
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId + " bandwidth throttling changed, currentBandWidth="
        + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }
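
  // Worked example with illustrative numbers: if replication.source.per.peer.node.bandwidth is
  // 100 MB/s and the peer sets no bandwidth of its own (getPeerBandwidth() == 0),
  // getCurrentBandwidth() returns the default and the throttler receives one tenth of it (see the
  // constructor call in init() and setBandwidth() above). A non-zero per-peer bandwidth takes
  // precedence and is picked up by checkBandwidthChangeAndResetThrottler() on the next
  // tryThrottle() call.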

  /**
   * Sleep between retry attempts, scaling the base sleep interval by <code>sleepMultiplier</code>.
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
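
  // Worked example with the defaults read in init() (sleepForRetries = 1000 ms,
  // maxRetriesMultiplier = 300): a caller that bumps its multiplier after each failed attempt
  // sleeps 1 s, then 2 s, then 3 s, and so on; once the multiplier reaches 300 this method returns
  // false, which the retry loops in initialize() use either to stop growing the backoff or to
  // give up entirely.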

  /**
   * check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }

      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }
      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }
    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
    setSourceStartupStatus(false);
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server's main startup thread can go on with its work.
        sourceRunning = false;
        uncaughtException(t, e, null, null);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              uncaughtException(t, error, null, null);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If worker still is alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If entry reader is alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker is already stopped but there were still entries
        // batched, we need to clear the buffer used for unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      // Can be null in test context.
      if (this.metrics != null) {
        this.metrics.clear();
      }
    }
  }
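
  // Shutdown order implemented by terminate() above, summarized: mark the source as not running,
  // interrupt and wait for the init thread (unless we are the init thread), ask every shipper and
  // its WAL reader to stop, stop the endpoint, give workers one sleepForRetries interval before
  // interrupting them, release any batched but unshipped entries back to the buffer accounting,
  // then optionally join the workers, await endpoint termination, and clear metrics.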

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
    // Record the new buffer usage
    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /** Returns String to use as a log prefix that contains current peerId. */
  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }
}