/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
 * to another HBase cluster. For the slave cluster it selects a random number of peers using a
 * replication ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100
 * region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
    LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  /** Drop edits for tables that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
    "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
    "hbase.replication.drop.on.deleted.columnfamily";

  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;
  // Initialising as 0 to guarantee at least one logging message
  private long lastSinkFetchTime = 0;
  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier =
      this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit =
      (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies =
      this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled = conf.getBoolean(
      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

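  /**
   * Block until at least one sink has been chosen in the peer cluster, sleeping between attempts,
   * or until this endpoint is no longer running.
   */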
  private void connectToPeers() {
    int sleepMultiplier = 1;
    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && getNumSinks() == 0) {
      chooseSinks();
      if (this.isRunning() && getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} {} Interrupted while sleeping between retries", logPeerId(), msg);
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

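  /**
   * Estimate the serialized size of a WAL entry (key plus edit) in bytes; used when sizing batches
   * against the replication RPC limit.
   */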
  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

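  /**
   * Distribute entries across up to {@code maxThreads} lists (bounded by the number of sinks and
   * the entry count) by hashing the encoded region name. When a list would grow past the
   * replication RPC size limit it is moved to the tail of the result and a fresh list is started
   * for that slot.
   */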
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
      Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and adding this one would push it over the size
      // limit, move it to the tail of the list and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

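  /**
   * For serial replication, group entries by encoded region name so that all edits of a region
   * stay together and can be shipped in order.
   */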
  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
        .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form one batch, and that
   * batch is further divided into several batches by replicationRpcLimit in method
   * serialReplicateRegionEntries()
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there is a {@link TableNotFoundException} in the cause chain of the given exception.
   */
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (
      io != null && io.getMessage() != null && io.getMessage().contains("TableNotFoundException")
    ) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there is a {@link NoSuchColumnFamilyException} in the cause chain of the given
   * exception.
   */
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (
      io != null && io.getMessage() != null
        && io.getMessage().contains("NoSuchColumnFamilyException")
    ) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

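  /**
   * Filter out edits for tables that do not exist on the local cluster either, checked through a
   * local {@link Admin}. If the existence check cannot be completed, the edits are kept rather
   * than dropped.
   */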
  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
      Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table " + tableName, iox);
              // we can't drop edits without full assurance, so we assume table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn("Missing table detected at sink, local table also does not exist, "
              + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

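  /**
   * Filter out cells whose column family does not exist on the local cluster either, checked
   * through a local {@link Admin}. If the table descriptor cannot be fetched, the entry is
   * replicated unchanged rather than dropped.
   */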
  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
      Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // if we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn(
              "Missing column family detected at sink, local column family also does not exist,"
                + " filtering edits for table '{}', column family '{}'",
              tableName, missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

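  /**
   * Ship the given batches concurrently and wait for every future. Batches that succeed are
   * replaced with empty lists so only the failed ones remain for a retry; if any batch failed, an
   * IOException is thrown once all futures have completed. Otherwise the most recent write time
   * seen across the batches is returned.
   */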
  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
    throws IOException {
    List<CompletableFuture<Integer>> futures =
      new ArrayList<CompletableFuture<Integer>>(batches.size());
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (entries.isEmpty()) {
        continue;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
          replicateContext.getSize());
      }
      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
    }

    IOException iox = null;
    long lastWriteTime = 0;

    for (CompletableFuture<Integer> f : futures) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        int index = FutureUtils.get(f);
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (IOException e) {
        iox = e;
      } catch (RuntimeException e) {
        iox = new IOException(e);
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    int sleepMultiplier = 1;
    int initialTimeout = replicateContext.getTimeout();

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = getNumSinks();
    if (numSinks == 0) {
      if (
        (EnvironmentEdgeManager.currentTime() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000)
      ) {
        LOG.warn("No replication sinks found, returning without replicating. "
          + "The source should retry with the same set of edits. Not logging this again for "
          + "the next {} seconds.", maxRetriesMultiplier);
        lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
      }
      sleepForRetries("No sinks available at peer", sleepMultiplier);
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      try {
        // replicate the batches to sink side.
        parallelReplicate(replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn(
                "After filtering edits for missing tables, 0 edits to replicate, just return");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for missing column families, 0 edits to replicate, "
                + "just return");
              return true;
            }
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
              ioe);
            chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened. The cluster
            // is still alive, and calling it again right away, even just as a test, only makes
            // things worse.
            sleepForRetries(
              "Encountered a SocketTimeoutException. The call to the "
                + "remote cluster timed out, which is usually caused by "
                + "a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            chooseSinks();
          } else if (ioe instanceof CallTimeoutException) {
            replicateContext
              .setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier));
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

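  /** Returns whether the replication peer is currently enabled. */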
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

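  /**
   * Ship one batch of entries to a single sink region server through the replicateWALEntry admin
   * RPC. The returned future completes with {@code batchIndex} on success; on failure the sink may
   * be reported as bad and the future completes exceptionally.
   */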
  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
    int timeout) {
    int entriesHashCode = System.identityHashCode(entries);
    if (LOG.isTraceEnabled()) {
      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
        entriesHashCode, entries.size(), size, replicationClusterId);
    }
    SinkPeer sinkPeer = null;
    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
    try {
      sinkPeer = getReplicationSink();
    } catch (IOException e) {
      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
      resultCompletableFuture.completeExceptionally(e);
      return resultCompletableFuture;
    }
    assert sinkPeer != null;
    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
    final SinkPeer sinkPeerToUse = sinkPeer;
    FutureUtils.addListener(
      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
      (response, exception) -> {
        if (exception != null) {
          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
          resultCompletableFuture.completeExceptionally(exception);
          return;
        }
        reportSinkSuccess(sinkPeerToUse);
        resultCompletableFuture.complete(batchIndex);
      });
    return resultCompletableFuture;
  }

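  /**
   * Handle a failed replicateWALEntry call: log the failure at trace level and, if the cause is an
   * IOException, report the sink as bad.
   */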
  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
    final SinkPeer sinkPeer) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
    }
    if (exception instanceof IOException) {
      if (sinkPeer != null) {
        reportBadSink(sinkPeer);
      }
    }
  }

  /**
   * When {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the WAL
   * {@link Entry} list; once we reach the batch limit, we send the batch out, and in the callback
   * we send the next batch, until all entries have been sent.
   */
  private CompletableFuture<Integer> serialReplicateRegionEntries(
    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
    if (!walEntryPeekingIterator.hasNext()) {
      return CompletableFuture.completedFuture(batchIndex);
    }
    int batchSize = 0;
    List<Entry> batch = new ArrayList<>();
    while (walEntryPeekingIterator.hasNext()) {
      Entry entry = walEntryPeekingIterator.peek();
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        break;
      }
      walEntryPeekingIterator.next();
      batch.add(entry);
      batchSize += entrySize;
    }

    if (batchSize <= 0) {
      return CompletableFuture.completedFuture(batchIndex);
    }
    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
      if (exception != null) {
        resultCompletableFuture.completeExceptionally(exception);
        return;
      }
      if (!walEntryPeekingIterator.hasNext()) {
        resultCompletableFuture.complete(batchIndex);
        return;
      }
      FutureUtils.addListener(
        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
        (currentResponse, currentException) -> {
          if (currentException != null) {
            resultCompletableFuture.completeExceptionally(currentException);
            return;
          }
          resultCompletableFuture.complete(batchIndex);
        });
    });
    return resultCompletableFuture;
  }

  /**
   * Replicate entries to the peer cluster using the async API.
   */
  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
    int timeout) {
    return isSerial
      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
        timeout)
      : replicateEntries(entries, batchIndex, timeout);
  }

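  /** Returns a log message prefix identifying the peer of this replication source. */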
  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }
}