/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random subset of peer region servers using a
 * replication ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100
 * region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // Id of the replication queue we are reading from (historically the name of its znode)
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag signalling that an uncaught error happened while starting up the source
  // and that a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits that should not be
   * replicated.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we
   * do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are
   * put after those that we pick up from the configured endpoints and other machinations to create
   * the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                Replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          storage for the replication queues
   * @param replicationPeer       the peer we are replicating to
   * @param server                the server for this region server
   * @param queueId               the id of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file currently being written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

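  /**
   * Copy the replication codec configuration (if set) into the RPC codec key of this source's
   * private configuration so that replication RPCs use the replication codec.
   */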
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting one
        tryStartNewShipper(walPrefix);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
  }

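  /** Returns the WAL queues of this source, keyed by wal group id. */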
  @InterfaceAudience.Private
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

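  /**
   * Create the {@link ReplicationEndpoint} for this source: either the implementation configured
   * on the peer or, when none is configured, the default inter-cluster endpoint. The region server
   * coprocessor host is then given a chance to replace the instance.
   */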
  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the one created above
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

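  /**
   * Initialize the given endpoint with this source's replication context, start it, and wait up
   * to {@code waitOnEndpointSeconds} for it to report that it is running.
   */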
  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

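  /**
   * Start a shipper (and its WAL reader) for the given wal group unless one is already registered.
   * The atomic {@code compute} on {@link #workerThreads} ensures concurrent callers cannot start
   * two workers for the same group.
   */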
  private void tryStartNewShipper(String walGroupId) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        ReplicationSourceWALReader walReader =
          createNewWALReader(walGroupId, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
          walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
            + walGroupId + "," + queueId,
          (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        worker.setWALReader(walReader);
        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of the WAL only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

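  /**
   * Return the length of the given WAL. If the file has already been moved to the archive
   * directory, fall back to its archived location.
   */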
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
  }

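  /**
   * Create the WAL reader for a wal group, using the serial variant when the peer is configured
   * for serial replication.
   */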
  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call only after {@link #initializeWALEntryFilter(UUID)}; before that the filter is null.
   * @return the WAL entry filter chain used on WAL files to filter *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  protected final void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
    String peerId) {
    OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), e);
    }
    if (manager != null) {
      while (true) {
        try {
          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
            t.getName());
          manager.refreshSources(peerId);
          break;
        } catch (IOException e1) {
          LOG.error("Replication sources refresh failed.", e1);
          sleepForRetries("Sleeping before trying to refresh sources again", maxRetriesMultiplier);
        }
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick whenever we sleep for throttling
        throttler.resetStartTick();
      }
    }
  }

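  /**
   * Re-read the peer's bandwidth setting and, if it changed, push the new value into the
   * throttler.
   */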
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
        replicationPeer.getId(), currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

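  /**
   * Asynchronous part of startup: create and start the replication endpoint, wait until the peer
   * cluster UUID can be fetched, build the WAL entry filter chain and start one shipper per wal
   * group. Runs on the init thread and retries with increasing sleeps on failure.
   */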
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }

      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }
      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }
    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
    setSourceStartupStatus(false);
  }

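  /**
   * Record whether initialization is in progress and keep the "sources initializing" metric in
   * sync.
   */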
  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server's main startup thread can go on with its work.
        sourceRunning = false;
        uncaughtException(t, e, null, null);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              uncaughtException(t, error, null, null);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting for {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker is already stopped but there were still entries
        // batched, we need to clear the buffer used for the unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      // Can be null in test context.
      if (this.metrics != null) {
        this.metrics.clear();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

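  /** Returns {@code true} if at least one shipper worker of this source is still active. */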
  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
    // Record the new buffer usage
    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  @Override
  public ReplicationPeer getPeer() {
    return replicationPeer;
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

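  /**
   * Remove the given shipper from {@link #workerThreads} if it is still the registered worker for
   * its wal group.
   */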
  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }

  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }
}