/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
 * which receives the WAL edits from the WAL, and sends the edits to replicas
 * of regions.
 */
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);

  // Can be configured differently than hbase.client.retries.number
  private static final String CLIENT_RETRIES_NUMBER
    = "hbase.region.replica.replication.client.retries.number";

  private Configuration conf;
  private ClusterConnection connection;
  private TableDescriptors tableDescriptors;

  // Reuse WALSplitter constructs as a WAL pipe
  private PipelineController controller;
  private RegionReplicaOutputSink outputSink;
  private EntryBuffers entryBuffers;

  // Number of writer threads
  private int numWriterThreads;

  private int operationTimeout;

  private ExecutorService pool;

  /**
   * Initializes the endpoint from the given context: takes a private copy of the configuration,
   * adjusts the client retry settings on that copy, creates the pipeline controller and the entry
   * buffers, and reads the writer-thread count and operation timeout.
   *
   * @param context the replication endpoint context supplying configuration and table descriptors
   * @throws IOException if the superclass initialization fails
   */
  @Override
  public void init(Context context) throws IOException {
    super.init(context);

    this.conf = HBaseConfiguration.create(context.getConfiguration());
    this.tableDescriptors = context.getTableDescriptors();

    // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
    // We are resetting it here because we want default number of retries (35) rather than 10 times
    // that which makes very long retries for disabled tables etc.
    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (defaultNumRetries > 10) {
      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
    }

    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);

    this.numWriterThreads = this.conf.getInt(
      "hbase.region.replica.replication.writer.threads", 3);
    controller = new PipelineController();
    entryBuffers = new EntryBuffers(controller,
      this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));

    // use the regular RPC timeout for replica replication RPC's
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  /**
   * Creates the cluster connection, the RPC thread pool and the output sink, starts the sink's
   * writer threads and then delegates to the superclass. An {@link IOException} during any of
   * these steps is logged and reported through {@code notifyFailed}.
   */
  @Override
  protected void doStart() {
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
      this.pool = getDefaultThreadPool(conf);
      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
        connection, pool, numWriterThreads, operationTimeout);
      outputSink.startWriterThreads();
      super.doStart();
    } catch (IOException ex) {
      // pass the exception as the throwable argument so the stack trace is logged
      LOG.warn("Received exception while creating connection :", ex);
      notifyFailed(ex);
    }
  }

  /**
   * Closes the output sink, shuts down the thread pool (waiting up to 10 seconds for it to
   * terminate), closes the connection and then delegates to the superclass. Failures in the
   * individual steps are logged and do not prevent the remaining steps from running.
   */
  @Override
  protected void doStop() {
    // Remember an interrupt received while waiting for the pool, and restore it only after the
    // remaining cleanup has run, so that the cleanup itself is not affected by the flag.
    boolean interrupted = false;
    try {
      if (outputSink != null) {
        try {
          outputSink.finishWritingAndClose();
        } catch (IOException ex) {
          LOG.warn("Got exception while trying to close OutputSink", ex);
        }
      }
      if (this.pool != null) {
        this.pool.shutdownNow();
        try {
          // wait for 10 sec
          boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
          if (!shutdown) {
            LOG.warn("Failed to shutdown the thread pool after 10 seconds");
          }
        } catch (InterruptedException e) {
          interrupted = true;
          LOG.warn("Got interrupted while waiting for the thread pool to shut down", e);
        }
      }
      if (connection != null) {
        try {
          connection.close();
        } catch (IOException ex) {
          LOG.warn("Got exception closing connection :", ex);
        }
      }
      super.doStop();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Returns a Thread pool for the RPC's to region replicas. Similar to
   * Connection's thread pool.
   *
   * @param conf configuration supplying the maximum thread count, keep-alive time and the
   *   per-thread task limit used to size the work queue
   * @return a daemon-thread pool whose core threads are allowed to time out
   */
  private ExecutorService getDefaultThreadPool(Configuration conf) {
    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
    LinkedBlockingQueue<Runnable> workQueue =
        new LinkedBlockingQueue<>(maxThreads *
            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      keepAliveTime,
      TimeUnit.SECONDS,
      workQueue,
      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  /**
   * Appends all entries of the batch to the entry buffers and flushes the output sink, retrying
   * on {@link IOException} for as long as this endpoint is running.
   *
   * @param replicateContext the batch of WAL entries to replicate
   * @return {@code true} once the batch has been appended and flushed; {@code false} if the
   *   thread was interrupted or the endpoint is no longer running
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
     *
     * RRRE relies on batching from two different mechanisms. The first is the batching from
     * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
     * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
     * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
     * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
     * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
     * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
     * based on regions.
     *
     * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
     * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
     * The SinkWriter in this case will send the wal edits to all secondary region replicas in
     * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
     * being written to the sink, another buffer for the same region will not be made available to
     * writers ensuring regions edits are not replayed out of order.
     *
     * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
     * that the replication can assume all edits are persisted. We may be able to do a better
     * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
     */

    while (this.isRunning()) {
      try {
        for (Entry entry: replicateContext.getEntries()) {
          entryBuffers.appendEntry(entry);
        }
        outputSink.flush(); // make sure everything is flushed
        ctx.getMetrics().incrLogEditsFiltered(
          outputSink.getSkippedEditsCounter().getAndSet(0));
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      } catch (IOException e) {
        LOG.warn("Received IOException while trying to replicate"
            + StringUtils.stringifyException(e));
      }
    }

    return false;
  }

  @Override
  public boolean canReplicateToSameCluster() {
    return true;
  }

  @Override
  protected WALEntryFilter getScopeWALEntryFilter() {
    // we do not care about scope. We replicate everything.
    return null;
  }

  /**
   * Output sink that hands each per-region buffer to a {@link RegionReplicaSinkWriter}, after
   * filtering out data edits for tables that do not require memstore replication.
   */
  static class RegionReplicaOutputSink extends OutputSink {
    private final RegionReplicaSinkWriter sinkWriter;
    private final TableDescriptors tableDescriptors;
    private final Cache<TableName, Boolean> memstoreReplicationEnabled;

    public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
        EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
        int numWriters, int operationTimeout) {
      super(controller, entryBuffers, numWriters);
      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
      this.tableDescriptors = tableDescriptors;

      // A cache for the table "memstore replication enabled" flag.
      // It has a default expiry of 5 sec. This means that if the table is altered
      // with a different flag value, we might miss to replicate for that amount of
      // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    /**
     * Forwards the buffer's entries to the sink writer. Nothing is forwarded when the buffer is
     * empty, when its first entry has no cells, or when {@code requiresReplication} decides
     * that none of the entries need to be replicated.
     */
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.getEntryBuffer();

      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
        return;
      }

      // meta edits (e.g. flush) are always replicated.
      // data edits (e.g. put) are replicated if the table requires them.
      if (!requiresReplication(buffer.getTableName(), entries)) {
        return;
      }

      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
    }

    @Override
    public boolean flush() throws IOException {
      // nothing much to do for now. Wait for the Writer threads to finish up
      // append()'ing the data.
      entryBuffers.waitUntilDrained();
      return super.flush();
    }

    @Override
    public boolean keepRegionEvent(Entry entry) {
      return true;
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      finishWriting(true);
      return null;
    }

    @Override
    public Map<byte[], Long> getOutputCounts() {
      return null; // only used in tests
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return 0;
    }

    AtomicLong getSkippedEditsCounter() {
      return skippedEdits;
    }

    /**
     * returns true if the specified entry must be replicated.
     * We should always replicate meta operations (e.g. flush)
     * and use the user HTD flag to decide whether or not replicate the memstore.
     * When memstore replication is not enabled for the table, the non-meta entries are removed
     * from {@code entries} in place and added to the skipped-edits counter.
     */
    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
        throws IOException {
      // unit-tests may not have the TableDescriptors, bypass the check and always replicate
      if (tableDescriptors == null) {
        return true;
      }

      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
      if (requiresReplication == null) {
        // check if the table requires memstore replication
        // some unit-test drop the table, so we should do a bypass check and always replicate.
        TableDescriptor htd = tableDescriptors.get(tableName);
        requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
        memstoreReplicationEnabled.put(tableName, requiresReplication);
      }

      // if memstore replication is not required, check the entries.
      // meta edits (e.g. flush) must be always replicated.
      if (!requiresReplication) {
        int skipEdits = 0;
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
          Entry entry = it.next();
          if (entry.getEdit().isMetaEdit()) {
            requiresReplication = true;
          } else {
            it.remove();
            skipEdits++;
          }
        }
        skippedEdits.addAndGet(skipEdits);
      }
      return requiresReplication;
    }
  }

  /**
   * Sink writer that sends a region's entries to every non-default replica of that region using
   * retrying RPC calls submitted to a shared thread pool.
   */
  static class RegionReplicaSinkWriter extends SinkWriter {
    RegionReplicaOutputSink sink;
    ClusterConnection connection;
    RpcControllerFactory rpcControllerFactory;
    RpcRetryingCallerFactory rpcRetryingCallerFactory;
    int operationTimeout;
    ExecutorService pool;
    Cache<TableName, Boolean> disabledAndDroppedTables;

    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
        ExecutorService pool, int operationTimeout) {
      this.sink = sink;
      this.connection = connection;
      this.operationTimeout = operationTimeout;
      this.rpcRetryingCallerFactory
        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
      this.pool = pool;

      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
      // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
      // table is created again with the same name, we might miss to replicate for that amount of
      // time. But this cache prevents overloading meta requests for every edit from a deleted file.
      disabledAndDroppedTables = CacheBuilder.newBuilder()
        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    /**
     * Sends the given entries to all non-default replicas of the region and waits for every
     * RPC to complete. Entries are skipped (and counted as skipped) when the table is cached as
     * disabled or dropped, when the table is not found, or when the region located for
     * {@code row} is not the region the entries came from.
     *
     * @param tableName the table the entries belong to
     * @param encodedRegionName encoded name of the region the entries were written to
     * @param row a row of the region, used to look up the region locations
     * @param entries the WAL entries to replay on the replicas
     * @throws IOException if locating the region or a replay RPC fails for a reason other than
     *   the table being dropped or disabled, or if the wait for an RPC is interrupted
     */
    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
        List<Entry> entries) throws IOException {

      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
            + " is cached as a disabled or dropped table");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        sink.getSkippedEditsCounter().addAndGet(entries.size());
        return;
      }

      // If the table is disabled or dropped, we should not replay the entries, and we can skip
      // replaying them. However, we might not know whether the table is disabled until we
      // invalidate the cache and check from meta
      RegionLocations locations = null;
      boolean useCache = true;
      while (true) {
        // get the replicas of the primary region
        try {
          locations = RegionReplicaReplayCallable
              .getRegionLocations(connection, tableName, row, useCache, 0);

          if (locations == null) {
            throw new HBaseIOException("Cannot locate locations for "
                + tableName + ", row:" + Bytes.toStringBinary(row));
          }
        } catch (TableNotFoundException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
              + " is dropped. Adding table to cache.");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
          // skip this entry
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }

        // check whether we should still replay this entry. If the regions are changed, or the
        // entry is not coming from the primary region, filter it out.
        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
          encodedRegionName)) {
          if (useCache) {
            useCache = false;
            continue; // this will retry location lookup
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
              + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
              + " from WALEdit");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }
        break;
      }

      if (locations.size() == 1) {
        return;
      }

      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);

      // All passed entries should belong to one region because it is coming from the EntryBuffers
      // split per region. But the regions might split and merge (unlike log recovery case).
      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
          RegionInfo regionInfo = location == null
              ? RegionReplicaUtil.getRegionInfoForReplica(
                locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
              : location.getRegionInfo();
          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
            rpcControllerFactory, tableName, location, regionInfo, row, entries,
            sink.getSkippedEditsCounter());
          Future<ReplicateWALEntryResponse> task = pool.submit(
            new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
          tasks.add(task);
        }
      }

      boolean tasksCancelled = false;
      for (Future<ReplicateWALEntryResponse> task : tasks) {
        try {
          task.get();
        } catch (InterruptedException e) {
          // keep the original exception as the cause instead of only copying its message
          InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
          iioe.initCause(e);
          throw iioe;
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            // The table can be disabled or dropped at this time. For disabled tables, we have no
            // cheap mechanism to detect this case because meta does not contain this information.
            // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
            // RPC. So instead we start the replay RPC with retries and check whether the table is
            // dropped or disabled which might cause SocketTimeoutException, or
            // RetriesExhaustedException or similar if we get IOE.
            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
                  + " because received exception for dropped or disabled table", cause);
                for (Entry entry : entries) {
                  LOG.trace("Skipping : " + entry);
                }
              }
              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
              if (!tasksCancelled) {
                sink.getSkippedEditsCounter().addAndGet(entries.size());
                tasksCancelled = true; // so that we do not add to skipped counter again
              }
              continue;
            }
            // otherwise rethrow
            throw (IOException)cause;
          }
          // unexpected exception
          throw new IOException(cause);
        }
      }
    }
  }

  /**
   * Adapts a {@link RetryingCallable} to a {@link Callable} by running it through a caller
   * obtained from the given factory, with the given timeout.
   */
  static class RetryingRpcCallable<V> implements Callable<V> {
    RpcRetryingCallerFactory factory;
    RetryingCallable<V> callable;
    int timeout;
    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
        int timeout) {
      this.factory = factory;
      this.callable = callable;
      this.timeout = timeout;
    }
    @Override
    public V call() throws Exception {
      return factory.<V>newCaller().callWithRetries(callable, timeout);
    }
  }

  /**
   * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
   * the entry if the region boundaries have changed or the region is gone.
   */
  static class RegionReplicaReplayCallable extends
      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
    private final List<Entry> entries;
    private final byte[] initialEncodedRegionName;
    private final AtomicLong skippedEntries;

    public RegionReplicaReplayCallable(ClusterConnection connection,
        RpcControllerFactory rpcControllerFactory, TableName tableName,
        HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries,
        AtomicLong skippedEntries) {
      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
      this.entries = entries;
      this.skippedEntries = skippedEntries;
      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
    }

    /**
     * Replays the entries on the located region replica, or skips them when the located region
     * differs from the one this callable was created for.
     *
     * @return the replay response, or an empty response when there was nothing to send or the
     *   entries were skipped
     */
    @Override
    public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
      // Check whether we should still replay this entry. If the regions are changed, or the
      // entry is not coming from the primary region, filter it out because we do not need it.
      // Regions can change because of (1) region split (2) region merge (3) table recreated
      boolean skip = false;
      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
        initialEncodedRegionName)) {
        skip = true;
      }
      if (!this.entries.isEmpty() && !skip) {
        Entry[] entriesArray = new Entry[this.entries.size()];
        entriesArray = this.entries.toArray(entriesArray);

        // set the region name for the target region replica
        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
        controller.setCellScanner(p.getSecond());
        return stub.replay(controller, p.getFirst());
      }

      if (skip) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
            + " because located region " + location.getRegionInfo().getEncodedName()
            + " is different than the original region "
            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        skippedEntries.addAndGet(entries.size());
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    }
  }
}