/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
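 * <p>
 * A rough usage sketch against this class's own API; the wiring below is normally done by
 * {@link ReplicationSourceManager}, so treat it as illustrative rather than canonical:
 * </p>
 * <pre>
 * ReplicationSourceInterface src = new ReplicationSource();
 * src.init(conf, fs, manager, replicationQueues, replicationPeers, server,
 *     peerClusterZnode, clusterId, replicationEndpoint, walFileLengthProvider, metrics);
 * src.enqueueLog(walPath); // WALs are queued per wal group, one shipper per group
 * src.startup();           // starts the thread that runs {@link #run()}
 * </pre>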
 */
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entries in the format walGroupId->queue,
  // each represents a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode (queue id) this source currently works with
  protected String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  /**
   * Instantiation method used by region servers
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param replicationQueues interface to the replication queues storage
   * @param replicationPeers interface to the replication peers storage
   * @param server the server for this region server
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param walFileLengthProvider used to get the length of the WAL file currently being written
   * @param metrics metrics for replication source
   * @throws IOException if the source fails to initialize
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
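    // Dividing by 10 below assumes ReplicationThrottler meters pushes on 100ms cycles, so
    // each cycle's budget is one tenth of the per-second bandwidth (the cycle length is the
    // throttler's internal convention, not something configured here).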
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource: " + peerId
        + ", currentBandwidth=" + this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

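  /**
   * Add a WAL to the queue of the wal group it belongs to, creating that group's queue (and,
   * if this source is already running, a shipper for it) on first sight of the group. The
   * group is derived from the WAL file name; for instance, a WAL named along the lines of
   * "host%2C16020%2C1588888888888.1588888899999" would be grouped under the prefix
   * "host%2C16020%2C1588888888888" (the name here is illustrative only).
   * @param log path of the WAL to enqueue
   */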
  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.sourceRunning) {
        // new wal group observed after source startup, start a new worker thread to track it
        // Note: it's possible that a log is enqueued after this.sourceRunning is set but before
        // the worker thread is launched, so we must check workerThreads before starting a worker
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = peerClusterZnode;
    if (peerId.contains("-")) {
      // peerClusterZnode will be in the form peerId + "-" + rsZNode,
      // e.g. "2-server1,16020,1234567890123" yields peerId "2".
      // A peerId will not have "-" in its name, see HBASE-11394
      peerId = peerClusterZnode.split("-")[0];
    }
    Map<TableName, List<String>> tableCFMap =
        replicationPeers.getConnectedPeer(peerId).getTableCFs();
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.replicationQueues.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table " + tableName + " family "
            + Bytes.toString(family) + " will not be replicated to peer id " + peerId);
      }
    } else {
      // The user has not defined any table CFs for replication, which means replicate all
      // the data
      this.replicationQueues.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  @Override
  public void run() {
    // mark we are running now
    this.sourceRunning = true;
    try {
      // start the endpoint, connect to the cluster
      this.replicationEndpoint.start();
      this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      uninitialize();
      throw new RuntimeException(ex);
    }

    int sleepMultiplier = 1;
    // delay this until we are in an asynchronous thread
    while (this.isSourceActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    // In rare cases, the ZooKeeper setting may be messed up, leading to an incorrect
    // peerClusterId that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.closeQueue(this);
      return;
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    initializeWALEntryFilter();
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

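  /**
   * Build the filter chain applied to every WAL entry before shipping. The order is: a
   * {@link SystemTableWALEntryFilter} to skip edits on system tables, any filter the
   * {@link ReplicationEndpoint} supplies, and finally a {@link ClusterMarkingEntryFilter}
   * that tags entries with the source cluster id so they are not replicated back to us in
   * cyclic (master-master) setups.
   */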
  private void initializeWALEntryFilter() {
    // get the WALEntryFilter from ReplicationEndpoint and add it to the default filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter)new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
        walGroupId, queue, this);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beaten us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup(getUncaughtExceptionHandler());
      worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
        worker.getStartPosition()));
    }
  }

  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    ReplicationSourceWALReader walReader =
        new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
      getUncaughtExceptionHandler());
  }

  public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
    return new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        RSRpcServices.exitIfOOME(e);
        LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
        server.stop("Unexpected exception in " + t.getName());
      }
    };
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Sleeping " + sleepTicks + "ms for throttling control");
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick when a sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource: " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
    long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
      this.replicationEndpoint.stop();
      try {
        this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
      }
    }
  }
  /**
   * Do the sleeping logic: sleep for the base retry interval scaled by the given multiplier.
   * With the defaults this is 1000ms * <code>sleepMultiplier</code>.
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + "ms, multiplier=" + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled.
   *
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        LOG.error("Unexpected exception in ReplicationSource", e);
      }
    };
    Threads
        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.sourceRunning = false;
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.interrupt();
      worker.interrupt();
    }
    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint
              .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source: "
              + this.peerClusterZnode, te);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  /**
   * Comparator used to order WALs by the start timestamp embedded in their names
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time.
     * For example: 10.20.20.171%3A60020.1277499063250 has start time 1277499063250.
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        // keep one wal group per line even when idle
        sb.append("no replication ongoing, waiting for new log\n");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }
}