/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
 * to another HBase cluster. For the slave cluster it selects a random subset of region servers as
 * sinks using a replication ratio. For example, if the replication ratio is 0.1 and the slave
 * cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
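 * <p>
 * A minimal sketch of how the sink count follows from the ratio (names below are illustrative;
 * the actual selection is done in the superclass, with the ratio usually coming from the
 * {@code replication.source.ratio} setting):
 * </p>
 *
 * <pre>{@code
 * // with ratio = 0.1 and 100 region servers at the peer:
 * int numSinks = (int) Math.ceil(peerRegionServers.size() * ratio); // 10 sinks
 * }</pre>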
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
    LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  /** Drop edits for tables that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
    "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
    "hbase.replication.drop.on.deleted.columnfamily";
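  // A minimal sketch of enabling both behaviours (both default to false, as read in init() below):
  //   conf.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
  //   conf.setBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, true);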

  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;
  // Initialising as 0 to guarantee at least one logging message
  private long lastSinkFetchTime = 0;
  private volatile boolean stopping = false;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier =
      this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit =
      (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies =
      this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled = conf.getBoolean(
      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    int sleepMultiplier = 1;
    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && getNumSinks() == 0) {
      chooseSinks();
      if (this.isRunning() && getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
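   * <p>
   * A rough sketch of the resulting backoff, assuming the defaults read in init() (sleepForRetries
   * = 1000 ms, maxRetriesMultiplier = 300): retry n sleeps about n * 1000 ms, and callers stop
   * increasing the multiplier once this method returns false.
   * </p>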
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId());
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

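  /**
   * Distribute entries across up to {@code min(maxThreads, entries.size() / 100 + 1, numSinks)}
   * lists by hashing the encoded region name, so each list can be shipped by its own worker. If a
   * list would grow past {@code replicationRpcLimit}, it is moved to the tail of the result and a
   * fresh list takes its slot, keeping every RPC under the size limit.
   */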
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
      Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and would become oversized, move it to the tail of
      // the list and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
        .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form a single batch,
   * which is further divided into smaller batches by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there is a {@link TableNotFoundException} in the cause chain, or an exception
   * message that mentions it, after unwrapping a possible {@link RemoteException}.
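   * <p>
   * A minimal usage sketch, as in {@link #replicate(ReplicateContext)} below:
   * </p>
   *
   * <pre>{@code
   * if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
   *   batches = filterNotExistTableEdits(batches);
   * }
   * }</pre>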
   */
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (
      io != null && io.getMessage() != null
        && io.getMessage().contains("TableNotFoundException")
    ) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there is a {@link NoSuchColumnFamilyException} in the cause chain, or an exception
   * message that mentions it, after unwrapping a possible {@link RemoteException}.
   */
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (
      io != null && io.getMessage() != null
        && io.getMessage().contains("NoSuchColumnFamilyException")
    ) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

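  /**
   * Drop edits whose table is missing at the sink and also does not exist locally. If the local
   * existence check fails, the edits are kept, since we cannot drop them without full assurance.
   */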
  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
      Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table " + tableName, iox);
              // we can't drop edits without full assurance, so we assume the table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn("Missing table detected at sink, local table also does not exist, "
              + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

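  /**
   * Drop cells whose column family is missing at the sink and also does not exist locally. If the
   * local table descriptor cannot be fetched, the entry is replicated unchanged.
   */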
  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
      Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // if we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn(
              "Missing column family detected at sink, local column family also does not exist,"
                + " filtering edits for table '{}', column family '{}'",
              tableName, missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
    throws IOException {
    List<CompletableFuture<Integer>> futures =
      new ArrayList<CompletableFuture<Integer>>(batches.size());
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (entries.isEmpty()) {
        continue;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
          replicateContext.getSize());
      }
      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
    }

    IOException iox = null;
    long lastWriteTime = 0;

    for (CompletableFuture<Integer> f : futures) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        int index = FutureUtils.get(f);
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (IOException e) {
        iox = e;
      } catch (RuntimeException e) {
        iox = new IOException(e);
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    int sleepMultiplier = 1;
    int initialTimeout = replicateContext.getTimeout();

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = getNumSinks();
    if (numSinks == 0) {
      if (
        (EnvironmentEdgeManager.currentTime() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000)
      ) {
        LOG.warn("No replication sinks found, returning without replicating. "
          + "The source should retry with the same set of edits. Not logging this again for "
          + "the next {} seconds.", maxRetriesMultiplier);
        lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
      }
      sleepForRetries("No sinks available at peer", sleepMultiplier);
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !this.stopping) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      try {
        // replicate the batches to sink side.
        parallelReplicate(replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext,
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for missing tables, 0 edits to replicate, returning");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for missing column families, 0 edits to replicate, "
                + "just return");
              return true;
            }
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
              ioe);
            chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened; the cluster
            // is probably alive, and calling it again right away, even just to test, would only
            // make things worse.
            sleepForRetries(
              "Encountered a SocketTimeoutException. The call to the "
                + "remote cluster timed out, which is usually caused by "
                + "a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            chooseSinks();
          } else if (ioe instanceof CallTimeoutException) {
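            // The call itself timed out; adapt the operation timeout for the retry (see
            // ReplicationUtils.getAdaptiveTimeout) rather than rechecking sinks.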
            replicateContext
              .setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier));
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    // Allow currently running replication tasks to finish
    this.stopping = true;
    disconnect(); // don't call super.doStop()
    notifyStopped();
  }

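  /**
   * Ship one batch of entries to a single sink region server. The returned future completes with
   * {@code batchIndex} on success; on failure it completes exceptionally, and an IOException also
   * marks the chosen sink as bad.
   */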
  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
    int timeout) {
    int entriesHashCode = System.identityHashCode(entries);
    if (LOG.isTraceEnabled()) {
      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
        entriesHashCode, entries.size(), size, replicationClusterId);
    }
    SinkPeer sinkPeer = null;
    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
    try {
      sinkPeer = getReplicationSink();
    } catch (IOException e) {
      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
      resultCompletableFuture.completeExceptionally(e);
      return resultCompletableFuture;
    }
    assert sinkPeer != null;
    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
    final SinkPeer sinkPeerToUse = sinkPeer;
    FutureUtils.addListener(
      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
      (response, exception) -> {
        if (exception != null) {
          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
          resultCompletableFuture.completeExceptionally(exception);
          return;
        }
        reportSinkSuccess(sinkPeerToUse);
        resultCompletableFuture.complete(batchIndex);
      });
    return resultCompletableFuture;
  }

  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
    final SinkPeer sinkPeer) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
    }
    if (exception instanceof IOException) {
      if (sinkPeer != null) {
        reportBadSink(sinkPeer);
      }
    }
  }

  /**
   * Here, when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the
   * WAL {@link Entry} list; once we reach the batch limit, we send the batch out, and in the
   * callback we send the next batch, until all entries have been sent.
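   * <p>
   * A small walkthrough with hypothetical numbers: with a 100-byte {@code replicationRpcLimit} and
   * three 60-byte entries from one region, the first call ships the first entry, its callback
   * ships the second, that callback ships the third, and only then does the returned future
   * complete with {@code batchIndex}.
   * </p>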
   */
  private CompletableFuture<Integer> serialReplicateRegionEntries(
    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
    if (!walEntryPeekingIterator.hasNext()) {
      return CompletableFuture.completedFuture(batchIndex);
    }
    int batchSize = 0;
    List<Entry> batch = new ArrayList<>();
    while (walEntryPeekingIterator.hasNext()) {
      Entry entry = walEntryPeekingIterator.peek();
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        break;
      }
      walEntryPeekingIterator.next();
      batch.add(entry);
      batchSize += entrySize;
    }

    if (batchSize <= 0) {
      return CompletableFuture.completedFuture(batchIndex);
    }
    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
      if (exception != null) {
        resultCompletableFuture.completeExceptionally(exception);
        return;
      }
      if (!walEntryPeekingIterator.hasNext()) {
        resultCompletableFuture.complete(batchIndex);
        return;
      }
      FutureUtils.addListener(
        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
        (currentResponse, currentException) -> {
          if (currentException != null) {
            resultCompletableFuture.completeExceptionally(currentException);
            return;
          }
          resultCompletableFuture.complete(batchIndex);
        });
    });
    return resultCompletableFuture;
  }

  /**
   * Replicate entries to the peer cluster using the async API.
   */
  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
    int timeout) {
    return isSerial
      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
        timeout)
      : replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }
}