/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation
 * that receives WAL edits from the replication source and replays them on the
 * secondary replicas of the regions.
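 * <p>
 * This endpoint replicates within the same cluster; it is normally wired up
 * automatically when region replica replication is enabled on the cluster
 * (typically via the hbase.region.replica.replication.enabled setting) rather
 * than configured as a peer by hand.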
 */
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);

  // Can be configured independently of hbase.client.retries.number
  private static final String CLIENT_RETRIES_NUMBER
    = "hbase.region.replica.replication.client.retries.number";
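
  // For example, to use fewer replay retries than the general client default,
  // a deployment could set the following in hbase-site.xml (illustrative value,
  // not a recommendation):
  //   hbase.region.replica.replication.client.retries.number = 8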

  private Configuration conf;
  private ClusterConnection connection;
  private TableDescriptors tableDescriptors;

  // Reuse WALSplitter constructs as a WAL pipe
  private PipelineController controller;
  private RegionReplicaOutputSink outputSink;
  private EntryBuffers entryBuffers;

  // Number of writer threads
  private int numWriterThreads;

  private int operationTimeout;

  private ExecutorService pool;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);

    this.conf = HBaseConfiguration.create(context.getConfiguration());
    this.tableDescriptors = context.getTableDescriptors();

    // The RS multiplies client retries by 10 globally for meta operations, but we do not want
    // that here. We reset the value so that we get the default number of retries (35) rather
    // than ten times that, which would mean very long retry sequences for disabled tables etc.
    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (defaultNumRetries > 10) {
      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
      defaultNumRetries = defaultNumRetries / mult; // undo the multiplier the RS applied
    }
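
    // Worked example: with a base of 35 client retries that the RS has already
    // multiplied by 10, we read 350 above; dividing by the multiplier restores
    // the original 35 before we configure our own connection below.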

    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);

    this.numWriterThreads = this.conf.getInt(
      "hbase.region.replica.replication.writer.threads", 3);
    controller = new PipelineController();
    entryBuffers = new EntryBuffers(controller,
        this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));

    // use the regular RPC timeout for replica replication RPCs
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  @Override
  protected void doStart() {
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
      this.pool = getDefaultThreadPool(conf);
      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
        connection, pool, numWriterThreads, operationTimeout);
      outputSink.startWriterThreads();
      super.doStart();
    } catch (IOException ex) {
      LOG.warn("Received exception while creating connection", ex);
      notifyFailed(ex);
    }
  }

  @Override
  protected void doStop() {
    if (outputSink != null) {
      try {
        outputSink.finishWritingAndClose();
      } catch (IOException ex) {
        LOG.warn("Got exception while trying to close OutputSink", ex);
      }
    }
    if (this.pool != null) {
      this.pool.shutdownNow();
      try {
        // wait for 10 sec
        boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
        if (!shutdown) {
          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
        }
      } catch (InterruptedException e) {
        LOG.warn("Got interrupted while waiting for the thread pool to shut down", e);
        Thread.currentThread().interrupt(); // restore the interrupt status
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException ex) {
        LOG.warn("Got exception closing connection", ex);
      }
    }
    super.doStop();
  }

  /**
   * Returns a thread pool for the RPCs to the region replicas. Similar to
   * the Connection's thread pool.
   */
  private ExecutorService getDefaultThreadPool(Configuration conf) {
    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
    LinkedBlockingQueue<Runnable> workQueue =
        new LinkedBlockingQueue<>(maxThreads *
            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
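    // Bounded queue: capacity is maxThreads * hbase.client.max.total.tasks, e.g.
    // 256 * 100 = 25600 runnables assuming the stock client default of 100 tasks.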
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      keepAliveTime,
      TimeUnit.SECONDS,
      workQueue,
      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
     *
     * RRRE relies on batching from two different mechanisms. The first is the batching from
     * ReplicationSource, since RRRE is a ReplicationEndpoint driven by the RS. The RS reads from a
     * single WAL file, filling a buffer of heap size "replication.source.size.capacity" (64MB), or
     * at most "replication.source.nb.capacity" entries, or until it sees the end of file (in live
     * tailing). Then the RS passes all the buffered edits in this replicate() call context. RRRE
     * puts the edits into the WALSplitter.EntryBuffers, which is a blocking buffer space of up to
     * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
     * by region.
     *
     * There are "hbase.region.replica.replication.writer.threads" (default 3) writer threads which
     * pick the largest per-region buffer and send it to the SinkWriter (see
     * RegionReplicaOutputSink). The SinkWriter in this case sends the WAL edits to all secondary
     * region replicas in parallel via a retrying RPC call. EntryBuffers guarantees that while a
     * buffer is being written to the sink, another buffer for the same region will not be made
     * available to writers, ensuring a region's edits are not replayed out of order.
     *
     * The replicate() call won't return until all the buffers are sent and ack'd by the sinks, so
     * that replication can assume all edits are persisted. We may be able to do better pipelining
     * between the replication thread and the output sinks later if this becomes a bottleneck.
     */

    while (this.isRunning()) {
      try {
        for (Entry entry: replicateContext.getEntries()) {
          entryBuffers.appendEntry(entry);
        }
        outputSink.flush(); // make sure everything is flushed
        ctx.getMetrics().incrLogEditsFiltered(
          outputSink.getSkippedEditsCounter().getAndSet(0));
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      } catch (IOException e) {
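        // Log and retry the whole batch from the top of the loop; while the
        // endpoint is still running we must not drop edits on transient errors.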
        LOG.warn("Received IOException while trying to replicate: "
            + StringUtils.stringifyException(e));
      }
    }

    return false;
  }

  @Override
  public boolean canReplicateToSameCluster() {
    return true;
  }

  @Override
  protected WALEntryFilter getScopeWALEntryFilter() {
    // we do not care about scope. We replicate everything.
    return null;
  }

  static class RegionReplicaOutputSink extends OutputSink {
    private final RegionReplicaSinkWriter sinkWriter;
    private final TableDescriptors tableDescriptors;
    private final Cache<TableName, Boolean> memstoreReplicationEnabled;

    public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
        EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
        int numWriters, int operationTimeout) {
      super(controller, entryBuffers, numWriters);
      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
      this.tableDescriptors = tableDescriptors;

      // A cache for the table "memstore replication enabled" flag.
      // It has a default expiry of 5 sec. This means that if the table is altered
      // with a different flag value, we might fail to replicate for that amount of
      // time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

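    /**
     * Receives a per-region buffer of WAL entries from the writer threads and,
     * unless replication is not required for the table, hands them to the
     * {@link RegionReplicaSinkWriter} keyed by the first cell's row.
     */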
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.getEntryBuffer();

      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
        return;
      }

      // meta edits (e.g. flush) are always replicated.
      // data edits (e.g. put) are replicated only if the table requires them.
      if (!requiresReplication(buffer.getTableName(), entries)) {
        return;
      }

      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
    }

    @Override
    public boolean flush() throws IOException {
      // nothing much to do for now. Wait for the writer threads to finish
      // append()'ing the data.
      entryBuffers.waitUntilDrained();
      return super.flush();
    }

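    /**
     * Region event markers (e.g. flush events) are always kept, so that the
     * secondary replicas can observe and act on them.
     */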
    @Override
    public boolean keepRegionEvent(Entry entry) {
      return true;
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      finishWriting(true);
      return null; // this sink does not create any output files
    }

    @Override
    public Map<byte[], Long> getOutputCounts() {
      return null; // only used in tests
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return 0;
    }

    AtomicLong getSkippedEditsCounter() {
      return skippedEdits;
    }

    /**
     * Returns true if the specified entries must be replicated.
     * We always replicate meta operations (e.g. flush) and use the table's
     * memstore-replication flag to decide whether or not to replicate data edits.
     */
    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
        throws IOException {
      // unit tests may not have the TableDescriptors; bypass the check and always replicate
      if (tableDescriptors == null) return true;

      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
      if (requiresReplication == null) {
        // check if the table requires memstore replication.
        // some unit tests drop the table, so do a bypass check and always replicate.
        TableDescriptor htd = tableDescriptors.get(tableName);
        requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
        memstoreReplicationEnabled.put(tableName, requiresReplication);
      }

      // if memstore replication is not required, check the entries.
      // meta edits (e.g. flush) must always be replicated.
      if (!requiresReplication) {
        int skipEdits = 0;
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
          Entry entry = it.next();
          if (entry.getEdit().isMetaEdit()) {
            requiresReplication = true;
          } else {
            it.remove();
            skipEdits++;
          }
        }
        skippedEdits.addAndGet(skipEdits);
      }
      return requiresReplication;
    }
  }

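  /**
   * Writes the buffered edits of a single region to all of its secondary replicas
   * in parallel, one retrying replay RPC per replica. Keeps a short-lived cache of
   * disabled and dropped tables so that repeated edits for a missing table do not
   * hammer meta with location lookups.
   */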
  static class RegionReplicaSinkWriter extends SinkWriter {
    RegionReplicaOutputSink sink;
    ClusterConnection connection;
    RpcControllerFactory rpcControllerFactory;
    RpcRetryingCallerFactory rpcRetryingCallerFactory;
    int operationTimeout;
    ExecutorService pool;
    Cache<TableName, Boolean> disabledAndDroppedTables;

    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
        ExecutorService pool, int operationTimeout) {
      this.sink = sink;
      this.connection = connection;
      this.operationTimeout = operationTimeout;
      this.rpcRetryingCallerFactory
        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
      this.pool = pool;

      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
      // A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
      // table is created again with the same name, we might fail to replicate for that amount of
      // time. But this cache prevents overloading meta with requests for every edit of a
      // dropped table.
      disabledAndDroppedTables = CacheBuilder.newBuilder()
        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
        List<Entry> entries) throws IOException {

      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
            + " is cached as a disabled or dropped table");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        sink.getSkippedEditsCounter().addAndGet(entries.size());
        return;
      }

      // If the table is disabled or dropped, we should not replay the entries, and we can skip
      // replaying them. However, we might not know whether the table is disabled until we
      // invalidate the location cache and check meta.
      RegionLocations locations = null;
      boolean useCache = true;
      while (true) {
        // get the replicas of the primary region
        try {
          locations = RegionReplicaReplayCallable
              .getRegionLocations(connection, tableName, row, useCache, 0);

          if (locations == null) {
            throw new HBaseIOException("Cannot locate locations for "
                + tableName + ", row:" + Bytes.toStringBinary(row));
          }
        } catch (TableNotFoundException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
              + " is dropped. Adding table to cache.");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
          // skip these entries
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }

        // check whether we should still replay this entry. If the regions have changed, or the
        // entry does not come from the primary region, filter it out.
        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
          encodedRegionName)) {
          if (useCache) {
            useCache = false;
            continue; // this will retry the location lookup
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
              + " from WALEdit");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }
        break;
      }

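      // Only the default (primary) replica is deployed for this region; there is
      // no secondary replica to ship the edits to.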
      if (locations.size() == 1) {
        return;
      }

      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);

      // All passed entries should belong to a single region, because they come from the
      // EntryBuffers, which are split per region. But the region might have split or merged
      // (unlike the log recovery case).
      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
          RegionInfo regionInfo = location == null
              ? RegionReplicaUtil.getRegionInfoForReplica(
                locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
              : location.getRegionInfo();
          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
            rpcControllerFactory, tableName, location, regionInfo, row, entries,
            sink.getSkippedEditsCounter());
          Future<ReplicateWALEntryResponse> task = pool.submit(
            new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
          tasks.add(task);
        }
      }

      boolean tasksCancelled = false;
      for (Future<ReplicateWALEntryResponse> task : tasks) {
        try {
          task.get();
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            // The table can be disabled or dropped at this time. For disabled tables, we have no
            // cheap mechanism to detect this case, because meta does not contain this information.
            // ClusterConnection.isTableDisabled() is a zk call which we cannot afford for every
            // replay RPC. So instead we start the replay RPC with retries, and on an IOException
            // (e.g. SocketTimeoutException, RetriesExhaustedException or similar) we check whether
            // the table was dropped or disabled.
            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
                  + " because received exception for dropped or disabled table", cause);
                for (Entry entry : entries) {
                  LOG.trace("Skipping : " + entry);
                }
              }
              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
              if (!tasksCancelled) {
                sink.getSkippedEditsCounter().addAndGet(entries.size());
                tasksCancelled = true; // so that we do not add to the skipped counter again
              }
              continue;
            }
            // otherwise rethrow
            throw (IOException)cause;
          }
          // unexpected exception
          throw new IOException(cause);
        }
      }
    }
  }

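  /**
   * Adapts a {@link RetryingCallable} to {@link Callable} so that the retrying
   * replay RPC can be submitted to the shared thread pool.
   */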
  static class RetryingRpcCallable<V> implements Callable<V> {
    RpcRetryingCallerFactory factory;
    RetryingCallable<V> callable;
    int timeout;

    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
        int timeout) {
      this.factory = factory;
      this.callable = callable;
      this.timeout = timeout;
    }

    @Override
    public V call() throws Exception {
      return factory.<V>newCaller().callWithRetries(callable, timeout);
    }
  }

  /**
   * Calls replay on the passed set of WAL entries belonging to a region. Skips
   * the entries if the region boundaries have changed or the region is gone.
   */
  static class RegionReplicaReplayCallable extends
      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
    private final List<Entry> entries;
    private final byte[] initialEncodedRegionName;
    private final AtomicLong skippedEntries;

    public RegionReplicaReplayCallable(ClusterConnection connection,
        RpcControllerFactory rpcControllerFactory, TableName tableName,
        HRegionLocation location, RegionInfo regionInfo, byte[] row, List<Entry> entries,
        AtomicLong skippedEntries) {
      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
      this.entries = entries;
      this.skippedEntries = skippedEntries;
      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
    }

    @Override
    public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
      // Check whether we should still replay these entries. If the regions have changed, or the
      // entry does not come from the primary region, filter it out because we do not need it.
      // Regions can change because of (1) region split (2) region merge (3) table recreated
      boolean skip = false;
      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
          initialEncodedRegionName)) {
        skip = true;
      }
      if (!this.entries.isEmpty() && !skip) {
        Entry[] entriesArray = new Entry[this.entries.size()];
        entriesArray = this.entries.toArray(entriesArray);

        // set the region name for the target region replica
        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
        controller.setCellScanner(p.getSecond());
        return stub.replay(controller, p.getFirst());
      }

      if (skip) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
            + " because located region " + location.getRegionInfo().getEncodedName()
            + " is different than the original region "
            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        skippedEntries.addAndGet(entries.size());
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    }
  }
}