/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entries in the form walGroupId->queue,
  // each represents a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  @VisibleForTesting
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;
  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage the storage for the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file being written
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

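  /**
   * If a replication-specific codec has been configured, copy it over the RPC codec in this
   * source's private copy of the configuration so replication traffic is encoded with it.
   */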
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

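  /**
   * Add a WAL to the queue of the wal group it belongs to, creating the queue (and, if the source
   * is already running, a shipper for it) when this is the first WAL seen for that group.
   */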
  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
      // the shipper may quit immediately
      queue.put(log);
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a log is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(logPrefix, queue);
      }
    } else {
      queue.put(log);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} of group {} to queue of source {}.", logPeerId(), log.getName(),
        logPrefix, this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
        + "replication.source.log.queue.warn: {}", logPeerId(),
        logPrefix, queueSize, logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
          tableName, Bytes.toString(family), peerId);
      }
    } else {
      // user has not explicitly defined any table cfs for replication, which means replicate all
      // the data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

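  /**
   * Instantiate the ReplicationEndpoint configured for the peer (or the default
   * HBaseInterClusterReplicationEndpoint when none is configured) and give the region server
   * coprocessor host a chance to replace it.
   */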
  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Override the configured endpoint with the one returned by the coprocessor hook
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

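  /**
   * Initialize the endpoint with a replication context, start it, and wait up to
   * waitOnEndpointSeconds for it to report that it is running.
   */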
  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

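  /**
   * Atomically start a shipper (and its WAL reader thread) for the given wal group, unless
   * another thread has already registered one for that group.
   */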
  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "{} Someone has beaten us to starting a worker thread for wal group {}",
              logPeerId(), key);
        }
        return value;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
        }
        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
        ReplicationSourceWALReader walReader =
            createNewWALReader(walGroupId, queue, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
            walReader, Thread.currentThread().getName()
                + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
            this::uncaughtException);
        worker.setWALReader(walReader);
        worker.startup(this::uncaughtException);
        return worker;
      }
    });
  }

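  /**
   * Build a per-wal-group snapshot of this source's progress (current file, position, queue size,
   * age of last shipped op and replication delay), keyed by "peerId=>walGroupId".
   */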
  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

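  /**
   * Get the length of a WAL file, falling back to its archived location if it has already been
   * moved out of the WAL directory.
   */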
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

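  /**
   * Create the WAL reader for a wal group, using the serial variant when the peer is configured
   * for serial replication.
   */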
  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

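  /**
   * Handler for uncaught exceptions in the reader/shipper/init threads: exit on OOME, otherwise
   * log the error and abort the server.
   */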
  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

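  /**
   * Pick up dynamic changes to the peer's bandwidth setting and push the new limit into the
   * throttler.
   */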
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
        replicationPeer.getId(), currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

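  /**
   * Initialization that runs on the background init thread: create and start the replication
   * endpoint (retrying on failure), wait for the peer cluster UUID, refuse to replicate to
   * ourselves unless the endpoint allows it, build the WAL entry filter chain, and finally start
   * a shipper for every wal group already queued.
   */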
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In rare cases the zookeeper setting may be messed up, which leads to an incorrect
    // peerClusterId value that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  @Override
  public void startup() {
    // mark we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

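  /**
   * Stop this source: mark it as not running, shut down the init thread, stop the shipper and
   * reader threads and the replication endpoint, optionally waiting for them to finish and
   * optionally clearing this source's metrics.
   */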
  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting for {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public UUID getPeerClusterUUID() {
    return this.clusterId;
  }

  /**
   * Comparator used to compare logs together based on their start time
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * <p>
     * Split a path to get the start time
     * </p>
     * <p>
     * For example: 10.20.20.171%3A60020.1277499063250
     * </p>
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
    }
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  @Override
  public ReplicationPeer getPeer() {
    return replicationPeer;
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeer().getId() + "]:";
  }
}