1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.CellScanner;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HBaseIOException;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HRegionLocation;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.RegionLocations;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.TableNotFoundException;
50  import org.apache.hadoop.hbase.TableDescriptors;
51  import org.apache.hadoop.hbase.client.ConnectionFactory;
52  import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
53  import org.apache.hadoop.hbase.client.ClusterConnection;
54  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
55  import org.apache.hadoop.hbase.client.RetryingCallable;
56  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
57  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
58  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
59  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
61  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
62  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
63  import org.apache.hadoop.hbase.wal.WALKey;
64  import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
65  import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
66  import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
67  import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
68  import org.apache.hadoop.hbase.wal.WAL.Entry;
69  import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
70  import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
71  import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
72  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
73  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
74  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
75  import org.apache.hadoop.hbase.replication.WALEntryFilter;
76  import org.apache.hadoop.hbase.util.Bytes;
77  import org.apache.hadoop.hbase.util.Pair;
78  import org.apache.hadoop.hbase.util.Threads;
79  import org.apache.hadoop.util.StringUtils;
80  
81  import com.google.common.cache.Cache;
82  import com.google.common.cache.CacheBuilder;
83  import com.google.common.collect.Lists;
84  import com.google.protobuf.ServiceException;
85  
86  /**
87   * A {@link ReplicationEndpoint} which receives the WAL edits from the WAL and sends
88   * them to the secondary replicas of the regions.
89   */
90  @InterfaceAudience.Private
91  public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
92  
93    private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
94  
95    // Can be configured differently than hbase.client.retries.number
96    private static final String CLIENT_RETRIES_NUMBER
97      = "hbase.region.replica.replication.client.retries.number";
98  
99    private Configuration conf;
100   private ClusterConnection connection;
101   private TableDescriptors tableDescriptors;
102 
103   // Reuse WALSplitter constructs as a WAL pipe
104   private PipelineController controller;
105   private RegionReplicaOutputSink outputSink;
106   private EntryBuffers entryBuffers;
107 
108   // Number of writer threads
109   private int numWriterThreads;
110 
111   private int operationTimeout;
112 
113   private ExecutorService pool;
114 
115   /**
116    * Skips the entries which have the original seqId set. Only entries persisted via distributed
117    * log replay have their original seqId fields set.
118    */
119   private static class SkipReplayedEditsFilter extends BaseWALEntryFilter {
120     @Override
121     public Entry filter(Entry entry) {
122       // if orig seq id is set, skip replaying the entry
123       if (entry.getKey().getOrigLogSeqNum() > 0) {
124         return null;
125       }
126       return entry;
127     }
128   }
129 
130   @Override
131   public WALEntryFilter getWALEntryfilter() {
132     WALEntryFilter superFilter = super.getWALEntryfilter();
133     WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
134 
135     if (superFilter == null) {
136       return skipReplayedEditsFilter;
137     }
138 
139     if (skipReplayedEditsFilter == null) {
140       return superFilter;
141     }
142 
143     ArrayList<WALEntryFilter> filters = Lists.newArrayList();
144     filters.add(superFilter);
145     filters.add(skipReplayedEditsFilter);
146     return new ChainWALEntryFilter(filters);
147   }
148 
149   protected WALEntryFilter getSkipReplayedEditsFilter() {
150     return new SkipReplayedEditsFilter();
151   }
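      // Note: getScopeWALEntryFilter() below returns null, so replication scope is not filtered
      // here; the filter actually applied is the superclass filter (if any) chained with the
      // SkipReplayedEditsFilter, as built in getWALEntryfilter() above.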
152 
153   @Override
154   public void init(Context context) throws IOException {
155     super.init(context);
156 
157     this.conf = HBaseConfiguration.create(context.getConfiguration());
158     this.tableDescriptors = context.getTableDescriptors();
159 
160     // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
161   // We reset it here because we want the default number of retries (35) rather than 10 times
162   // that, which would make retries against disabled tables etc. take very long.
163     int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
164       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
165     if (defaultNumRetries > 10) {
166       int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
167       defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
168     }
169 
170     conf.setInt("hbase.client.serverside.retries.multiplier", 1);
171     int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
172     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
173 
174     this.numWriterThreads = this.conf.getInt(
175       "hbase.region.replica.replication.writer.threads", 3);
176     controller = new PipelineController();
177     entryBuffers = new EntryBuffers(controller,
178       this.conf.getInt("hbase.region.replica.replication.buffersize",
179           128*1024*1024));
180 
181   // use the regular RPC timeout for replica replication RPCs
182     this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
183       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
184   }
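      // The keys read in init() can be tuned in the cluster configuration. A minimal sketch of
      // programmatic overrides (the values below are illustrative only; in practice these would
      // normally be set in hbase-site.xml):
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setInt("hbase.region.replica.replication.client.retries.number", 10);
      //   conf.setInt("hbase.region.replica.replication.writer.threads", 3);
      //   conf.setInt("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024);
      //   conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 1000);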
185 
186   @Override
187   protected void doStart() {
188     try {
189       connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
190       this.pool = getDefaultThreadPool(conf);
191       outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
192         connection, pool, numWriterThreads, operationTimeout);
193       outputSink.startWriterThreads();
194       super.doStart();
195     } catch (IOException ex) {
196       LOG.warn("Received exception while creating connection :" + ex);
197       notifyFailed(ex);
198     }
199   }
200 
201   @Override
202   protected void doStop() {
203     if (outputSink != null) {
204       try {
205         outputSink.finishWritingAndClose();
206       } catch (IOException ex) {
207         LOG.warn("Got exception while trying to close OutputSink");
208         LOG.warn(ex);
209       }
210     }
211     if (this.pool != null) {
212       this.pool.shutdownNow();
213       try {
214         // wait for 10 sec
215         boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
216         if (!shutdown) {
217           LOG.warn("Failed to shutdown the thread pool after 10 seconds");
218         }
219       } catch (InterruptedException e) {
220         LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
221       }
222     }
223     if (connection != null) {
224       try {
225         connection.close();
226       } catch (IOException ex) {
227         LOG.warn("Got exception closing connection :" + ex);
228       }
229     }
230     super.doStop();
231   }
232 
233   /**
234    * Returns a thread pool for the RPCs to region replicas. Similar to
235    * the Connection's thread pool.
236    */
237   private ExecutorService getDefaultThreadPool(Configuration conf) {
238     int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
239     int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
240     if (maxThreads == 0) {
241       maxThreads = Runtime.getRuntime().availableProcessors() * 8;
242     }
243     if (coreThreads == 0) {
244       coreThreads = Runtime.getRuntime().availableProcessors() * 8;
245     }
246     long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
247     LinkedBlockingQueue<Runnable> workQueue =
248         new LinkedBlockingQueue<Runnable>(maxThreads *
249             conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
250               HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
251     ThreadPoolExecutor tpe = new ThreadPoolExecutor(
252       coreThreads,
253       maxThreads,
254       keepAliveTime,
255       TimeUnit.SECONDS,
256       workQueue,
257       Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
258     tpe.allowCoreThreadTimeOut(true);
259     return tpe;
260   }
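      // With the defaults above this gives 16 core / 256 max threads with a 60 second keep-alive
      // (core threads may time out too, per allowCoreThreadTimeOut(true)), and a bounded queue of
      // maxThreads * hbase.client.max.total.tasks entries, i.e. 256 * 100 = 25600 assuming the
      // stock default of 100 for that setting.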
261 
262   @Override
263   public boolean replicate(ReplicateContext replicateContext) {
264     /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
265      *
266      * RRRE relies on batching from two different mechanisms. The first is the batching from
267      * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
268      * WAL file, filling up a buffer of heap size "replication.source.size.capacity" (64MB), or at most
269      * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
270      * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
271      * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
272      * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
273      * based on regions.
274      *
275      * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
276      * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
277      * The SinkWriter in this case will send the wal edits to all secondary region replicas in
278      * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
279      * being written to the sink, another buffer for the same region will not be made available to
280      * writers ensuring regions edits are not replayed out of order.
281      *
282      * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
283      * that the replication can assume all edits are persisted. We may be able to do a better
284      * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
285      */
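        // In short: every entry handed to replicate() is appended to entryBuffers, the writer
        // threads drain the per-region buffers through RegionReplicaOutputSink and
        // RegionReplicaSinkWriter, and outputSink.flush() below blocks until all buffered edits
        // have been sent and acknowledged, so returning true means the edits are persisted.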
286 
287     while (this.isRunning()) {
288       try {
289         for (Entry entry: replicateContext.getEntries()) {
290           entryBuffers.appendEntry(entry);
291         }
292         outputSink.flush(); // make sure everything is flushed
293         ctx.getMetrics().incrLogEditsFiltered(
294           outputSink.getSkippedEditsCounter().getAndSet(0));
295         return true;
296       } catch (InterruptedException e) {
297         Thread.currentThread().interrupt();
298         return false;
299       } catch (IOException e) {
300         LOG.warn("Received IOException while trying to replicate"
301             + StringUtils.stringifyException(e));
302       }
303     }
304 
305     return false;
306   }
307 
308   @Override
309   public boolean canReplicateToSameCluster() {
310     return true;
311   }
312 
313   @Override
314   protected WALEntryFilter getScopeWALEntryFilter() {
315     // we do not care about scope. We replicate everything.
316     return null;
317   }
318 
319   static class RegionReplicaOutputSink extends OutputSink {
320     private final RegionReplicaSinkWriter sinkWriter;
321     private final TableDescriptors tableDescriptors;
322     private final Cache<TableName, Boolean> memstoreReplicationEnabled;
323 
324     public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
325         EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
326         int numWriters, int operationTimeout) {
327       super(controller, entryBuffers, numWriters);
328       this.sinkWriter =
329           new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
330       this.tableDescriptors = tableDescriptors;
331 
332       // A cache for the table "memstore replication enabled" flag.
333       // It has a default expiry of 5 sec. This means that if the table is altered
334       // with a different flag value, we might not pick up the change for that amount of
335       // time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
336       int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
337         .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
338       this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
339         .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
340         .initialCapacity(10)
341         .maximumSize(1000)
342         .build();
343     }
344 
345     @Override
346     public void append(RegionEntryBuffer buffer) throws IOException {
347       List<Entry> entries = buffer.getEntryBuffer();
348 
349       if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
350         return;
351       }
352 
353       // meta edits (e.g. flush) are always replicated.
354       // data edits (e.g. put) are replicated if the table requires them.
355       if (!requiresReplication(buffer.getTableName(), entries)) {
356         return;
357       }
358 
359       sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
360         entries.get(0).getEdit().getCells().get(0).getRow(), entries);
361     }
362 
363     @Override
364     public boolean flush() throws IOException {
365       // nothing much to do for now. Wait for the Writer threads to finish up
366       // append()'ing the data.
367       entryBuffers.waitUntilDrained();
368       return super.flush();
369     }
370 
371     @Override
372     public boolean keepRegionEvent(Entry entry) {
373       return true;
374     }
375 
376     @Override
377     public List<Path> finishWritingAndClose() throws IOException {
378       finishWriting(true);
379       return null;
380     }
381 
382     @Override
383     public Map<byte[], Long> getOutputCounts() {
384       return null; // only used in tests
385     }
386 
387     @Override
388     public int getNumberOfRecoveredRegions() {
389       return 0;
390     }
391 
392     AtomicLong getSkippedEditsCounter() {
393       return skippedEdits;
394     }
395 
396     /**
397      * Returns true if the specified entries must be replicated.
398      * We should always replicate meta operations (e.g. flush)
399      * and use the user HTD flag to decide whether or not to replicate the memstore.
400      */
401     private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
402         throws IOException {
403       // unit tests may not have the TableDescriptors; bypass the check and always replicate
404       if (tableDescriptors == null) return true;
405 
406       Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
407       if (requiresReplication == null) {
408         // check if the table requires memstore replication
409         // some unit tests drop the table; if it is gone, bypass the check and always replicate.
410         HTableDescriptor htd = tableDescriptors.get(tableName);
411         requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
412         memstoreReplicationEnabled.put(tableName, requiresReplication);
413       }
414 
415       // if memstore replication is not required, check the entries.
416       // meta edits (e.g. flush) must always be replicated.
417       if (!requiresReplication) {
418         int skipEdits = 0;
419         java.util.Iterator<Entry> it = entries.iterator();
420         while (it.hasNext()) {
421           Entry entry = it.next();
422           if (entry.getEdit().isMetaEdit()) {
423             requiresReplication = true;
424           } else {
425             it.remove();
426             skipEdits++;
427           }
428         }
429         skippedEdits.addAndGet(skipEdits);
430       }
431       return requiresReplication;
432     }
433   }
434 
435   static class RegionReplicaSinkWriter extends SinkWriter {
436     RegionReplicaOutputSink sink;
437     ClusterConnection connection;
438     RpcControllerFactory rpcControllerFactory;
439     RpcRetryingCallerFactory rpcRetryingCallerFactory;
440     int operationTimeout;
441     ExecutorService pool;
442     Cache<TableName, Boolean> disabledAndDroppedTables;
443     TableDescriptors tableDescriptors;
444 
445     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
446         ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
447       this.sink = sink;
448       this.connection = connection;
449       this.operationTimeout = operationTimeout;
450       this.rpcRetryingCallerFactory
451         = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
452       this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
453       this.pool = pool;
454       this.tableDescriptors = tableDescriptors;
455 
456       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
457         .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
458       // A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
459       // table is created again with the same name, we might fail to replicate for that amount of
460       // time. But this cache keeps us from hitting meta for every edit of a dropped table.
461       disabledAndDroppedTables = CacheBuilder.newBuilder()
462         .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
463         .initialCapacity(10)
464         .maximumSize(1000)
465         .build();
466     }
467 
468     public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
469         List<Entry> entries) throws IOException {
470 
471       if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
472         if (LOG.isTraceEnabled()) {
473           LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
474             + " is cached as a disabled or dropped table");
475           for (Entry entry : entries) {
476             LOG.trace("Skipping : " + entry);
477           }
478         }
479         sink.getSkippedEditsCounter().addAndGet(entries.size());
480         return;
481       }
482 
483       // If the table is disabled or dropped, we should not replay the entries and can skip them.
484       // However, we might not know whether the table is disabled until we invalidate the
485       // location cache and check meta.
486       RegionLocations locations = null;
487       boolean useCache = true;
488       while (true) {
489         // get the replicas of the primary region
490         try {
491           locations = RegionReplicaReplayCallable
492               .getRegionLocations(connection, tableName, row, useCache, 0);
493 
494           if (locations == null) {
495             throw new HBaseIOException("Cannot locate locations for "
496                 + tableName + ", row:" + Bytes.toStringBinary(row));
497           }
498         } catch (TableNotFoundException e) {
499           if (LOG.isTraceEnabled()) {
500             LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
501               + " is dropped. Adding table to cache.");
502             for (Entry entry : entries) {
503               LOG.trace("Skipping : " + entry);
504             }
505           }
506           disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
507           // skip this entry
508           sink.getSkippedEditsCounter().addAndGet(entries.size());
509           return;
510         }
511 
512         // Check whether we should still replay this entry. If the region has changed, or the
513         // entry is not coming from the primary region, filter it out.
514         HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
515         if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
516           encodedRegionName)) {
517           if (useCache) {
518             useCache = false;
519             continue; // this will retry location lookup
520           }
521           if (LOG.isTraceEnabled()) {
522             LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
523               + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
524               + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
525               + " from WALEdit");
526             for (Entry entry : entries) {
527               LOG.trace("Skipping : " + entry);
528             }
529           }
530           sink.getSkippedEditsCounter().addAndGet(entries.size());
531           return;
532         }
533         break;
534       }
535 
536       if (locations.size() == 1) {
537         return;
538       }
539 
540       ArrayList<Future<ReplicateWALEntryResponse>> tasks
541         = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);
542 
543       // All passed entries should belong to one region because they come from the EntryBuffers,
544       // which splits per region. But the region might have split or merged (unlike the log recovery case).
545       for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
546         HRegionLocation location = locations.getRegionLocation(replicaId);
547         if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
548           HRegionInfo regionInfo = location == null
549               ? RegionReplicaUtil.getRegionInfoForReplica(
550                 locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
551               : location.getRegionInfo();
552           RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
553             rpcControllerFactory, tableName, location, regionInfo, row, entries,
554             sink.getSkippedEditsCounter());
555            Future<ReplicateWALEntryResponse> task = pool.submit(
556              new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
557                  callable, operationTimeout));
558            tasks.add(task);
559         }
560       }
561 
562       boolean tasksCancelled = false;
563       for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
564         try {
565           tasks.get(replicaId).get();
566         } catch (InterruptedException e) {
567           throw new InterruptedIOException(e.getMessage());
568         } catch (ExecutionException e) {
569           Throwable cause = e.getCause();
570           boolean canBeSkipped = false;
571           if (cause instanceof IOException) {
572             // The table can be disabled or dropped at this time. For disabled tables, we have no
573             // cheap mechanism to detect this case because meta does not contain this information.
574             // HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC.
575             // So instead we start the replay RPC with retries, and if we get an IOException
576             // (such as SocketTimeoutException or RetriesExhaustedException), we then check
577             // whether the table has been dropped or disabled.
578             if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
579               disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
580               canBeSkipped = true;
581             } else if (tableDescriptors != null) {
582               HTableDescriptor tableDescriptor = tableDescriptors.get(tableName);
583               if (tableDescriptor != null
584                   // (replicaId + 1) as no task is added for primary replica for replication
585                   && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
586                 canBeSkipped = true;
587               }
588             }
589             if (canBeSkipped) {
590               if (LOG.isTraceEnabled()) {
591                 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
592                     + " because received exception for dropped or disabled table",
593                   cause);
594                 for (Entry entry : entries) {
595                   LOG.trace("Skipping : " + entry);
596                 }
597               }
598 
599               if (!tasksCancelled) {
600                 sink.getSkippedEditsCounter().addAndGet(entries.size());
601                 tasksCancelled = true; // so that we do not add to skipped counter again
602               }
603               continue;
604             }
605             // otherwise rethrow
606             throw (IOException)cause;
607           }
608           // unexpected exception
609           throw new IOException(cause);
610         }
611       }
612     }
613   }
614 
615   static class RetryingRpcCallable<V> implements Callable<V> {
616     RpcRetryingCallerFactory factory;
617     RetryingCallable<V> callable;
618     int timeout;
619     public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
620         int timeout) {
621       this.factory = factory;
622       this.callable = callable;
623       this.timeout = timeout;
624     }
625     @Override
626     public V call() throws Exception {
627       return factory.<V>newCaller().callWithRetries(callable, timeout);
628     }
629   }
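      // RetryingRpcCallable is how RegionReplicaSinkWriter.append() fans out: each secondary
      // replica gets a RegionReplicaReplayCallable wrapped in a RetryingRpcCallable and submitted
      // to the shared pool, so the replay RPCs to the different replicas run, and retry, in parallel.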
630 
631   /**
632    * Calls replay on the given set of entries belonging to the region. It skips
633    * the entries if the region boundaries have changed or the region is gone.
634    */
635   static class RegionReplicaReplayCallable
636     extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
637 
638     private final List<Entry> entries;
639     private final byte[] initialEncodedRegionName;
640     private final AtomicLong skippedEntries;
641 
642     public RegionReplicaReplayCallable(ClusterConnection connection,
643         RpcControllerFactory rpcControllerFactory, TableName tableName,
644         HRegionLocation location, HRegionInfo regionInfo, byte[] row, List<Entry> entries,
645         AtomicLong skippedEntries) {
646       super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
647       this.entries = entries;
648       this.skippedEntries = skippedEntries;
649       this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
650     }
651 
652     @Override
653     public ReplicateWALEntryResponse call(int timeout) throws IOException {
654       return replayToServer(this.entries, timeout);
655     }
656 
657     private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
658         throws IOException {
659       // Check whether we should still replay this entry. If the region has changed, or the
660       // entry is not coming from the primary region, filter it out because we do not need it.
661       // Regions can change because of (1) region split (2) region merge (3) table recreated.
662       boolean skip = false;
663 
664       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
665         initialEncodedRegionName)) {
666         skip = true;
667       }
668       if (!entries.isEmpty() && !skip) {
669         Entry[] entriesArray = new Entry[entries.size()];
670         entriesArray = entries.toArray(entriesArray);
671 
672         // set the region name for the target region replica
673         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
674             ReplicationProtbufUtil.buildReplicateWALEntryRequest(
675               entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
676         try {
677           PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
678           controller.setCallTimeout(timeout);
679           controller.setPriority(tableName);
680           return stub.replay(controller, p.getFirst());
681         } catch (ServiceException se) {
682           throw ProtobufUtil.getRemoteException(se);
683         }
684       }
685 
686       if (skip) {
687         if (LOG.isTraceEnabled()) {
688           LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
689             + " because located region " + location.getRegionInfo().getEncodedName()
690             + " is different than the original region "
691             + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
692           for (Entry entry : entries) {
693             LOG.trace("Skipping : " + entry);
694           }
695         }
696         skippedEntries.addAndGet(entries.size());
697       }
698       return ReplicateWALEntryResponse.newBuilder().build();
699     }
700   }
701 }