001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Future;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.CellScanner;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.RegionLocations;
043import org.apache.hadoop.hbase.TableDescriptors;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.TableNotFoundException;
046import org.apache.hadoop.hbase.client.ClusterConnection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionReplicaUtil;
051import org.apache.hadoop.hbase.client.RetryingCallable;
052import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.ipc.HBaseRpcController;
055import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
056import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
057import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
058import org.apache.hadoop.hbase.replication.WALEntryFilter;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.hbase.util.Threads;
062import org.apache.hadoop.hbase.wal.EntryBuffers;
063import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
064import org.apache.hadoop.hbase.wal.OutputSink;
065import org.apache.hadoop.hbase.wal.WAL.Entry;
066import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
067import org.apache.hadoop.util.StringUtils;
068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
074import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
075
076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
078
079/**
080 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
081 * which receives the WAL edits from the WAL, and sends the edits to replicas
082 * of regions.
083 */
084@InterfaceAudience.Private
085public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
086
087  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
088
089  // Can be configured differently than hbase.client.retries.number
090  private static String CLIENT_RETRIES_NUMBER
091    = "hbase.region.replica.replication.client.retries.number";
092
093  private Configuration conf;
094  private ClusterConnection connection;
095  private TableDescriptors tableDescriptors;
096
097  // Reuse WALSplitter constructs as a WAL pipe
098  private PipelineController controller;
099  private RegionReplicaOutputSink outputSink;
100  private EntryBuffers entryBuffers;
101
102  // Number of writer threads
103  private int numWriterThreads;
104
105  private int operationTimeout;
106
107  private ExecutorService pool;
108
109  @Override
110  public void init(Context context) throws IOException {
111    super.init(context);
112
113    this.conf = HBaseConfiguration.create(context.getConfiguration());
114    this.tableDescriptors = context.getTableDescriptors();
115
116    // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
117    // We are resetting it here because we want default number of retries (35) rather than 10 times
118    // that which makes very long retries for disabled tables etc.
119    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
120      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
121    if (defaultNumRetries > 10) {
122      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
123        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
124      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
125    }
126
127    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
128    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
129    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
130
131    this.numWriterThreads = this.conf.getInt(
132      "hbase.region.replica.replication.writer.threads", 3);
133    controller = new PipelineController();
134    entryBuffers = new EntryBuffers(controller,
135        this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));
136
137    // use the regular RPC timeout for replica replication RPC's
138    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
139      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
140  }
141
142  @Override
143  protected void doStart() {
144    try {
145      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
146      this.pool = getDefaultThreadPool(conf);
147      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
148        connection, pool, numWriterThreads, operationTimeout);
149      outputSink.startWriterThreads();
150      super.doStart();
151    } catch (IOException ex) {
152      LOG.warn("Received exception while creating connection :" + ex);
153      notifyFailed(ex);
154    }
155  }
156
157  @Override
158  protected void doStop() {
159    if (outputSink != null) {
160      try {
161        outputSink.close();
162      } catch (IOException ex) {
163        LOG.warn("Got exception while trying to close OutputSink", ex);
164      }
165    }
166    if (this.pool != null) {
167      this.pool.shutdownNow();
168      try {
169        // wait for 10 sec
170        boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
171        if (!shutdown) {
172          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
173        }
174      } catch (InterruptedException e) {
175        LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
176      }
177    }
178    if (connection != null) {
179      try {
180        connection.close();
181      } catch (IOException ex) {
182        LOG.warn("Got exception closing connection :" + ex);
183      }
184    }
185    super.doStop();
186  }
187
188  /**
189   * Returns a Thread pool for the RPC's to region replicas. Similar to
190   * Connection's thread pool.
191   */
192  private ExecutorService getDefaultThreadPool(Configuration conf) {
193    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
194    if (maxThreads == 0) {
195      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
196    }
197    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
198    LinkedBlockingQueue<Runnable> workQueue =
199        new LinkedBlockingQueue<>(maxThreads *
200            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
201              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
202    ThreadPoolExecutor tpe =
203      new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
204        new ThreadFactoryBuilder()
205          .setNameFormat(this.getClass().getSimpleName() + "-rpc-shared-pool-%d")
206          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
207    tpe.allowCoreThreadTimeOut(true);
208    return tpe;
209  }
210
211  @Override
212  public boolean replicate(ReplicateContext replicateContext) {
213    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
214     *
215     * RRRE relies on batching from two different mechanisms. The first is the batching from
216     * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
217     * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
218     * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
219     * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
220     * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
221     * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
222     * based on regions.
223     *
224     * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
225     * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
226     * The SinkWriter in this case will send the wal edits to all secondary region replicas in
227     * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
228     * being written to the sink, another buffer for the same region will not be made available to
229     * writers ensuring regions edits are not replayed out of order.
230     *
231     * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
232     * that the replication can assume all edits are persisted. We may be able to do a better
233     * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
234     */
235
236    while (this.isRunning()) {
237      try {
238        for (Entry entry: replicateContext.getEntries()) {
239          entryBuffers.appendEntry(entry);
240        }
241        outputSink.flush(); // make sure everything is flushed
242        ctx.getMetrics().incrLogEditsFiltered(
243          outputSink.getSkippedEditsCounter().getAndSet(0));
244        return true;
245      } catch (InterruptedException e) {
246        Thread.currentThread().interrupt();
247        return false;
248      } catch (IOException e) {
249        LOG.warn("Received IOException while trying to replicate"
250            + StringUtils.stringifyException(e));
251        outputSink.restartWriterThreadsIfNeeded();
252      }
253    }
254
255    return false;
256  }
257
258  @Override
259  public boolean canReplicateToSameCluster() {
260    return true;
261  }
262
263  @Override
264  protected WALEntryFilter getScopeWALEntryFilter() {
265    // we do not care about scope. We replicate everything.
266    return null;
267  }
268
269  static class RegionReplicaOutputSink extends OutputSink {
270    private final RegionReplicaSinkWriter sinkWriter;
271    private final TableDescriptors tableDescriptors;
272    private final Cache<TableName, Boolean> memstoreReplicationEnabled;
273
274    public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
275        EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
276        int numWriters, int operationTimeout) {
277      super(controller, entryBuffers, numWriters);
278      this.sinkWriter =
279          new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
280      this.tableDescriptors = tableDescriptors;
281
282      // A cache for the table "memstore replication enabled" flag.
283      // It has a default expiry of 5 sec. This means that if the table is altered
284      // with a different flag value, we might miss to replicate for that amount of
285      // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
286      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
287        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
288      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
289        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
290        .initialCapacity(10)
291        .maximumSize(1000)
292        .build();
293    }
294
295    @Override
296    public void append(RegionEntryBuffer buffer) throws IOException {
297      List<Entry> entries = buffer.getEntries();
298
299      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
300        return;
301      }
302
303      // meta edits (e.g. flush) are always replicated.
304      // data edits (e.g. put) are replicated if the table requires them.
305      if (!requiresReplication(buffer.getTableName(), entries)) {
306        return;
307      }
308
309      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
310        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
311    }
312
313    void flush() throws IOException {
314      // nothing much to do for now. Wait for the Writer threads to finish up
315      // append()'ing the data.
316      entryBuffers.waitUntilDrained();
317    }
318
319    @Override
320    public boolean keepRegionEvent(Entry entry) {
321      return true;
322    }
323
324    @Override
325    public List<Path> close() throws IOException {
326      finishWriterThreads(true);
327      return null;
328    }
329
330    @Override
331    public Map<String, Long> getOutputCounts() {
332      return null; // only used in tests
333    }
334
335    @Override
336    public int getNumberOfRecoveredRegions() {
337      return 0;
338    }
339
340    AtomicLong getSkippedEditsCounter() {
341      return totalSkippedEdits;
342    }
343
344    /**
345     * returns true if the specified entry must be replicated.
346     * We should always replicate meta operations (e.g. flush)
347     * and use the user HTD flag to decide whether or not replicate the memstore.
348     */
349    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
350        throws IOException {
351      // unit-tests may not the TableDescriptors, bypass the check and always replicate
352      if (tableDescriptors == null) return true;
353
354      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
355      if (requiresReplication == null) {
356        // check if the table requires memstore replication
357        // some unit-test drop the table, so we should do a bypass check and always replicate.
358        TableDescriptor htd = tableDescriptors.get(tableName);
359        requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
360        memstoreReplicationEnabled.put(tableName, requiresReplication);
361      }
362
363      // if memstore replication is not required, check the entries.
364      // meta edits (e.g. flush) must be always replicated.
365      if (!requiresReplication) {
366        int skipEdits = 0;
367        java.util.Iterator<Entry> it = entries.iterator();
368        while (it.hasNext()) {
369          Entry entry = it.next();
370          if (entry.getEdit().isMetaEdit()) {
371            requiresReplication = true;
372          } else {
373            it.remove();
374            skipEdits++;
375          }
376        }
377        totalSkippedEdits.addAndGet(skipEdits);
378      }
379      return requiresReplication;
380    }
381
382    @Override
383    protected int getNumOpenWriters() {
384      // TODO Auto-generated method stub
385      return 0;
386    }
387  }
388
389  static class RegionReplicaSinkWriter {
390    RegionReplicaOutputSink sink;
391    ClusterConnection connection;
392    RpcControllerFactory rpcControllerFactory;
393    RpcRetryingCallerFactory rpcRetryingCallerFactory;
394    int operationTimeout;
395    ExecutorService pool;
396    Cache<TableName, Boolean> disabledAndDroppedTables;
397    TableDescriptors tableDescriptors;
398
399    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
400        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
401      this.sink = sink;
402      this.connection = connection;
403      this.operationTimeout = operationTimeout;
404      this.rpcRetryingCallerFactory
405        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
406      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
407      this.pool = pool;
408      this.tableDescriptors = tableDescriptors;
409
410      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
411        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
412      // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
413      // table is created again with the same name, we might miss to replicate for that amount of
414      // time. But this cache prevents overloading meta requests for every edit from a deleted file.
415      disabledAndDroppedTables = CacheBuilder.newBuilder()
416        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
417        .initialCapacity(10)
418        .maximumSize(1000)
419        .build();
420    }
421
422    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
423        List<Entry> entries) throws IOException {
424
425      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
426        if (LOG.isTraceEnabled()) {
427          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
428            + " is cached as a disabled or dropped table");
429          for (Entry entry : entries) {
430            LOG.trace("Skipping : " + entry);
431          }
432        }
433        sink.getSkippedEditsCounter().addAndGet(entries.size());
434        return;
435      }
436
437      // If the table is disabled or dropped, we should not replay the entries, and we can skip
438      // replaying them. However, we might not know whether the table is disabled until we
439      // invalidate the cache and check from meta
440      RegionLocations locations = null;
441      boolean useCache = true;
442      while (true) {
443        // get the replicas of the primary region
444        try {
445          locations = RegionReplicaReplayCallable
446              .getRegionLocations(connection, tableName, row, useCache, 0);
447          if (locations == null) {
448            throw new HBaseIOException("Cannot locate locations for "
449                + tableName + ", row:" + Bytes.toStringBinary(row));
450          }
451          // Replicas can take a while to come online. The cache may have only the primary. If we
452          // keep going to the cache, we will not learn of the replicas and their locations after
453          // they come online.
454          if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) {
455            if (tableDescriptors.get(tableName).getRegionReplication() > 1) {
456              // Make an obnoxious log here. See how bad this issue is. Add a timer if happening
457              // too much.
458              LOG.info("Skipping location cache; only one location found for {}", tableName);
459              useCache = false;
460              continue;
461            }
462          }
463        } catch (TableNotFoundException e) {
464          if (LOG.isTraceEnabled()) {
465            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
466              + " is dropped. Adding table to cache.");
467            for (Entry entry : entries) {
468              LOG.trace("Skipping : " + entry);
469            }
470          }
471          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
472          // skip this entry
473          sink.getSkippedEditsCounter().addAndGet(entries.size());
474          return;
475        }
476
477        // check whether we should still replay this entry. If the regions are changed, or the
478        // entry is not coming from the primary region, filter it out.
479        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
480        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
481            encodedRegionName)) {
482          if (useCache) {
483            useCache = false;
484            continue; // this will retry location lookup
485          }
486          if (LOG.isTraceEnabled()) {
487            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
488              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
489              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
490              + " from WALEdit");
491            for (Entry entry : entries) {
492              LOG.trace("Skipping : " + entry);
493            }
494          }
495          sink.getSkippedEditsCounter().addAndGet(entries.size());
496          return;
497        }
498        break;
499      }
500
501      if (locations.size() == 1) {
502        return;
503      }
504
505      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);
506
507      // All passed entries should belong to one region because it is coming from the EntryBuffers
508      // split per region. But the regions might split and merge (unlike log recovery case).
509      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
510        HRegionLocation location = locations.getRegionLocation(replicaId);
511        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
512          RegionInfo regionInfo = location == null
513              ? RegionReplicaUtil.getRegionInfoForReplica(
514                locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
515              : location.getRegionInfo();
516          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
517            rpcControllerFactory, tableName, location, regionInfo, row, entries,
518            sink.getSkippedEditsCounter());
519           Future<ReplicateWALEntryResponse> task = pool.submit(
520             new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
521           tasks.add(task);
522        }
523      }
524
525      boolean tasksCancelled = false;
526      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
527        try {
528          tasks.get(replicaId).get();
529        } catch (InterruptedException e) {
530          throw new InterruptedIOException(e.getMessage());
531        } catch (ExecutionException e) {
532          Throwable cause = e.getCause();
533          boolean canBeSkipped = false;
534          if (cause instanceof IOException) {
535            // The table can be disabled or dropped at this time. For disabled tables, we have no
536            // cheap mechanism to detect this case because meta does not contain this information.
537            // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
538            // RPC. So instead we start the replay RPC with retries and check whether the table is
539            // dropped or disabled which might cause SocketTimeoutException, or
540            // RetriesExhaustedException or similar if we get IOE.
541            if (cause instanceof TableNotFoundException
542                || connection.isTableDisabled(tableName)) {
543              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
544              canBeSkipped = true;
545            } else if (tableDescriptors != null) {
546              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
547              if (tableDescriptor != null
548                  //(replicaId + 1) as no task is added for primary replica for replication
549                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
550                canBeSkipped = true;
551              }
552            }
553            if (canBeSkipped) {
554              if (LOG.isTraceEnabled()) {
555                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
556                    + " because received exception for dropped or disabled table",
557                  cause);
558                for (Entry entry : entries) {
559                  LOG.trace("Skipping : " + entry);
560                }
561              }
562              if (!tasksCancelled) {
563                sink.getSkippedEditsCounter().addAndGet(entries.size());
564                tasksCancelled = true; // so that we do not add to skipped counter again
565              }
566              continue;
567            }
568
569            // otherwise rethrow
570            throw (IOException)cause;
571          }
572          // unexpected exception
573          throw new IOException(cause);
574        }
575      }
576    }
577  }
578
579  static class RetryingRpcCallable<V> implements Callable<V> {
580    RpcRetryingCallerFactory factory;
581    RetryingCallable<V> callable;
582    int timeout;
583    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
584        int timeout) {
585      this.factory = factory;
586      this.callable = callable;
587      this.timeout = timeout;
588    }
589    @Override
590    public V call() throws Exception {
591      return factory.<V>newCaller().callWithRetries(callable, timeout);
592    }
593  }
594
595  /**
596   * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
597   * the entry if the region boundaries have changed or the region is gone.
598   */
599  static class RegionReplicaReplayCallable extends
600      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
601    private final List<Entry> entries;
602    private final byte[] initialEncodedRegionName;
603    private final AtomicLong skippedEntries;
604
605    public RegionReplicaReplayCallable(ClusterConnection connection,
606        RpcControllerFactory rpcControllerFactory, TableName tableName,
607        HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries,
608        AtomicLong skippedEntries) {
609      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
610      this.entries = entries;
611      this.skippedEntries = skippedEntries;
612      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
613    }
614
615    @Override
616    public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
617      // Check whether we should still replay this entry. If the regions are changed, or the
618      // entry is not coming form the primary region, filter it out because we do not need it.
619      // Regions can change because of (1) region split (2) region merge (3) table recreated
620      boolean skip = false;
621      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
622          initialEncodedRegionName)) {
623        skip = true;
624      }
625      if (!this.entries.isEmpty() && !skip) {
626        Entry[] entriesArray = new Entry[this.entries.size()];
627        entriesArray = this.entries.toArray(entriesArray);
628
629        // set the region name for the target region replica
630        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
631            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
632                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
633        controller.setCellScanner(p.getSecond());
634        return stub.replay(controller, p.getFirst());
635      }
636
637      if (skip) {
638        if (LOG.isTraceEnabled()) {
639          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
640            + " because located region " + location.getRegionInfo().getEncodedName()
641            + " is different than the original region "
642            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
643          for (Entry entry : entries) {
644            LOG.trace("Skipping : " + entry);
645          }
646        }
647        skippedEntries.addAndGet(entries.size());
648      }
649      return ReplicateWALEntryResponse.newBuilder().build();
650    }
651  }
652}