001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.Callable; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Future; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.CellScanner; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionLocation; 042import org.apache.hadoop.hbase.RegionLocations; 043import org.apache.hadoop.hbase.TableDescriptors; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.TableNotFoundException; 046import org.apache.hadoop.hbase.client.ClusterConnection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionReplicaUtil; 051import org.apache.hadoop.hbase.client.RetryingCallable; 052import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 053import org.apache.hadoop.hbase.client.TableDescriptor; 054import org.apache.hadoop.hbase.ipc.HBaseRpcController; 055import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 056import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; 057import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 058import org.apache.hadoop.hbase.replication.WALEntryFilter; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.hbase.util.Threads; 062import org.apache.hadoop.hbase.wal.EntryBuffers; 063import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; 064import org.apache.hadoop.hbase.wal.OutputSink; 065import org.apache.hadoop.hbase.wal.WAL.Entry; 066import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; 067import org.apache.hadoop.util.StringUtils; 068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 069import org.apache.yetus.audience.InterfaceAudience; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 074import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 078 079/** 080 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 081 * which receives the WAL edits from the WAL, and sends the edits to replicas 082 * of regions. 083 */ 084@InterfaceAudience.Private 085public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { 086 087 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class); 088 089 // Can be configured differently than hbase.client.retries.number 090 private static String CLIENT_RETRIES_NUMBER 091 = "hbase.region.replica.replication.client.retries.number"; 092 093 private Configuration conf; 094 private ClusterConnection connection; 095 private TableDescriptors tableDescriptors; 096 097 // Reuse WALSplitter constructs as a WAL pipe 098 private PipelineController controller; 099 private RegionReplicaOutputSink outputSink; 100 private EntryBuffers entryBuffers; 101 102 // Number of writer threads 103 private int numWriterThreads; 104 105 private int operationTimeout; 106 107 private ExecutorService pool; 108 109 @Override 110 public void init(Context context) throws IOException { 111 super.init(context); 112 113 this.conf = HBaseConfiguration.create(context.getConfiguration()); 114 this.tableDescriptors = context.getTableDescriptors(); 115 116 // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. 117 // We are resetting it here because we want default number of retries (35) rather than 10 times 118 // that which makes very long retries for disabled tables etc. 119 int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 120 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 121 if (defaultNumRetries > 10) { 122 int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 123 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 124 defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already 125 } 126 127 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 128 int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); 129 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); 130 131 this.numWriterThreads = this.conf.getInt( 132 "hbase.region.replica.replication.writer.threads", 3); 133 controller = new PipelineController(); 134 entryBuffers = new EntryBuffers(controller, 135 this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024)); 136 137 // use the regular RPC timeout for replica replication RPC's 138 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 139 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 140 } 141 142 @Override 143 protected void doStart() { 144 try { 145 connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); 146 this.pool = getDefaultThreadPool(conf); 147 outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers, 148 connection, pool, numWriterThreads, operationTimeout); 149 outputSink.startWriterThreads(); 150 super.doStart(); 151 } catch (IOException ex) { 152 LOG.warn("Received exception while creating connection :" + ex); 153 notifyFailed(ex); 154 } 155 } 156 157 @Override 158 protected void doStop() { 159 if (outputSink != null) { 160 try { 161 outputSink.close(); 162 } catch (IOException ex) { 163 LOG.warn("Got exception while trying to close OutputSink", ex); 164 } 165 } 166 if (this.pool != null) { 167 this.pool.shutdownNow(); 168 try { 169 // wait for 10 sec 170 boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS); 171 if (!shutdown) { 172 LOG.warn("Failed to shutdown the thread pool after 10 seconds"); 173 } 174 } catch (InterruptedException e) { 175 LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e); 176 } 177 } 178 if (connection != null) { 179 try { 180 connection.close(); 181 } catch (IOException ex) { 182 LOG.warn("Got exception closing connection :" + ex); 183 } 184 } 185 super.doStop(); 186 } 187 188 /** 189 * Returns a Thread pool for the RPC's to region replicas. Similar to 190 * Connection's thread pool. 191 */ 192 private ExecutorService getDefaultThreadPool(Configuration conf) { 193 int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); 194 if (maxThreads == 0) { 195 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 196 } 197 long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); 198 LinkedBlockingQueue<Runnable> workQueue = 199 new LinkedBlockingQueue<>(maxThreads * 200 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 201 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 202 ThreadPoolExecutor tpe = 203 new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, 204 new ThreadFactoryBuilder() 205 .setNameFormat(this.getClass().getSimpleName() + "-rpc-shared-pool-%d") 206 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 207 tpe.allowCoreThreadTimeOut(true); 208 return tpe; 209 } 210 211 @Override 212 public boolean replicate(ReplicateContext replicateContext) { 213 /* A note on batching in RegionReplicaReplicationEndpoint (RRRE): 214 * 215 * RRRE relies on batching from two different mechanisms. The first is the batching from 216 * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single 217 * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most 218 * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing). 219 * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits 220 * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to 221 * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits 222 * based on regions. 223 * 224 * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which 225 * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink). 226 * The SinkWriter in this case will send the wal edits to all secondary region replicas in 227 * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is 228 * being written to the sink, another buffer for the same region will not be made available to 229 * writers ensuring regions edits are not replayed out of order. 230 * 231 * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so 232 * that the replication can assume all edits are persisted. We may be able to do a better 233 * pipelining between the replication thread and output sinks later if it becomes a bottleneck. 234 */ 235 236 while (this.isRunning()) { 237 try { 238 for (Entry entry: replicateContext.getEntries()) { 239 entryBuffers.appendEntry(entry); 240 } 241 outputSink.flush(); // make sure everything is flushed 242 ctx.getMetrics().incrLogEditsFiltered( 243 outputSink.getSkippedEditsCounter().getAndSet(0)); 244 return true; 245 } catch (InterruptedException e) { 246 Thread.currentThread().interrupt(); 247 return false; 248 } catch (IOException e) { 249 LOG.warn("Received IOException while trying to replicate" 250 + StringUtils.stringifyException(e)); 251 outputSink.restartWriterThreadsIfNeeded(); 252 } 253 } 254 255 return false; 256 } 257 258 @Override 259 public boolean canReplicateToSameCluster() { 260 return true; 261 } 262 263 @Override 264 protected WALEntryFilter getScopeWALEntryFilter() { 265 // we do not care about scope. We replicate everything. 266 return null; 267 } 268 269 static class RegionReplicaOutputSink extends OutputSink { 270 private final RegionReplicaSinkWriter sinkWriter; 271 private final TableDescriptors tableDescriptors; 272 private final Cache<TableName, Boolean> memstoreReplicationEnabled; 273 274 public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors, 275 EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, 276 int numWriters, int operationTimeout) { 277 super(controller, entryBuffers, numWriters); 278 this.sinkWriter = 279 new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors); 280 this.tableDescriptors = tableDescriptors; 281 282 // A cache for the table "memstore replication enabled" flag. 283 // It has a default expiry of 5 sec. This means that if the table is altered 284 // with a different flag value, we might miss to replicate for that amount of 285 // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. 286 int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration() 287 .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); 288 this.memstoreReplicationEnabled = CacheBuilder.newBuilder() 289 .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) 290 .initialCapacity(10) 291 .maximumSize(1000) 292 .build(); 293 } 294 295 @Override 296 public void append(RegionEntryBuffer buffer) throws IOException { 297 List<Entry> entries = buffer.getEntries(); 298 299 if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { 300 return; 301 } 302 303 // meta edits (e.g. flush) are always replicated. 304 // data edits (e.g. put) are replicated if the table requires them. 305 if (!requiresReplication(buffer.getTableName(), entries)) { 306 return; 307 } 308 309 sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), 310 CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries); 311 } 312 313 void flush() throws IOException { 314 // nothing much to do for now. Wait for the Writer threads to finish up 315 // append()'ing the data. 316 entryBuffers.waitUntilDrained(); 317 } 318 319 @Override 320 public boolean keepRegionEvent(Entry entry) { 321 return true; 322 } 323 324 @Override 325 public List<Path> close() throws IOException { 326 finishWriterThreads(true); 327 return null; 328 } 329 330 @Override 331 public Map<String, Long> getOutputCounts() { 332 return null; // only used in tests 333 } 334 335 @Override 336 public int getNumberOfRecoveredRegions() { 337 return 0; 338 } 339 340 AtomicLong getSkippedEditsCounter() { 341 return totalSkippedEdits; 342 } 343 344 /** 345 * returns true if the specified entry must be replicated. 346 * We should always replicate meta operations (e.g. flush) 347 * and use the user HTD flag to decide whether or not replicate the memstore. 348 */ 349 private boolean requiresReplication(final TableName tableName, final List<Entry> entries) 350 throws IOException { 351 // unit-tests may not the TableDescriptors, bypass the check and always replicate 352 if (tableDescriptors == null) return true; 353 354 Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName); 355 if (requiresReplication == null) { 356 // check if the table requires memstore replication 357 // some unit-test drop the table, so we should do a bypass check and always replicate. 358 TableDescriptor htd = tableDescriptors.get(tableName); 359 requiresReplication = htd == null || htd.hasRegionMemStoreReplication(); 360 memstoreReplicationEnabled.put(tableName, requiresReplication); 361 } 362 363 // if memstore replication is not required, check the entries. 364 // meta edits (e.g. flush) must be always replicated. 365 if (!requiresReplication) { 366 int skipEdits = 0; 367 java.util.Iterator<Entry> it = entries.iterator(); 368 while (it.hasNext()) { 369 Entry entry = it.next(); 370 if (entry.getEdit().isMetaEdit()) { 371 requiresReplication = true; 372 } else { 373 it.remove(); 374 skipEdits++; 375 } 376 } 377 totalSkippedEdits.addAndGet(skipEdits); 378 } 379 return requiresReplication; 380 } 381 382 @Override 383 protected int getNumOpenWriters() { 384 // TODO Auto-generated method stub 385 return 0; 386 } 387 } 388 389 static class RegionReplicaSinkWriter { 390 RegionReplicaOutputSink sink; 391 ClusterConnection connection; 392 RpcControllerFactory rpcControllerFactory; 393 RpcRetryingCallerFactory rpcRetryingCallerFactory; 394 int operationTimeout; 395 ExecutorService pool; 396 Cache<TableName, Boolean> disabledAndDroppedTables; 397 TableDescriptors tableDescriptors; 398 399 public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, 400 ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) { 401 this.sink = sink; 402 this.connection = connection; 403 this.operationTimeout = operationTimeout; 404 this.rpcRetryingCallerFactory 405 = RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); 406 this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); 407 this.pool = pool; 408 this.tableDescriptors = tableDescriptors; 409 410 int nonExistentTableCacheExpiryMs = connection.getConfiguration() 411 .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); 412 // A cache for non existing tables that have a default expiry of 5 sec. This means that if the 413 // table is created again with the same name, we might miss to replicate for that amount of 414 // time. But this cache prevents overloading meta requests for every edit from a deleted file. 415 disabledAndDroppedTables = CacheBuilder.newBuilder() 416 .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS) 417 .initialCapacity(10) 418 .maximumSize(1000) 419 .build(); 420 } 421 422 public void append(TableName tableName, byte[] encodedRegionName, byte[] row, 423 List<Entry> entries) throws IOException { 424 425 if (disabledAndDroppedTables.getIfPresent(tableName) != null) { 426 if (LOG.isTraceEnabled()) { 427 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName 428 + " is cached as a disabled or dropped table"); 429 for (Entry entry : entries) { 430 LOG.trace("Skipping : " + entry); 431 } 432 } 433 sink.getSkippedEditsCounter().addAndGet(entries.size()); 434 return; 435 } 436 437 // If the table is disabled or dropped, we should not replay the entries, and we can skip 438 // replaying them. However, we might not know whether the table is disabled until we 439 // invalidate the cache and check from meta 440 RegionLocations locations = null; 441 boolean useCache = true; 442 while (true) { 443 // get the replicas of the primary region 444 try { 445 locations = RegionReplicaReplayCallable 446 .getRegionLocations(connection, tableName, row, useCache, 0); 447 if (locations == null) { 448 throw new HBaseIOException("Cannot locate locations for " 449 + tableName + ", row:" + Bytes.toStringBinary(row)); 450 } 451 // Replicas can take a while to come online. The cache may have only the primary. If we 452 // keep going to the cache, we will not learn of the replicas and their locations after 453 // they come online. 454 if (useCache && locations.size() == 1 && TableName.isMetaTableName(tableName)) { 455 if (tableDescriptors.get(tableName).getRegionReplication() > 1) { 456 // Make an obnoxious log here. See how bad this issue is. Add a timer if happening 457 // too much. 458 LOG.info("Skipping location cache; only one location found for {}", tableName); 459 useCache = false; 460 continue; 461 } 462 } 463 } catch (TableNotFoundException e) { 464 if (LOG.isTraceEnabled()) { 465 LOG.trace("Skipping " + entries.size() + " entries because table " + tableName 466 + " is dropped. Adding table to cache."); 467 for (Entry entry : entries) { 468 LOG.trace("Skipping : " + entry); 469 } 470 } 471 disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored 472 // skip this entry 473 sink.getSkippedEditsCounter().addAndGet(entries.size()); 474 return; 475 } 476 477 // check whether we should still replay this entry. If the regions are changed, or the 478 // entry is not coming from the primary region, filter it out. 479 HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); 480 if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), 481 encodedRegionName)) { 482 if (useCache) { 483 useCache = false; 484 continue; // this will retry location lookup 485 } 486 if (LOG.isTraceEnabled()) { 487 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 488 + " because located region " + primaryLocation.getRegionInfo().getEncodedName() 489 + " is different than the original region " + Bytes.toStringBinary(encodedRegionName) 490 + " from WALEdit"); 491 for (Entry entry : entries) { 492 LOG.trace("Skipping : " + entry); 493 } 494 } 495 sink.getSkippedEditsCounter().addAndGet(entries.size()); 496 return; 497 } 498 break; 499 } 500 501 if (locations.size() == 1) { 502 return; 503 } 504 505 ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1); 506 507 // All passed entries should belong to one region because it is coming from the EntryBuffers 508 // split per region. But the regions might split and merge (unlike log recovery case). 509 for (int replicaId = 0; replicaId < locations.size(); replicaId++) { 510 HRegionLocation location = locations.getRegionLocation(replicaId); 511 if (!RegionReplicaUtil.isDefaultReplica(replicaId)) { 512 RegionInfo regionInfo = location == null 513 ? RegionReplicaUtil.getRegionInfoForReplica( 514 locations.getDefaultRegionLocation().getRegionInfo(), replicaId) 515 : location.getRegionInfo(); 516 RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, 517 rpcControllerFactory, tableName, location, regionInfo, row, entries, 518 sink.getSkippedEditsCounter()); 519 Future<ReplicateWALEntryResponse> task = pool.submit( 520 new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout)); 521 tasks.add(task); 522 } 523 } 524 525 boolean tasksCancelled = false; 526 for (int replicaId = 0; replicaId < tasks.size(); replicaId++) { 527 try { 528 tasks.get(replicaId).get(); 529 } catch (InterruptedException e) { 530 throw new InterruptedIOException(e.getMessage()); 531 } catch (ExecutionException e) { 532 Throwable cause = e.getCause(); 533 boolean canBeSkipped = false; 534 if (cause instanceof IOException) { 535 // The table can be disabled or dropped at this time. For disabled tables, we have no 536 // cheap mechanism to detect this case because meta does not contain this information. 537 // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay 538 // RPC. So instead we start the replay RPC with retries and check whether the table is 539 // dropped or disabled which might cause SocketTimeoutException, or 540 // RetriesExhaustedException or similar if we get IOE. 541 if (cause instanceof TableNotFoundException 542 || connection.isTableDisabled(tableName)) { 543 disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. 544 canBeSkipped = true; 545 } else if (tableDescriptors != null) { 546 TableDescriptor tableDescriptor = tableDescriptors.get(tableName); 547 if (tableDescriptor != null 548 //(replicaId + 1) as no task is added for primary replica for replication 549 && tableDescriptor.getRegionReplication() <= (replicaId + 1)) { 550 canBeSkipped = true; 551 } 552 } 553 if (canBeSkipped) { 554 if (LOG.isTraceEnabled()) { 555 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 556 + " because received exception for dropped or disabled table", 557 cause); 558 for (Entry entry : entries) { 559 LOG.trace("Skipping : " + entry); 560 } 561 } 562 if (!tasksCancelled) { 563 sink.getSkippedEditsCounter().addAndGet(entries.size()); 564 tasksCancelled = true; // so that we do not add to skipped counter again 565 } 566 continue; 567 } 568 569 // otherwise rethrow 570 throw (IOException)cause; 571 } 572 // unexpected exception 573 throw new IOException(cause); 574 } 575 } 576 } 577 } 578 579 static class RetryingRpcCallable<V> implements Callable<V> { 580 RpcRetryingCallerFactory factory; 581 RetryingCallable<V> callable; 582 int timeout; 583 public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable, 584 int timeout) { 585 this.factory = factory; 586 this.callable = callable; 587 this.timeout = timeout; 588 } 589 @Override 590 public V call() throws Exception { 591 return factory.<V>newCaller().callWithRetries(callable, timeout); 592 } 593 } 594 595 /** 596 * Calls replay on the passed edits for the given set of entries belonging to the region. It skips 597 * the entry if the region boundaries have changed or the region is gone. 598 */ 599 static class RegionReplicaReplayCallable extends 600 RegionAdminServiceCallable<ReplicateWALEntryResponse> { 601 private final List<Entry> entries; 602 private final byte[] initialEncodedRegionName; 603 private final AtomicLong skippedEntries; 604 605 public RegionReplicaReplayCallable(ClusterConnection connection, 606 RpcControllerFactory rpcControllerFactory, TableName tableName, 607 HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries, 608 AtomicLong skippedEntries) { 609 super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId()); 610 this.entries = entries; 611 this.skippedEntries = skippedEntries; 612 this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); 613 } 614 615 @Override 616 public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception { 617 // Check whether we should still replay this entry. If the regions are changed, or the 618 // entry is not coming form the primary region, filter it out because we do not need it. 619 // Regions can change because of (1) region split (2) region merge (3) table recreated 620 boolean skip = false; 621 if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), 622 initialEncodedRegionName)) { 623 skip = true; 624 } 625 if (!this.entries.isEmpty() && !skip) { 626 Entry[] entriesArray = new Entry[this.entries.size()]; 627 entriesArray = this.entries.toArray(entriesArray); 628 629 // set the region name for the target region replica 630 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = 631 ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location 632 .getRegionInfo().getEncodedNameAsBytes(), null, null, null); 633 controller.setCellScanner(p.getSecond()); 634 return stub.replay(controller, p.getFirst()); 635 } 636 637 if (skip) { 638 if (LOG.isTraceEnabled()) { 639 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName 640 + " because located region " + location.getRegionInfo().getEncodedName() 641 + " is different than the original region " 642 + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); 643 for (Entry entry : entries) { 644 LOG.trace("Skipping : " + entry); 645 } 646 } 647 skippedEntries.addAndGet(entries.size()); 648 } 649 return ReplicateWALEntryResponse.newBuilder().build(); 650 } 651 } 652}