/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
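 * <p>
 * A much simplified sketch of the lifecycle, using placeholder variable names for the
 * dependencies passed to {@link #init}; in practice instances are created, initialized and
 * driven by the {@link ReplicationSourceManager}:
 * </p>
 * <pre>
 *   ReplicationSource source = new ReplicationSource();
 *   source.init(conf, fs, manager, queueStorage, peer, server, queueId, clusterId,
 *     walFileLengthProvider, metrics);
 *   source.startup();                  // async: creates the endpoint, filters and shippers
 *   source.enqueueLog(walPath);        // called whenever a new WAL is rolled for this peer
 *   source.terminate("shutting down"); // stops shippers, readers and the endpoint
 * </pre>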
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, keyed by walGroupId;
  // each entry represents the queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources, to which we report our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of the replication queue (the znode) this source works on
  protected String queueId;
  // Maximum value of the sleep multiplier used when retrying
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happened while starting up the source,
  // meaning a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
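  // How many seconds to wait for the replication endpoint to report it is running during
  // initialization; -1 until the value is read from the configuration in init().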
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate.
   * Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction.
   * Filters *out* edits we do not want replicated, passed on to replication endpoints.
   * This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of
   * the WALEntry filter chain. These are put after those that we pick up from the configured
   * endpoints and other machinations to create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate;
   *   i.e. return 'true' if you want to replicate the content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *   WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers.
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage storage for the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of WAL files still being written
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per retry
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    // A defaultBandwidth of '0' means no bandwidth limit; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
      true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

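  // If a dedicated replication codec is configured, have this source's RPCs use it.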
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
        new AbstractFSWALProvider.WALStartTimeComparator());
      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
      // the shipper may quit immediately
      queue.put(wal);
      queues.put(walPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // note: it's possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting one
        tryStartNewShipper(walPrefix, queue);
      }
    } else {
      queue.put(wal);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new wal that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
          + "replication.source.log.queue.warn: {}", logPeerId(),
        walPrefix, queueSize, logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Set<String> namespaces = replicationPeer.getNamespaces();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) { // All peers with TableCFs
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else if (namespaces != null) { // Only for set NAMESPACES peers
      if (namespaces.contains(tableName.getNamespaceAsString())) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else {
      // The user has not restricted replication to any table CFs or namespaces,
      // which means replicate all the data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // The coprocessor hook returned an endpoint; use it instead of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
        .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
            clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

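  /**
   * Combines the {@link #baseFilterOutWALEntries}, any filter supplied by the replication
   * endpoint and a {@link ClusterMarkingEntryFilter} into the final {@link #walEntryFilter}
   * chain used by the WAL readers.
   */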
  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

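  /**
   * Starts a shipper thread (and its paired WAL reader) for the given wal group if one is not
   * already running; the compute() call makes the check-and-start atomic per wal group.
   */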
  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
        ReplicationSourceWALReader walReader =
            createNewWALReader(walGroupId, queue, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
            walReader, Thread.currentThread().getName()
            + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
          (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        worker.setWALReader(walReader);
        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the WAL file size only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

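  /**
   * Returns the length of the WAL file at the given path, looking it up in the archive
   * directory if it has already been moved there.
   */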
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

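  /**
   * Handles an uncaught exception from one of this source's threads (init, shipper or wal
   * reader): exits on OOME, aborts the region server if abortOnError is set, and, when a
   * manager is supplied, retries refreshing the replication sources for the peer until it
   * succeeds.
   */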
  protected final void uncaughtException(Thread t, Throwable e,
      ReplicationSourceManager manager, String peerId) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), e);
    }
    if (manager != null) {
      while (true) {
        try {
          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
            t.getName());
          manager.refreshSources(peerId);
          break;
        } catch (IOException e1) {
          LOG.error("Replication sources refresh failed.", e1);
          sleepForRetries("Sleeping before trying to refresh sources again",
            maxRetriesMultiplier);
        }
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when a throttling sleep occurs
        throttler.resetStartTick();
      }
    }
  }

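  // Pick up a dynamically changed peer bandwidth and push the new value into the throttler.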
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Sleep for the configured retry interval, scaled by the given multiplier.
   * @param msg Why we sleep
   * @param sleepMultiplier the multiplier applied to the base sleep time
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled.
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

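  /**
   * Runs on the init thread started by {@link #startup()}: creates and starts the replication
   * endpoint, waits for the peer cluster UUID, builds the WAL entry filter chain and finally
   * starts one shipper/reader pair per wal group. Retries with backoff, and throws if the
   * source is no longer active or the retries are exhausted, which triggers the startup retry
   * loop.
   */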
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          this.startupOngoing.set(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      retryStartup.set(!this.abortOnError);
      this.startupOngoing.set(false);
      throw new IllegalStateException("Source should be active.");
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      retryStartup.set(!this.abortOnError);
      this.startupOngoing.set(false);
      throw new IllegalStateException("Source should be active.");
    }
    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
      logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
      peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
    this.startupOngoing.set(false);
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    startupOngoing.set(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      (t, e) -> {
        // If the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the main server startup can go on with its work.
        sourceRunning = false;
        uncaughtException(t, e, null, null);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            startupOngoing.set(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              sourceRunning = false;
              uncaughtException(t, error, null, null);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics,
      boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // Note that terminate may be called directly from the initThread, so we need to avoid
      // joining on ourselves here.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      // Can be null in test context.
      if (this.metrics != null) {
        this.metrics.clear();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // Offsets totalBufferUsed by deducting the shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
    // Record the new buffer usage
    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /**
   * @return String to use as a log prefix that contains current peerId.
   */
  private String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }
}