/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 of them will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, keyed as walGroupId->queue;
  // each entry represents the queue of WALs for one WAL group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // Per-group queue size; keep no more than this number of logs in each WAL group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // id of the replication queue we are handling (also the name of its znode)
  protected String queueId;
  // Maximum multiplier applied to sleepForRetries before taking further action
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // Throttler for replication bandwidth
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;
  /**
   * Instantiation method used by region servers.
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage storage used to track the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to look up the length of WAL files currently being written
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

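  /**
   * Copy the replication codec configured via {@link HConstants#REPLICATION_CODEC_CONF_KEY}
   * into the RPC codec setting so that replication RPCs use the replication-specific codec.
   */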
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

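  /**
   * Add a WAL to the queue of the WAL group it belongs to, creating the queue (and, if the
   * source is already running, a shipper for that group) on first use.
   */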
  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a WAL is enqueued after sourceRunning is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting a
        // new worker
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
          + "replication.source.log.queue.warn: {}", logPeerId(),
        logPrefix, queueSize, logQueueWarnThreshold);
    }
  }

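  /**
   * Record references to bulk loaded HFiles so that they can be replicated, but only if the
   * given table/family is covered by this peer's table-CFs or namespaces configuration.
   */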
  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Set<String> namespaces = replicationPeer.getNamespaces();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) { // Peer with a table-CFs config
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else if (namespaces != null) { // Peer with a namespaces config only
      if (namespaces.contains(tableName.getNamespaceAsString())) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else {
      // The user has not defined any table CFs for replication, which means replicate all the
      // data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

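  /**
   * Create the {@link ReplicationEndpoint} for this source, either the class configured on the
   * peer or the default inter-cluster endpoint, and give region server coprocessors a chance to
   * wrap or replace it.
   */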
  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

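  /**
   * Initialize the endpoint with this source's replication context, start it and wait up to
   * {@code waitOnEndpointSeconds} for it to reach the running state.
   */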
  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
        .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
            clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

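  /**
   * Start a shipper (and its paired WAL reader) for the given WAL group unless another thread
   * has already registered a worker for that group.
   */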
  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
          walGroupId);
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
      }
      ReplicationSourceWALReader walReader =
        createNewWALReader(walGroupId, queue, worker.getStartPosition());
      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
      worker.setWALReader(walReader);
      worker.startup(this::uncaughtException);
    }
  }

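  /**
   * Build a per-WAL-group snapshot of this source's replication progress (current file, position,
   * file size, queue size, age of last shipped op and replication delay), keyed as
   * {@code peerId=>walGroupId}.
   */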
  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of the WAL only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

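  /**
   * Get the length of a WAL file, falling back to its archived location if it has already been
   * moved out of the active WAL directory.
   */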
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

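  /**
   * Create the shipper that will dequeue entry batches for the given WAL group and ship them to
   * the peer; protected so that subclasses can provide a different shipper implementation.
   */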
  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

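  /**
   * Create the WAL reader for the given group and start position, using the serial variant when
   * the peer is configured for serial replication.
   */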
  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

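  /**
   * Handler wired into the shipper and reader threads: exit on OOME, otherwise log the failure
   * and abort the region server.
   */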
  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

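  /**
   * Apply bandwidth throttling before shipping a batch: refresh the throttler if the peer
   * bandwidth changed, then sleep for as long as the throttler requires for this batch size.
   */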
  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick when a sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

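  /**
   * Re-read the peer bandwidth and, if it changed, update the throttler with the new budget
   * (currentBandwidth / 10, mirroring the initialization in {@link #init}).
   */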
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier multiplier applied to the default sleep time
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled.
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

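  /**
   * Asynchronous initialization: create and start the replication endpoint (retrying until it
   * comes up), resolve the peer cluster UUID, refuse to replicate to ourselves unless the
   * endpoint allows it, build the WAL entry filter chain and finally start a shipper for each
   * WAL group already queued.
   */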
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In a rare case the ZooKeeper setting may be messed up, which leads to an incorrect
    // peerClusterId value that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

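  /**
   * Mark the source as running and kick off the initialization work on a daemon thread so that
   * startup does not block on the peer being reachable.
   */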
  @Override
  public void startup() {
    // mark that we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

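  /**
   * Stop this source: halt the initialization thread if it is still running, stop all shippers
   * and readers (interrupting them if they do not stop in time), stop the endpoint and
   * optionally wait for everything to terminate and clear the metrics.
   */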
  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen, but anyway let's wait until the initialization thread exits.
      // Notice that terminate may be called directly from the initThread, so we must avoid
      // joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || (worker.entryReader != null && worker.entryReader.isAlive())) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader != null && worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public UUID getPeerClusterUUID() {
    return this.clusterId;
  }

  /**
   * Comparator used to order WAL files based on their start (creation) time.
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time.
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return true;
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  // Offsets totalBufferUsed by deducting the size of the batch that was just shipped.
  @Override
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeerId() + "]:";
  }
}