/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random subset of the peer's region servers
 * using a replication ratio. For example, if the replication ratio is 0.1 and the
 * slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, keyed by walGroupId; each entry is the queue of
  // WALs belonging to one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we report our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of our replication queue (also the name of its znode)
  protected String queueId;
  // Maximum number of retries before taking bold actions (i.e. the cap on the retry multiplier)
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager we report progress to
   * @param queueStorage storage for the replication queues
   * @param replicationPeer the peer we replicate to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file being written to
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
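    // Assumed sketch of the throttling scheme: the throttler meters shipping over short cycles,
    // so the per-second bandwidth is divided by ten below to get a rough per-cycle quota (see
    // ReplicationThrottler for the authoritative cycle length).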
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId=" + queueId + ", ReplicationSource: " + peerId
        + ", currentBandwidth=" + this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
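    // The prefix identifies the wal group; e.g. a WAL named roughly like
    // "host%2C16020%2C1621234567890.1621234599999" maps to the prefix
    // "host%2C16020%2C1621234567890", and all WALs sharing that prefix are handled by one shipper.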
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // A new wal group was observed after source startup; start a new worker thread to track
        // it. Note: a log can be enqueued after this.running is set but before the worker threads
        // are launched, so it is necessary to check workerThreads before starting the worker.
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
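    // A null map means the peer does not restrict replication to particular table CFs; a table
    // key mapped to a null CF list means every column family of that table is replicated.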
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
          tableName, Bytes.toString(family), peerId);
      }
    } else {
      // The user has not restricted replication to specific table CFs, which means all the data
      // is replicated
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

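    // A custom endpoint class is typically configured on the peer, for example via
    // ReplicationPeerConfig#setReplicationEndpointImpl or the shell's add_peer ENDPOINT_CLASSNAME
    // option; when none is set we fall back to the default inter-cluster endpoint below.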
    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Replace the endpoint we created with the one returned by the coprocessor hook
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
        .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
            clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
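    // The chain evaluates filters in order: the system-table filter first, then the endpoint's
    // optional filter, and finally the cluster-marking filter, which (roughly) tags edits with
    // this cluster's id and skips edits that already passed through the peer cluster, preventing
    // replication loops.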
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
          walGroupId);
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
      }
      ReplicationSourceWALReader walReader =
        createNewWALReader(walGroupId, queue, worker.getStartPosition());
      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
      worker.setWALReader(walReader);
      worker.startup(this::uncaughtException);
    }
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay =
          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception, since the WAL file size only affects the web UI", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick after sleeping for throttling
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource: " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

  private void initialize() {
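    // Outline of the startup sequence: create and start the ReplicationEndpoint (retrying with
    // backoff), resolve the peer cluster UUID, verify we are not replicating to ourselves,
    // build the WAL entry filter chain, and finally start one shipper per wal group queue.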
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In rare cases the ZooKeeper setting may be misconfigured, which leads to an incorrect
    // peerClusterId value that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {} is now replicating from cluster {} to peer cluster {}",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  @Override
  public void startup() {
    // mark that we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen, but wait for the initialization thread to exit anyway. Note
      // that terminate may be called from the initThread itself, in which case we must not join
      // on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
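    // Signal every shipper and its WAL reader to stop, give them one retry interval to exit on
    // their own, then interrupt whichever threads are still alive.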
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.setReaderRunning(false);
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source: {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  /**
   * Comparator used to order WAL files based on the start timestamp in their names
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a WAL file name to extract its start timestamp.
     * For example, 10.20.20.171%3A60020.1277499063250 yields 1277499063250.
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
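      // The timestamp is the suffix after the last '.', which also handles WAL names that
      // contain extra dots (e.g. a multiwal group suffix).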
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // Offsets totalBufferUsed by deducting the shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeerId() + "]:";
  }
}