/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
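 * <p>
 * Illustrative lifecycle sketch only; in practice a source is created and driven by the
 * ReplicationSourceManager rather than wired up by hand, and the variable names below are
 * hypothetical:
 * </p>
 * <pre>{@code
 * ReplicationSource source = new ReplicationSource();
 * source.init(conf, fs, manager, queueStorage, peer, server, queueId, clusterId,
 *   walFileLengthProvider, metrics);
 * source.enqueueLog(currentWalPath); // queue the WAL(s) to read from
 * source.startup();                  // asynchronously starts the endpoint, readers and shippers
 * // ... later ...
 * source.terminate("Stopping replication");
 * }</pre>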
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entry in format of walGroupId->queue,
  // each representing a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of the replication queue we are currently reading from
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  @VisibleForTesting
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage storage for the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file being written to
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
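    // Take a private copy of the configuration so that decorateConf() below does not mutate
    // the configuration shared with the rest of the region server.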
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
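    // The per-second bandwidth is divided by 10 because ReplicationThrottler budgets bytes per
    // (roughly) 100 ms throttling cycle; see that class for details.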
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

  private void decorateConf() {
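    // If a replication-specific codec is configured, make it the RPC codec on this source's
    // private copy of the configuration so replication RPCs are encoded with it.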
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
      // the shipper may quit immediately
      queue.put(log);
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: a log may be enqueued after this.running is set but before the worker thread is
        // launched, so check workerThreads before starting a new worker
        tryStartNewShipper(logPrefix, queue);
      }
    } else {
      queue.put(log);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
        + "replication.source.log.queue.warn: {}", logPeerId(),
        logPrefix, queueSize, logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Set<String> namespaces = replicationPeer.getNamespaces();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) { // All peers with TableCFs
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else if (namespaces != null) { // Only for peers with replication namespaces configured
      if (namespaces.contains(tableName.getNamespaceAsString())) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else {
      // user has not explicitly defined any table CFs for replication, which means replicate all
      // the data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // If the coprocessor hook returned an endpoint, use it instead of the one created above
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
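    // Fail initialization if the endpoint does not come up within the configured timeout; the
    // TimeoutException propagates to initialize(), which stops the endpoint and retries.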
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
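    // compute() runs atomically per key on the ConcurrentHashMap, so checking for an existing
    // shipper and registering a new one cannot race with another thread doing the same.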
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "{} Someone has beat us to start a worker thread for wal group {}",
              logPeerId(), key);
        }
        return value;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
        }
        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
        ReplicationSourceWALReader walReader =
            createNewWALReader(walGroupId, queue, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
            walReader, Thread.currentThread().getName()
                + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
            this::uncaughtException);
        worker.setWALReader(walReader);
        worker.startup(this::uncaughtException);
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
360          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
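      // The WAL may have been moved to the archive directory in the meantime; retry with the
      // archived path before giving up.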
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
        replicationPeer.getId(), currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Sleep for the default retry interval multiplied by <code>sleepMultiplier</code>.
   * @param msg why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

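  /**
   * Asynchronous initialization: create and start the ReplicationEndpoint (retrying on failure),
   * wait for the peer cluster UUID to become available, build the WAL entry filter chain, and
   * then start a shipper/reader pair for every WAL group already queued.
   */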
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  @Override
  public void startup() {
    // mark we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

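    // Give each worker and its entry reader a chance to exit on their own before resorting to
    // interrupting them.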
    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public UUID getPeerClusterUUID() {
    return this.clusterId;
  }

  /**
   * Comparator used to order WALs by the start timestamp embedded in their file names
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * <p>
     * Split a path to get the start time
     * </p>
     * <p>
     * For example: 10.20.20.171%3A60020.1277499063250
     * </p>
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
    }
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return true;
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  @Override
  public ReplicationPeer getPeer() {
    return replicationPeer;
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeer().getId() + "]:";
  }
}