/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * It selects a random subset of the slave cluster's region servers as sinks,
 * sized by a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
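 * <p>
 * A minimal sketch of adding a peer that uses this endpoint explicitly (it is also the endpoint
 * HBase typically uses for inter-cluster peers when none is specified). The cluster key, peer id
 * and {@code admin} handle below are illustrative, and the HBase 2.x Admin /
 * ReplicationPeerConfig builder APIs are assumed:
 * </p>
 * <pre>
 * ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
 *     .setClusterKey("zk1,zk2,zk3:2181:/hbase")
 *     .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName())
 *     .build();
 * admin.addReplicationPeer("peer1", peerConfig);
 * </pre>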
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean isSerial = false;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or we should make it particular to
    // replication and add replication-specific settings, such as the compression or codec to use
    // when passing Cells.
    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = FSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
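    // Assign each entry to a batch by hashing its encoded region name, so all edits for a
    // region land in the same batch and keep their relative order within that batch.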
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and adding the next entry would make it oversized,
      // move it to the tail of the list and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
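    // For serial replication, group entries by region: each region's entries form one batch so
    // that a single replicator task ships them in order (see serialReplicateRegionEntries()).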
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
          .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, entries from the same region must be
   * replicated serially, so entries from the same region form a single batch here; that batch is
   * then split into several sub-batches bounded by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

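  /**
   * Extract the table name from the message of a TableNotFoundException thrown by the sink,
   * e.g. "...TableNotFoundException: 'ns:table'...". Returns null if no legal table name can be
   * recovered from the message.
   */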
  private TableName parseTable(String msg) {
    // ... TableNotFoundException: '<table>'\n...
    Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
    Matcher m = p.matcher(msg);
    if (m.find()) {
      String table = m.group(1);
      try {
        // double check that table is a valid table name
        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
        return TableName.valueOf(table);
      } catch (IllegalArgumentException ignore) {
      }
    }
    return null;
  }

  // Filter a set of batches by TableName
  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
    return oldEntryList
        .stream().map(entries -> entries.stream()
            .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
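    // Submit one replicator task per non-empty batch, then wait for them all. Batches that
    // complete successfully are cleared from the list, so only the failed ones remain for the
    // caller to retry.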
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
            replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        // cause must be an IOException
        iox = (IOException) ee.getCause();
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("{} No replication sinks found, returning without replicating. "
        + "The source should retry with the same set of edits.", logPeerId());
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        long lastWriteTime;

        // replicate the batches to sink side.
        lastWriteTime = parallelReplicate(pool, replicateContext, batches);

        // update metrics
        if (lastWriteTime > 0) {
          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
        }
        return true;
      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("{} Can't replicate because of an error on the remote cluster: ",
            logPeerId(), ioe);
          if (ioe instanceof TableNotFoundException) {
            if (dropOnDeletedTables) {
              // this is a bit fragile, but cannot change how TNFE is serialized
              // at least check whether the table name is legal
              TableName table = parseTable(ioe.getMessage());
              if (table != null) {
                try (Connection localConn =
                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
                  if (!localConn.getAdmin().tableExists(table)) {
                    // Would potentially be better to retry in one of the outer loops
                    // and add a table filter there; but that would break the encapsulation,
                    // so we're doing the filtering here.
                    LOG.info("{} Missing table detected at sink, local table also does not "
                      + "exist, filtering edits for '{}'", logPeerId(), table);
                    batches = filterBatches(batches, table);
                    continue;
                  }
                } catch (IOException iox) {
                  LOG.warn("{} Exception checking for local table: ", logPeerId(), iox);
                }
              }
            }
            // fall through and sleep below
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
              ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened. The
            // cluster is alive, and calling it again right away, even as a test, just makes
            // things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
                  "call to the remote cluster timed out, which is usually " +
                  "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId(), e);
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
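      // Ignored; isTerminated() is checked below and we abort if tasks did not finish in time.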
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

  @VisibleForTesting
  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
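    // Pick a sink region server from the sink manager and ship this batch in a single RPC.
    // Success or failure is reported back to the manager so that unhealthy sinks can be
    // replaced; the batch index is returned so the caller can mark this batch as shipped.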
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
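    // Ship this region's entries in their original order, splitting them into sub-batches so
    // that each RPC stays under replicationRpcLimit. Sub-batches are sent sequentially to
    // preserve serial-replication ordering.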
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

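  /**
   * Create the task that ships one batch: for serial peers each batch holds a single region's
   * entries and is shipped in order via serialReplicateRegionEntries(), otherwise the pre-built
   * batch is shipped directly via replicateEntries(). The task returns its batch index.
   */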
  @VisibleForTesting
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }

}