001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.net.ConnectException;
022import java.net.SocketTimeoutException;
023import java.net.UnknownHostException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.concurrent.Callable;
033import java.util.concurrent.CompletionService;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.ExecutorCompletionService;
036import java.util.concurrent.Future;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.commons.lang3.StringUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Abortable;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.TableNotFoundException;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.ClusterConnection;
052import org.apache.hadoop.hbase.client.Connection;
053import org.apache.hadoop.hbase.client.ConnectionFactory;
054import org.apache.hadoop.hbase.ipc.CallTimeoutException;
055import org.apache.hadoop.hbase.ipc.RpcServer;
056import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
057import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
058import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
059import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
060import org.apache.hadoop.hbase.replication.ReplicationUtils;
061import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.Threads;
066import org.apache.hadoop.hbase.wal.WAL.Entry;
067import org.apache.hadoop.hbase.wal.WALEdit;
068import org.apache.hadoop.ipc.RemoteException;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
075
076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
077
/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
 * to another HBase cluster. For the slave cluster it selects a random subset of peers using a
 * replication ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100
 * region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
088@InterfaceAudience.Private
089public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  /** Logger shared by all instances of this endpoint. */
  private static final Logger LOG =
    LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  // Default value of "replication.source.maxterminationmultiplier"; init() multiplies the RPC
  // timeout by the configured multiplier to obtain maxTerminationWait.
  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  /** Drop edits for tables that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
    "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
    "hbase.replication.drop.on.deleted.columnfamily";

  // Connection to the peer cluster: created in init(), replaced by reconnectToPeerCluster() and
  // closed in doStop()
  private ClusterConnection conn;
  // Copy of the context's local configuration, taken in init()
  private Configuration localConf;
  // Copy of the context's configuration, taken in init() and adjusted by decorateConf()
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  // Set to true once replicate() has run connectToPeers()
  private boolean peersSelected = false;
  // Cluster id passed along with every replication RPC; only read from the configuration when
  // bulk load replication is enabled, otherwise it stays empty
  private String replicationClusterId = "";
  // Bounded thread pool that runs the per-batch replication tasks
  private ThreadPoolExecutor exec;
  // Maximum size of exec; also caps the number of parallel batches
  private int maxThreads;
  // Namespace base directory under the root dir, passed along with every replication RPC
  private Path baseNamespaceDir;
  // HFile archive directory under the root dir, passed along with every replication RPC
  private Path hfileArchiveDir;
  // Value of HConstants.REPLICATION_BULKLOAD_ENABLE_KEY read in init()
  private boolean replicationBulkLoadDataEnabled;
  // Used by doStop() to abort when the replication tasks do not terminate in time
  private Abortable abortable;
  // Value of REPLICATION_DROP_ON_DELETED_TABLE_KEY read in init()
  private boolean dropOnDeletedTables;
  // Value of REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY read in init()
  private boolean dropOnDeletedColumnFamilies;
  // Whether the peer config asks for serial replication; selects the batching strategy
  private boolean isSerial = false;
  // Time of the last "no replication sinks" warning logged by replicate().
  // Initialising as 0 to guarantee at least one logging message
  private long lastSinkFetchTime = 0;
132
133  /*
134   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
135   * different Connection implementations, or initialize it in a different way, so defining
136   * createConnection as protected for possible overridings.
137   */
138  protected Connection createConnection(Configuration conf) throws IOException {
139    return ConnectionFactory.createConnection(conf);
140  }
141
142  /*
143   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
144   * different ReplicationSinkManager implementations, or initialize it in a different way, so
145   * defining createReplicationSinkManager as protected for possible overridings.
146   */
147  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
148    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(), this,
149      this.conf);
150  }
151
152  @Override
153  public void init(Context context) throws IOException {
154    super.init(context);
155    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
156    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
157    decorateConf();
158    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
159    this.socketTimeoutMultiplier =
160      this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
161    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
162    // tasks to terminate when doStop() is called.
163    long maxTerminationWaitMultiplier = this.conf.getLong(
164      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
165    this.maxTerminationWait = maxTerminationWaitMultiplier
166      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
167    // TODO: This connection is replication specific or we should make it particular to
168    // replication and make replication specific settings such as compression or codec to use
169    // passing Cells.
170    Connection connection = createConnection(this.conf);
171    // Since createConnection method may be overridden by extending classes, we need to make sure
172    // it's indeed returning a ClusterConnection instance.
173    Preconditions.checkState(connection instanceof ClusterConnection);
174    this.conn = (ClusterConnection) connection;
175    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
176    this.metrics = context.getMetrics();
177    // ReplicationQueueInfo parses the peerId out of the znode for us
178    this.replicationSinkMgr = createReplicationSinkManager(conn);
179    // per sink thread pool
180    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
181      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
182    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
183      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
184    this.abortable = ctx.getAbortable();
185    // Set the size limit for replication RPCs to 95% of the max request size.
186    // We could do with less slop if we have an accurate estimate of encoded size. Being
187    // conservative for now.
188    this.replicationRpcLimit =
189      (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE));
190    this.dropOnDeletedTables = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
191    this.dropOnDeletedColumnFamilies =
192      this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);
193
194    this.replicationBulkLoadDataEnabled = conf.getBoolean(
195      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
196    if (this.replicationBulkLoadDataEnabled) {
197      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
198    }
199    // Construct base namespace directory and hfile archive directory path
200    Path rootDir = CommonFSUtils.getRootDir(conf);
201    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
202    baseNamespaceDir = new Path(rootDir, baseNSDir);
203    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
204    isSerial = context.getPeerConfig().isSerial();
205  }
206
207  private void decorateConf() {
208    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
209    if (StringUtils.isNotEmpty(replicationCodec)) {
210      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
211    }
212  }
213
214  private void connectToPeers() {
215    getRegionServers();
216
217    int sleepMultiplier = 1;
218
219    // Connect to peer cluster first, unless we have to stop
220    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
221      replicationSinkMgr.chooseSinks();
222      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
223        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
224          sleepMultiplier++;
225        }
226      }
227    }
228  }
229
230  /**
231   * Do the sleeping logic
232   * @param msg             Why we sleep
233   * @param sleepMultiplier by how many times the default sleeping time is augmented
234   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
235   */
236  private boolean sleepForRetries(String msg, int sleepMultiplier) {
237    try {
238      if (LOG.isTraceEnabled()) {
239        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
240          sleepMultiplier);
241      }
242      Thread.sleep(this.sleepForRetries * sleepMultiplier);
243    } catch (InterruptedException e) {
244      Thread.currentThread().interrupt();
245      if (LOG.isDebugEnabled()) {
246        LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId());
247      }
248    }
249    return sleepMultiplier < maxRetriesMultiplier;
250  }
251
252  private int getEstimatedEntrySize(Entry e) {
253    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
254    return (int) size;
255  }
256
257  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
258    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
259    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
260    List<List<Entry>> entryLists =
261      Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
262    int[] sizes = new int[n];
263    for (Entry e : entries) {
264      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
265      int entrySize = getEstimatedEntrySize(e);
266      // If this batch has at least one entry and is over sized, move it to the tail of list and
267      // initialize the entryLists[index] to be a empty list.
268      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
269        entryLists.add(entryLists.get(index));
270        entryLists.set(index, new ArrayList<>());
271        sizes[index] = 0;
272      }
273      entryLists.get(index).add(e);
274      sizes[index] += entrySize;
275    }
276    return entryLists;
277  }
278
279  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
280    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
281    for (Entry e : entries) {
282      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
283        .add(e);
284    }
285    return new ArrayList<>(regionEntries.values());
286  }
287
288  /**
289   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
290   * concurrently. Note that, for serial replication, we need to make sure that entries from the
291   * same region to be replicated serially, so entries from the same region consist of a batch, and
292   * we will divide a batch into several batches by replicationRpcLimit in method
293   * serialReplicateRegionEntries()
294   */
295  private List<List<Entry>> createBatches(final List<Entry> entries) {
296    if (isSerial) {
297      return createSerialBatches(entries);
298    } else {
299      return createParallelBatches(entries);
300    }
301  }
302
303  /**
304   * Check if there's an {@link TableNotFoundException} in the caused by stacktrace.
305   */
306  public static boolean isTableNotFoundException(Throwable io) {
307    if (io instanceof RemoteException) {
308      io = ((RemoteException) io).unwrapRemoteException();
309    }
310    if (io != null && io.getMessage().contains("TableNotFoundException")) {
311      return true;
312    }
313    for (; io != null; io = io.getCause()) {
314      if (io instanceof TableNotFoundException) {
315        return true;
316      }
317    }
318    return false;
319  }
320
321  /**
322   * Check if there's an {@link NoSuchColumnFamilyException} in the caused by stacktrace.
323   */
324  public static boolean isNoSuchColumnFamilyException(Throwable io) {
325    if (io instanceof RemoteException) {
326      io = ((RemoteException) io).unwrapRemoteException();
327    }
328    if (io != null && io.getMessage().contains("NoSuchColumnFamilyException")) {
329      return true;
330    }
331    for (; io != null; io = io.getCause()) {
332      if (io instanceof NoSuchColumnFamilyException) {
333        return true;
334      }
335    }
336    return false;
337  }
338
339  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
340    List<List<Entry>> entryList = new ArrayList<>();
341    Map<TableName, Boolean> existMap = new HashMap<>();
342    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
343      Admin localAdmin = localConn.getAdmin()) {
344      for (List<Entry> oldEntries : oldEntryList) {
345        List<Entry> entries = new ArrayList<>();
346        for (Entry e : oldEntries) {
347          TableName tableName = e.getKey().getTableName();
348          boolean exist = true;
349          if (existMap.containsKey(tableName)) {
350            exist = existMap.get(tableName);
351          } else {
352            try {
353              exist = localAdmin.tableExists(tableName);
354              existMap.put(tableName, exist);
355            } catch (IOException iox) {
356              LOG.warn("Exception checking for local table " + tableName, iox);
357              // we can't drop edits without full assurance, so we assume table exists.
358              exist = true;
359            }
360          }
361          if (exist) {
362            entries.add(e);
363          } else {
364            // Would potentially be better to retry in one of the outer loops
365            // and add a table filter there; but that would break the encapsulation,
366            // so we're doing the filtering here.
367            LOG.warn("Missing table detected at sink, local table also does not exist, "
368              + "filtering edits for table '{}'", tableName);
369          }
370        }
371        if (!entries.isEmpty()) {
372          entryList.add(entries);
373        }
374      }
375    } catch (IOException iox) {
376      LOG.warn("Exception when creating connection to check local table", iox);
377      return oldEntryList;
378    }
379    return entryList;
380  }
381
382  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
383    List<List<Entry>> entryList = new ArrayList<>();
384    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
385    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
386      Admin localAdmin = localConn.getAdmin()) {
387      for (List<Entry> oldEntries : oldEntryList) {
388        List<Entry> entries = new ArrayList<>();
389        for (Entry e : oldEntries) {
390          TableName tableName = e.getKey().getTableName();
391          if (!existColumnFamilyMap.containsKey(tableName)) {
392            try {
393              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
394                .map(Bytes::toString).collect(Collectors.toSet());
395              existColumnFamilyMap.put(tableName, cfs);
396            } catch (Exception ex) {
397              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
398              // if catch any exception, we are not sure about table's description,
399              // so replicate raw entry
400              entries.add(e);
401              continue;
402            }
403          }
404
405          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
406          Set<String> missingCFs = new HashSet<>();
407          WALEdit walEdit = new WALEdit();
408          walEdit.getCells().addAll(e.getEdit().getCells());
409          WALUtil.filterCells(walEdit, cell -> {
410            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
411            if (existColumnFamilies.contains(cf)) {
412              return cell;
413            } else {
414              missingCFs.add(cf);
415              return null;
416            }
417          });
418          if (!walEdit.isEmpty()) {
419            Entry newEntry = new Entry(e.getKey(), walEdit);
420            entries.add(newEntry);
421          }
422
423          if (!missingCFs.isEmpty()) {
424            // Would potentially be better to retry in one of the outer loops
425            // and add a table filter there; but that would break the encapsulation,
426            // so we're doing the filtering here.
427            LOG.warn(
428              "Missing column family detected at sink, local column family also does not exist,"
429                + " filtering edits for table '{}',column family '{}'",
430              tableName, missingCFs);
431          }
432        }
433        if (!entries.isEmpty()) {
434          entryList.add(entries);
435        }
436      }
437    } catch (IOException iox) {
438      LOG.warn("Exception when creating connection to check local table", iox);
439      return oldEntryList;
440    }
441    return entryList;
442  }
443
444  private void reconnectToPeerCluster() {
445    ClusterConnection connection = null;
446    try {
447      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
448    } catch (IOException ioe) {
449      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
450    }
451    if (connection != null) {
452      this.conn = connection;
453    }
454  }
455
456  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
457    List<List<Entry>> batches) throws IOException {
458    int futures = 0;
459    for (int i = 0; i < batches.size(); i++) {
460      List<Entry> entries = batches.get(i);
461      if (!entries.isEmpty()) {
462        if (LOG.isTraceEnabled()) {
463          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
464            replicateContext.getSize());
465        }
466        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
467        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
468        futures++;
469      }
470    }
471
472    IOException iox = null;
473    long lastWriteTime = 0;
474    for (int i = 0; i < futures; i++) {
475      try {
476        // wait for all futures, remove successful parts
477        // (only the remaining parts will be retried)
478        Future<Integer> f = pool.take();
479        int index = f.get();
480        List<Entry> batch = batches.get(index);
481        batches.set(index, Collections.emptyList()); // remove successful batch
482        // Find the most recent write time in the batch
483        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
484        if (writeTime > lastWriteTime) {
485          lastWriteTime = writeTime;
486        }
487      } catch (InterruptedException ie) {
488        iox = new IOException(ie);
489      } catch (ExecutionException ee) {
490        iox = ee.getCause() instanceof IOException
491          ? (IOException) ee.getCause()
492          : new IOException(ee.getCause());
493      }
494    }
495    if (iox != null) {
496      // if we had any exceptions, try again
497      throw iox;
498    }
499    return lastWriteTime;
500  }
501
502  /**
503   * Do the shipping logic
504   */
505  @Override
506  public boolean replicate(ReplicateContext replicateContext) {
507    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
508    int sleepMultiplier = 1;
509    int initialTimeout = replicateContext.getTimeout();
510
511    if (!peersSelected && this.isRunning()) {
512      connectToPeers();
513      peersSelected = true;
514    }
515
516    int numSinks = replicationSinkMgr.getNumSinks();
517    if (numSinks == 0) {
518      if (
519        (EnvironmentEdgeManager.currentTime() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000)
520      ) {
521        LOG.warn("No replication sinks found, returning without replicating. "
522          + "The source should retry with the same set of edits. Not logging this again for "
523          + "the next {} seconds.", maxRetriesMultiplier);
524        lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
525      }
526      sleepForRetries("No sinks available at peer", sleepMultiplier);
527      return false;
528    }
529
530    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
531    while (this.isRunning() && !exec.isShutdown()) {
532      if (!isPeerEnabled()) {
533        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
534          sleepMultiplier++;
535        }
536        continue;
537      }
538      if (this.conn == null || this.conn.isClosed()) {
539        reconnectToPeerCluster();
540      }
541      try {
542        // replicate the batches to sink side.
543        parallelReplicate(pool, replicateContext, batches);
544        return true;
545      } catch (IOException ioe) {
546        if (ioe instanceof RemoteException) {
547          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
548            // Only filter the edits to replicate and don't change the entries in replicateContext
549            // as the upper layer rely on it.
550            batches = filterNotExistTableEdits(batches);
551            if (batches.isEmpty()) {
552              LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
553              return true;
554            }
555          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
556            batches = filterNotExistColumnFamilyEdits(batches);
557            if (batches.isEmpty()) {
558              LOG.warn("After filter not exist column family's edits, 0 edits to replicate, "
559                + "just return");
560              return true;
561            }
562          } else {
563            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
564              ioe);
565            replicationSinkMgr.chooseSinks();
566          }
567        } else {
568          if (ioe instanceof SocketTimeoutException) {
569            // This exception means we waited for more than 60s and nothing
570            // happened, the cluster is alive and calling it right away
571            // even for a test just makes things worse.
572            sleepForRetries(
573              "Encountered a SocketTimeoutException. Since the "
574                + "call to the remote cluster timed out, which is usually "
575                + "caused by a machine failure or a massive slowdown",
576              this.socketTimeoutMultiplier);
577          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
578            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
579            replicationSinkMgr.chooseSinks();
580          } else if (ioe instanceof CallTimeoutException) {
581            replicateContext
582              .setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier));
583          } else {
584            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
585          }
586        }
587        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
588          sleepMultiplier++;
589        }
590      }
591    }
592    return false; // in case we exited before replicating
593  }
594
595  protected boolean isPeerEnabled() {
596    return ctx.getReplicationPeer().isPeerEnabled();
597  }
598
599  @Override
600  protected void doStop() {
601    disconnect(); // don't call super.doStop()
602    if (this.conn != null) {
603      try {
604        this.conn.close();
605        this.conn = null;
606      } catch (IOException e) {
607        LOG.warn("{} Failed to close the connection", logPeerId());
608      }
609    }
610    // Allow currently running replication tasks to finish
611    exec.shutdown();
612    try {
613      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
614    } catch (InterruptedException e) {
615    }
616    // Abort if the tasks did not terminate in time
617    if (!exec.isTerminated()) {
618      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
619        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
620        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
621      abortable.abort(errMsg, new IOException(errMsg));
622    }
623    notifyStopped();
624  }
625
626  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
627    throws IOException {
628    SinkPeer sinkPeer = null;
629    try {
630      int entriesHashCode = System.identityHashCode(entries);
631      if (LOG.isTraceEnabled()) {
632        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
633        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
634          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
635      }
636      sinkPeer = replicationSinkMgr.getReplicationSink();
637      BlockingInterface rrs = sinkPeer.getRegionServer();
638      try {
639        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
640          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
641        if (LOG.isTraceEnabled()) {
642          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
643        }
644      } catch (IOException e) {
645        if (LOG.isTraceEnabled()) {
646          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
647        }
648        throw e;
649      }
650      replicationSinkMgr.reportSinkSuccess(sinkPeer);
651    } catch (IOException ioe) {
652      if (sinkPeer != null) {
653        replicationSinkMgr.reportBadSink(sinkPeer);
654      }
655      throw ioe;
656    }
657    return batchIndex;
658  }
659
660  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
661    throws IOException {
662    int batchSize = 0, index = 0;
663    List<Entry> batch = new ArrayList<>();
664    for (Entry entry : entries) {
665      int entrySize = getEstimatedEntrySize(entry);
666      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
667        replicateEntries(batch, index++, timeout);
668        batch.clear();
669        batchSize = 0;
670      }
671      batch.add(entry);
672      batchSize += entrySize;
673    }
674    if (batchSize > 0) {
675      replicateEntries(batch, index, timeout);
676    }
677    return batchIndex;
678  }
679
680  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
681    return isSerial
682      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
683      : () -> replicateEntries(entries, batchIndex, timeout);
684  }
685
686  private String logPeerId() {
687    return "[Source for peer " + this.ctx.getPeerId() + "]:";
688  }
689
690}