/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random subset of peer region servers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected as sinks.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
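 * <p>
 * Edits for tables or column families that have been deleted on both the source and the
 * target can be dropped instead of being retried forever. A minimal configuration sketch,
 * assuming the keys below are picked up by the configuration this endpoint is initialized
 * with:
 * </p>
 * <pre>
 *   hbase.replication.drop.on.deleted.table = true
 *   hbase.replication.drop.on.deleted.columnfamily = true
 * </pre>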
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  /** Drop edits for tables that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
      "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
      "hbase.replication.drop.on.deleted.columnfamily";

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;
  // Initialising to 0 to guarantee at least one logging message
  private long lastSinkFetchTime = 0;

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate a
   * different Connection implementation, or initialize it in a different way, so
   * createConnection is defined as protected to allow overriding.
   */
  protected Connection createConnection(Configuration conf) throws IOException {
    return ConnectionFactory.createConnection(conf);
  }

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate a
   * different ReplicationSinkManager implementation, or initialize it in a different way, so
   * createReplicationSinkManager is defined as protected to allow overriding.
   */
  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
      this, this.conf);
  }

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or should be made particular to
    // replication, with replication-specific settings such as compression or the codec to use
    // when passing Cells.
    Connection connection = createConnection(this.conf);
    // Since the createConnection method may be overridden by extending classes, we need to make
    // sure it indeed returns a ClusterConnection instance.
    Preconditions.checkState(connection instanceof ClusterConnection);
    this.conn = (ClusterConnection) connection;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = createReplicationSinkManager(conn);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies = this.conf
        .getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

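  /**
   * Blocks until the sink manager has chosen at least one sink region server in the peer
   * cluster, or until this endpoint is no longer running.
   */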
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier the multiplier applied to the default sleep time
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} {} Interrupted while sleeping between retries", logPeerId(), msg);
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

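  /**
   * Estimates the serialized size of a WAL entry (key plus edit), in bytes. Used to keep
   * each replication RPC under {@code replicationRpcLimit}.
   */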
  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

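  /**
   * Groups entries into up to {@code maxThreads} batches by hashing the encoded region name,
   * so entries of the same region land in the same batch. A batch that would exceed
   * {@code replicationRpcLimit} is moved to the tail of the list and a new batch is started
   * in its slot.
   */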
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and would be oversized, move it to the tail of the
      // list and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

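  /**
   * Groups entries by encoded region name, one batch per region, so that the edits of each
   * region can be shipped in order for serial replication.
   */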
  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
          .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form a single batch,
   * which may later be divided into several sub-batches by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there is a {@link TableNotFoundException} in the cause chain of the given throwable.
   */
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("TableNotFoundException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there is a {@link NoSuchColumnFamilyException} in the cause chain of the given
   * throwable.
   */
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("NoSuchColumnFamilyException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

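  /**
   * Filters out edits for tables that no longer exist in the local cluster. If the local
   * table cannot be checked, its edits are kept, since we must not drop edits without full
   * assurance.
   */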
  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table {}", tableName, iox);
              // we can't drop edits without full assurance, so we assume the table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // It would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn("Missing table detected at sink, local table also does not exist, "
                + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

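  /**
   * Filters out cells whose column family no longer exists in the local cluster's table
   * descriptor. If the descriptor cannot be fetched, the entry is replicated unchanged.
   */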
  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                  .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // If we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // It would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn(
                "Missing column family detected at sink, local column family also does not exist,"
                    + " filtering edits for table '{}', column families '{}'", tableName,
                missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

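  /**
   * Tries to replace the current peer cluster connection with a freshly created one. If the
   * new connection cannot be created, the existing (possibly closed) connection is kept.
   */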
  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

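  /**
   * Submits one replication task per non-empty batch and waits for all of them to finish.
   * Successfully replicated batches are replaced with empty lists so that only the failed
   * batches are retried by the caller. Returns the most recent write time seen among the
   * successfully replicated batches, and throws the last exception seen if any task failed.
   */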
  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
            replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        iox = ee.getCause() instanceof IOException
            ? (IOException) ee.getCause() : new IOException(ee.getCause());
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      if ((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000)) {
        LOG.warn(
          "No replication sinks found, returning without replicating. "
            + "The source should retry with the same set of edits. Not logging this again for "
            + "the next {} seconds.", maxRetriesMultiplier);
        lastSinkFetchTime = System.currentTimeMillis();
      }
      sleepForRetries("No sinks available at peer", sleepMultiplier);
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        // replicate the batches to sink side.
        parallelReplicate(pool, replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for non-existent tables, 0 edits to replicate,"
                  + " just return");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for non-existent column families, 0 edits to "
                      + "replicate, just return");
              return true;
            }
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
                ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened, the cluster is alive and calling it right away,
            // even for a test, just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. The " +
                  "call to the remote cluster timed out, which is usually " +
                  "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

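  /**
   * @return true if the replication peer this endpoint ships to is currently enabled
   */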
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId(), e);
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
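      // Ignored: isTerminated() is checked below and we abort if the tasks did not finish in time.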
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

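  /**
   * Ships one batch of entries to a single sink chosen by the {@link ReplicationSinkManager}.
   * Reports the sink as good on success and as bad on failure, then rethrows the failure so
   * the batch can be retried. Returns {@code batchIndex} so the caller can tell which batch
   * completed.
   */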
  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

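  /**
   * Ships the entries of a single region in order, splitting them into sub-batches so that no
   * single RPC exceeds {@code replicationRpcLimit}.
   */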
  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

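  /**
   * Creates the callable that ships one batch: per-region ordered shipping when the peer is
   * serial, plain batch shipping otherwise. The callable returns the batch index on success.
   */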
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }

}