/*
 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.UUID;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.PriorityBlockingQueue;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import java.util.concurrent.atomic.AtomicLong;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.Server;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.regionserver.RSRpcServices;
045import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
046import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
047import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationPeer;
050import org.apache.hadoop.hbase.replication.ReplicationPeers;
051import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
052import org.apache.hadoop.hbase.replication.ReplicationQueues;
053import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
054import org.apache.hadoop.hbase.replication.WALEntryFilter;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.Pair;
057import org.apache.hadoop.hbase.util.Threads;
058import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
059import org.apache.hadoop.hbase.wal.WAL.Entry;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
064import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ratio = 0.1
 * and slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
078public class ReplicationSource extends Thread implements ReplicationSourceInterface {
080  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
081  // Queues of logs to process, entry in format of walGroupId->queue,
082  // each presents a queue for one wal group
083  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
084  // per group queue size, keep no more than this number of logs in each wal group
085  protected int queueSizePerGroup;
086  protected ReplicationQueues replicationQueues;
087  private ReplicationPeers replicationPeers;
089  protected Configuration conf;
090  protected ReplicationQueueInfo replicationQueueInfo;
091  // id of the peer cluster this source replicates to
092  private String peerId;
094  // The manager of all sources to which we ping back our progress
095  protected ReplicationSourceManager manager;
096  // Should we stop everything?
097  protected Server server;
098  // How long should we sleep for each retry
099  private long sleepForRetries;
100  protected FileSystem fs;
101  // id of this cluster
102  private UUID clusterId;
103  // id of the other cluster
104  private UUID peerClusterId;
105  // total number of edits we replicated
106  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
107  // The znode we currently play with
108  protected String peerClusterZnode;
109  // Maximum number of retries before taking bold actions
110  private int maxRetriesMultiplier;
111  // Indicates if this particular source is running
112  private volatile boolean sourceRunning = false;
113  // Metrics for this source
114  private MetricsSource metrics;
115  //WARN threshold for the number of queued logs, defaults to 2
116  private int logQueueWarnThreshold;
117  // ReplicationEndpoint which will handle the actual replication
118  private ReplicationEndpoint replicationEndpoint;
119  // A filter (or a chain of filters) for the WAL entries.
120  protected WALEntryFilter walEntryFilter;
121  // throttler
122  private ReplicationThrottler throttler;
123  private long defaultBandwidth;
124  private long currentBandwidth;
125  private WALFileLengthProvider walFileLengthProvider;
126  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
127      new ConcurrentHashMap<>();
129  private AtomicLong totalBufferUsed;
131  public static final String WAIT_ON_ENDPOINT_SECONDS =
132    "hbase.replication.wait.on.endpoint.seconds";
133  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
134  private int waitOnEndpointSeconds = -1;
136  /**
137   * Instantiation method used by region servers
138   *
139   * @param conf configuration to use
140   * @param fs file system to use
141   * @param manager replication manager to ping to
142   * @param server the server for this region server
143   * @param peerClusterZnode the name of our znode
144   * @param clusterId unique UUID for the cluster
145   * @param replicationEndpoint the replication endpoint implementation
146   * @param metrics metrics for replication source
147   * @throws IOException
148   */
149  @Override
150  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
151      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
152      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
153      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
154    this.server = server;
155    this.conf = HBaseConfiguration.create(conf);
156    this.waitOnEndpointSeconds =
158    decorateConf();
159    this.sleepForRetries =
160        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
161    this.maxRetriesMultiplier =
162        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
163    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
164    this.replicationQueues = replicationQueues;
165    this.replicationPeers = replicationPeers;
166    this.manager = manager;
167    this.fs = fs;
168    this.metrics = metrics;
169    this.clusterId = clusterId;
171    this.peerClusterZnode = peerClusterZnode;
172    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
173    // ReplicationQueueInfo parses the peerId out of the znode for us
174    this.peerId = this.replicationQueueInfo.getPeerId();
175    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
176    this.replicationEndpoint = replicationEndpoint;
178    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
179    currentBandwidth = getCurrentBandwidth();
180    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
181    this.totalBufferUsed = manager.getTotalBufferUsed();
182    this.walFileLengthProvider = walFileLengthProvider;
183    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
184        + ", currentBandwidth=" + this.currentBandwidth);
185  }
187  private void decorateConf() {
188    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
189    if (StringUtils.isNotEmpty(replicationCodec)) {
190      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
191    }
192  }
194  @Override
195  public void enqueueLog(Path log) {
196    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
197    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
198    if (queue == null) {
199      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
200      queues.put(logPrefix, queue);
201      if (this.sourceRunning) {
202        // new wal group observed after source startup, start a new worker thread to track it
203        // notice: it's possible that log enqueued when this.running is set but worker thread
204        // still not launched, so it's necessary to check workerThreads before start the worker
205        tryStartNewShipper(logPrefix, queue);
206      }
207    }
208    queue.put(log);
209    this.metrics.incrSizeOfLogQueue();
210    // This will log a warning for each new log that gets created above the warn threshold
211    int queueSize = queue.size();
212    if (queueSize > this.logQueueWarnThreshold) {
213      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
214          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
215    }
216  }
218  @Override
219  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
220      throws ReplicationException {
221    String peerId = peerClusterZnode;
222    if (peerId.contains("-")) {
223      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
224      // A peerId will not have "-" in its name, see HBASE-11394
225      peerId = peerClusterZnode.split("-")[0];
226    }
227    Map<TableName, List<String>> tableCFMap =
228        replicationPeers.getConnectedPeer(peerId).getTableCFs();
229    if (tableCFMap != null) {
230      List<String> tableCfs = tableCFMap.get(tableName);
231      if (tableCFMap.containsKey(tableName)
232          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
233        this.replicationQueues.addHFileRefs(peerId, pairs);
234        metrics.incrSizeOfHFileRefsQueue(pairs.size());
235      } else {
236        LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
237            + Bytes.toString(family) + " to peer id " + peerId);
238      }
239    } else {
240      // user has explicitly not defined any table cfs for replication, means replicate all the
241      // data
242      this.replicationQueues.addHFileRefs(peerId, pairs);
243      metrics.incrSizeOfHFileRefsQueue(pairs.size());
244    }
245  }
247  @Override
248  public void run() {
249    // mark we are running now
250    this.sourceRunning = true;
251    try {
252      // start the endpoint, connect to the cluster
253      this.replicationEndpoint.start();
254      this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
255    } catch (Exception ex) {
256      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
257      uninitialize();
258      throw new RuntimeException(ex);
259    }
261    int sleepMultiplier = 1;
262    // delay this until we are in an asynchronous thread
263    while (this.isSourceActive() && this.peerClusterId == null) {
264      this.peerClusterId = replicationEndpoint.getPeerUUID();
265      if (this.isSourceActive() && this.peerClusterId == null) {
266        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
267          sleepMultiplier++;
268        }
269      }
270    }
272    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
273    // peerClusterId value, which is the same as the source clusterId
274    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
275      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
276          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
277          + replicationEndpoint.getClass().getName(), null, false);
278      this.manager.closeQueue(this);
279      return;
280    }
281    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
283    initializeWALEntryFilter();
284    // start workers
285    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
286      String walGroupId = entry.getKey();
287      PriorityBlockingQueue<Path> queue = entry.getValue();
288      tryStartNewShipper(walGroupId, queue);
289    }
290  }
292  private void initializeWALEntryFilter() {
293    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
294    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
295      (WALEntryFilter)new SystemTableWALEntryFilter());
296    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
297    if (filterFromEndpoint != null) {
298      filters.add(filterFromEndpoint);
299    }
300    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
301    this.walEntryFilter = new ChainWALEntryFilter(filters);
302  }
304  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
305    final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
306        walGroupId, queue, this);
307    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
308    if (extant != null) {
309      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
310    } else {
311      LOG.debug("Starting up worker for wal group " + walGroupId);
312      worker.startup(getUncaughtExceptionHandler());
313      worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
314        worker.getStartPosition()));
315      workerThreads.put(walGroupId, worker);
316    }
317  }
319  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
320      PriorityBlockingQueue<Path> queue, long startPosition) {
321    ReplicationSourceWALReader walReader =
322        new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
323    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
324      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
325      getUncaughtExceptionHandler());
326  }
328  public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
329    return new Thread.UncaughtExceptionHandler() {
330      @Override
331      public void uncaughtException(final Thread t, final Throwable e) {
332        RSRpcServices.exitIfOOME(e);
333        LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
334        server.stop("Unexpected exception in " + t.getName());
335      }
336    };
337  }
339  @Override
340  public ReplicationEndpoint getReplicationEndpoint() {
341    return this.replicationEndpoint;
342  }
344  @Override
345  public ReplicationSourceManager getSourceManager() {
346    return this.manager;
347  }
349  @Override
350  public void tryThrottle(int batchSize) throws InterruptedException {
351    checkBandwidthChangeAndResetThrottler();
352    if (throttler.isEnabled()) {
353      long sleepTicks = throttler.getNextSleepInterval(batchSize);
354      if (sleepTicks > 0) {
355        if (LOG.isTraceEnabled()) {
356          LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
357        }
358        Thread.sleep(sleepTicks);
359        // reset throttler's cycle start tick when sleep for throttling occurs
360        throttler.resetStartTick();
361      }
362    }
363  }
365  private void checkBandwidthChangeAndResetThrottler() {
366    long peerBandwidth = getCurrentBandwidth();
367    if (peerBandwidth != currentBandwidth) {
368      currentBandwidth = peerBandwidth;
369      throttler.setBandwidth((double) currentBandwidth / 10.0);
370      LOG.info("ReplicationSource : " + peerId
371          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
372    }
373  }
375  private long getCurrentBandwidth() {
376    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
377    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
378    // user can set peer bandwidth to 0 to use default bandwidth
379    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
380  }
382  private void uninitialize() {
383    LOG.debug("Source exiting " + this.peerId);
384    metrics.clear();
385    if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
386      this.replicationEndpoint.stop();
387      try {
388        this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
389      } catch (TimeoutException e) {
390        LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
391      }
392    }
393  }
395  /**
396   * Do the sleeping logic
397   * @param msg Why we sleep
398   * @param sleepMultiplier by how many times the default sleeping time is augmented
399   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
400   */
401  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
402    try {
403      if (LOG.isTraceEnabled()) {
404        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
405      }
406      Thread.sleep(this.sleepForRetries * sleepMultiplier);
407    } catch (InterruptedException e) {
408      LOG.debug("Interrupted while sleeping between retries");
409      Thread.currentThread().interrupt();
410    }
411    return sleepMultiplier < maxRetriesMultiplier;
412  }
414  /**
415   * check whether the peer is enabled or not
416   *
417   * @return true if the peer is enabled, otherwise false
418   */
419  @Override
420  public boolean isPeerEnabled() {
421    return this.replicationPeers.getStatusOfPeer(this.peerId);
422  }
424  @Override
425  public void startup() {
426    String n = Thread.currentThread().getName();
427    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
428      @Override
429      public void uncaughtException(final Thread t, final Throwable e) {
430        LOG.error("Unexpected exception in ReplicationSource", e);
431      }
432    };
433    Threads
434        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
435  }
437  @Override
438  public void terminate(String reason) {
439    terminate(reason, null);
440  }
442  @Override
443  public void terminate(String reason, Exception cause) {
444    terminate(reason, cause, true);
445  }
447  public void terminate(String reason, Exception cause, boolean join) {
448    if (cause == null) {
449      LOG.info("Closing source "
450          + this.peerClusterZnode + " because: " + reason);
452    } else {
453      LOG.error("Closing source " + this.peerClusterZnode
454          + " because an error occurred: " + reason, cause);
455    }
456    this.sourceRunning = false;
457    Collection<ReplicationSourceShipper> workers = workerThreads.values();
458    for (ReplicationSourceShipper worker : workers) {
459      worker.stopWorker();
460      worker.entryReader.interrupt();
461      worker.interrupt();
462    }
463    if (this.replicationEndpoint != null) {
464      this.replicationEndpoint.stop();
465    }
466    if (join) {
467      for (ReplicationSourceShipper worker : workers) {
468        Threads.shutdown(worker, this.sleepForRetries);
469        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
470      }
471      if (this.replicationEndpoint != null) {
472        try {
473          this.replicationEndpoint
474              .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
475        } catch (TimeoutException te) {
476          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
477              + this.peerClusterZnode,
478            te);
479        }
480      }
481    }
482  }
484  @Override
485  public String getPeerClusterZnode() {
486    return this.peerClusterZnode;
487  }
489  @Override
490  public String getPeerId() {
491    return this.peerId;
492  }
494  @Override
495  public Path getCurrentPath() {
496    // only for testing
497    for (ReplicationSourceShipper worker : workerThreads.values()) {
498      if (worker.getCurrentPath() != null) {
499        return worker.getCurrentPath();
500      }
501    }
502    return null;
503  }
505  @Override
506  public boolean isSourceActive() {
507    return !this.server.isStopped() && this.sourceRunning;
508  }
510  /**
511   * Comparator used to compare logs together based on their start time
512   */
513  public static class LogsComparator implements Comparator<Path> {
515    @Override
516    public int compare(Path o1, Path o2) {
517      return Long.compare(getTS(o1), getTS(o2));
518    }
520    /**
521     * Split a path to get the start time
522     * For example:
523     * @param p path to split
524     * @return start time
525     */
526    private static long getTS(Path p) {
527      int tsIndex = p.getName().lastIndexOf('.') + 1;
528      return Long.parseLong(p.getName().substring(tsIndex));
529    }
530  }
532  @Override
533  public String getStats() {
534    StringBuilder sb = new StringBuilder();
535    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
536        .append(", current progress: \n");
537    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
538      String walGroupId = entry.getKey();
539      ReplicationSourceShipper worker = entry.getValue();
540      long position = worker.getCurrentPosition();
541      Path currentPath = worker.getCurrentPath();
542      sb.append("walGroup [").append(walGroupId).append("]: ");
543      if (currentPath != null) {
544        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
545            .append(position).append("\n");
546      } else {
547        sb.append("no replication ongoing, waiting for new log");
548      }
549    }
550    return sb.toString();
551  }
553  @Override
554  public MetricsSource getSourceMetrics() {
555    return this.metrics;
556  }
558  @Override
559  public void postShipEdits(List<Entry> entries, int batchSize) {
560    if (throttler.isEnabled()) {
561      throttler.addPushSize(batchSize);
562    }
563    totalReplicatedEdits.addAndGet(entries.size());
564    totalBufferUsed.addAndGet(-batchSize);
565  }
567  @Override
568  public WALFileLengthProvider getWALFileLengthProvider() {
569    return walFileLengthProvider;
570  }
572  @Override
573  public ServerName getServerWALsBelongTo() {
574    return server.getServerName();
575  }