/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random subset of region servers to ship
 * edits to, using a replication ratio. For example, if the replication ratio
 * is 0.1 and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  /** Drop edits for tables that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
      "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
      "hbase.replication.drop.on.deleted.columnfamily";

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating a
   * different Connection implementation, or initializing it in a different way, so
   * createConnection is defined as protected to allow overriding.
   */
  protected Connection createConnection(Configuration conf) throws IOException {
    return ConnectionFactory.createConnection(conf);
  }

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating a
   * different ReplicationSinkManager implementation, or initializing it in a different way, so
   * createReplicationSinkManager is defined as protected to allow overriding.
   */
  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
      this, this.conf);
  }

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or we should make it particular to
    // replication and apply replication-specific settings such as compression or the codec to
    // use when passing Cells.
    Connection connection = createConnection(this.conf);
    // Since createConnection may be overridden by extending classes, we need to make sure it
    // is indeed returning a ClusterConnection instance.
    Preconditions.checkState(connection instanceof ClusterConnection);
    this.conn = (ClusterConnection) connection;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = createReplicationSinkManager(conn);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies = this.conf
        .getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

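  /**
   * Copy the configured replication codec, if any, into the RPC codec key so that
   * replication RPCs to the peer use it.
   */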
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

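  /**
   * Fetch the peer's region servers and keep choosing sinks until at least one is available,
   * sleeping with backoff between attempts, or until this endpoint stops running.
   */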
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

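  /**
   * Estimate the serialized size of a WAL entry (key plus edit), used to keep batches under
   * the replication RPC size limit.
   */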
  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

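  /**
   * Divide the entries into several batches (bounded by maxThreads and the number of sinks)
   * by hashing each entry's encoded region name, so edits for the same region hash to the
   * same slot. When a slot's batch would exceed replicationRpcLimit, it is moved to the tail
   * of the list and a new batch is started for that slot, keeping every batch under the RPC
   * size limit.
   */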
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and would be oversized, move it to the tail of the
      // list and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

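  /**
   * Group entries by encoded region name so that, for serial replication, each region's edits
   * stay together in their original order; each region becomes one batch.
   */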
  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
          .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form a single batch,
   * which is further divided into sub-batches bounded by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there is a {@link TableNotFoundException} in the exception's message or anywhere
   * in its cause chain.
   */
  @VisibleForTesting
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage().contains("TableNotFoundException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there is a {@link NoSuchColumnFamilyException} in the exception's message or
   * anywhere in its cause chain.
   */
  @VisibleForTesting
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage().contains("NoSuchColumnFamilyException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

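  /**
   * Rebuild the batches, dropping edits for tables that no longer exist in the local cluster
   * either. If the existence check itself fails, the edits are kept.
   */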
  @VisibleForTesting
  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table " + tableName, iox);
              // we can't drop edits without full assurance, so we assume the table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // It would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn("Missing table detected at sink, local table also does not exist, "
                + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

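  /**
   * Rebuild the batches, dropping cells whose column family no longer exists in the local
   * cluster either. Entries that end up empty are removed; if the descriptor lookup fails,
   * the original entry is kept untouched.
   */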
  @VisibleForTesting
  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                  .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // If we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // It would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn(
                "Missing column family detected at sink, local column family also does not exist,"
                    + " filtering edits for table '{}', column family '{}'", tableName, missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

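  /**
   * Try to open a fresh connection to the peer cluster, replacing the current one only if the
   * attempt succeeds.
   */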
  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

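  /**
   * Submit every non-empty batch to the thread pool and wait for all of them. Batches that
   * replicate successfully are cleared from the list so only the failed ones are retried; if
   * any task failed, the last exception seen is rethrown after all tasks have finished.
   * @return the most recent WAL write time among the successfully shipped batches
   */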
  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
            replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        iox = ee.getCause() instanceof IOException ?
          (IOException) ee.getCause() : new IOException(ee.getCause());
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("{} No replication sinks found, returning without replicating. "
        + "The source should retry with the same set of edits.", logPeerId());
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        // replicate the batches to the sink side.
        parallelReplicate(pool, replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for non-existent tables, 0 edits to replicate, "
                + "just return");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for non-existent column families, 0 edits to "
                + "replicate, just return");
              return true;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened, the cluster is alive, and calling it again right away,
            // even just to probe it, will only make things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
                  "call to the remote cluster timed out, which is usually " +
                  "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

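  /**
   * @return whether the peer this endpoint replicates to is currently enabled
   */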
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId(), e);
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

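  /**
   * Ship one batch of entries to a sink chosen by the {@link ReplicationSinkManager} using a
   * single replicateWALEntry RPC. The sink is reported as good or bad depending on the outcome,
   * and any IOException is rethrown so the caller can retry the batch.
   * @return the index of the batch, so the caller can mark it as shipped
   */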
  @VisibleForTesting
  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

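  /**
   * Ship a single region's entries in order, splitting them into consecutive sub-batches so
   * that each RPC stays under replicationRpcLimit. Used for serial replication, where ordering
   * within a region must be preserved.
   */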
  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

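  /**
   * Create the task that ships one batch: region-ordered shipping for serial replication,
   * plain batch shipping otherwise. The task returns the batch index on success.
   */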
  @VisibleForTesting
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }

}