001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNull; 022import static org.junit.jupiter.api.Assertions.assertThrows; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.mockito.Mockito.doNothing; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.spy; 027import static org.mockito.Mockito.verify; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.TreeMap; 036import java.util.UUID; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicReference; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.Waiter; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.Table; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; 054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; 055import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; 056import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; 057import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; 058import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; 059import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 063import org.apache.hadoop.hbase.util.Pair; 064import org.apache.hadoop.hbase.util.Threads; 065import org.apache.hadoop.hbase.wal.WAL.Entry; 066import org.apache.hadoop.hbase.wal.WALEdit; 067import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 068import org.apache.hadoop.hbase.wal.WALKeyImpl; 069import org.apache.hadoop.hbase.zookeeper.ZKConfig; 070import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; 071import org.junit.jupiter.api.AfterAll; 072import org.junit.jupiter.api.BeforeEach; 073import org.junit.jupiter.api.Test; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** 078 * Tests ReplicationSource and ReplicationEndpoint interactions 079 */ 080public class ReplicationEndpointTestBase extends TestReplicationBaseNoBeforeAll { 081 082 private static final Logger LOG = LoggerFactory.getLogger(ReplicationEndpointTestBase.class); 083 084 static int numRegionServers; 085 086 protected static void setUpBeforeClass() throws Exception { 087 configureClusters(UTIL1, UTIL2); 088 startClusters(); 089 numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 090 } 091 092 @AfterAll 093 public static void assertStopped() { 094 // check stop is called 095 assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); 096 } 097 098 @BeforeEach 099 public void setup() throws Exception { 100 ReplicationEndpointForTest.contructedCount.set(0); 101 ReplicationEndpointForTest.startedCount.set(0); 102 ReplicationEndpointForTest.replicateCount.set(0); 103 ReplicationEndpointReturningFalse.replicated.set(false); 104 ReplicationEndpointForTest.lastEntries = null; 105 final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads(); 106 for (RegionServerThread rs : rsThreads) { 107 UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); 108 } 109 // Wait for all log roll to finish 110 UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() { 111 @Override 112 public boolean evaluate() throws Exception { 113 for (RegionServerThread rs : rsThreads) { 114 if (!rs.getRegionServer().walRollRequestFinished()) { 115 return false; 116 } 117 } 118 return true; 119 } 120 121 @Override 122 public String explainFailure() throws Exception { 123 List<String> logRollInProgressRsList = new ArrayList<>(); 124 for (RegionServerThread rs : rsThreads) { 125 if (!rs.getRegionServer().walRollRequestFinished()) { 126 logRollInProgressRsList.add(rs.getRegionServer().toString()); 127 } 128 } 129 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList; 130 } 131 }); 132 } 133 134 @Test 135 public void testCustomReplicationEndpoint() throws Exception { 136 // test installing a custom replication endpoint other than the default one. 137 hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint", 138 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 139 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build()); 140 141 // check whether the class has been constructed and started 142 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 143 @Override 144 public boolean evaluate() throws Exception { 145 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; 146 } 147 }); 148 149 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 150 @Override 151 public boolean evaluate() throws Exception { 152 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; 153 } 154 }); 155 156 assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 157 158 // now replicate some data. 159 doPut(Bytes.toBytes("row42")); 160 161 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 162 @Override 163 public boolean evaluate() throws Exception { 164 return ReplicationEndpointForTest.replicateCount.get() >= 1; 165 } 166 }); 167 168 doAssert(Bytes.toBytes("row42")); 169 170 hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint"); 171 } 172 173 @Test 174 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { 175 assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 176 assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); 177 int peerCount = hbaseAdmin.listReplicationPeers().size(); 178 final String id = "testReplicationEndpointReturnsFalseOnReplicate"; 179 hbaseAdmin.addReplicationPeer(id, 180 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 181 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build()); 182 // This test is flakey and then there is so much stuff flying around in here its, hard to 183 // debug. Peer needs to be up for the edit to make it across. This wait on 184 // peer count seems to be a hack that has us not progress till peer is up. 185 if (hbaseAdmin.listReplicationPeers().size() <= peerCount) { 186 LOG.info("Waiting on peercount to go up from " + peerCount); 187 Threads.sleep(100); 188 } 189 // now replicate some data 190 doPut(row); 191 192 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 193 @Override 194 public boolean evaluate() throws Exception { 195 // Looks like replication endpoint returns false unless we put more than 10 edits. We 196 // only send over one edit. 197 int count = ReplicationEndpointForTest.replicateCount.get(); 198 LOG.info("count=" + count); 199 return ReplicationEndpointReturningFalse.replicated.get(); 200 } 201 }); 202 if (ReplicationEndpointReturningFalse.ex.get() != null) { 203 throw ReplicationEndpointReturningFalse.ex.get(); 204 } 205 206 hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate"); 207 } 208 209 @Test 210 public void testInterClusterReplication() throws Exception { 211 final String id = "testInterClusterReplication"; 212 213 List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName); 214 int totEdits = 0; 215 216 // Make sure edits are spread across regions because we do region based batching 217 // before shipping edits. 218 for (HRegion region : regions) { 219 RegionInfo hri = region.getRegionInfo(); 220 byte[] row = hri.getStartKey(); 221 for (int i = 0; i < 100; i++) { 222 if (row.length > 0) { 223 Put put = new Put(row); 224 put.addColumn(famName, row, row); 225 region.put(put); 226 totEdits++; 227 } 228 } 229 } 230 231 hbaseAdmin.addReplicationPeer(id, 232 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) 233 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()) 234 .build()); 235 236 final int numEdits = totEdits; 237 Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() { 238 @Override 239 public boolean evaluate() throws Exception { 240 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; 241 } 242 243 @Override 244 public String explainFailure() throws Exception { 245 String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " 246 + InterClusterReplicationEndpointForTest.replicateCount.get(); 247 return failure; 248 } 249 }); 250 251 hbaseAdmin.removeReplicationPeer("testInterClusterReplication"); 252 UTIL1.deleteTableData(tableName); 253 } 254 255 @Test 256 public void testWALEntryFilterFromReplicationEndpoint() throws Exception { 257 ReplicationPeerConfig rpc = 258 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 259 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 260 // test that we can create mutliple WALFilters reflectively 261 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 262 EverythingPassesWALEntryFilter.class.getName() + "," 263 + EverythingPassesWALEntryFilterSubclass.class.getName()) 264 .build(); 265 266 hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc); 267 // now replicate some data. 268 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 269 doPut(connection, Bytes.toBytes("row1")); 270 doPut(connection, row); 271 doPut(connection, Bytes.toBytes("row2")); 272 } 273 274 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 275 @Override 276 public boolean evaluate() throws Exception { 277 return ReplicationEndpointForTest.replicateCount.get() >= 1; 278 } 279 }); 280 281 assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); 282 // make sure our reflectively created filter is in the filter chain 283 assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); 284 hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint"); 285 } 286 287 @Test 288 public void testWALEntryFilterAddValidation() throws Exception { 289 ReplicationPeerConfig rpc = 290 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 291 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 292 // test that we can create mutliple WALFilters reflectively 293 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 294 "IAmNotARealWalEntryFilter") 295 .build(); 296 assertThrows(IOException.class, 297 () -> hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc)); 298 } 299 300 @Test 301 public void testWALEntryFilterUpdateValidation() throws Exception { 302 ReplicationPeerConfig rpc = 303 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 304 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 305 // test that we can create mutliple WALFilters reflectively 306 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 307 "IAmNotARealWalEntryFilter") 308 .build(); 309 assertThrows(IOException.class, 310 () -> hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc)); 311 } 312 313 @Test 314 public void testMetricsSourceBaseSourcePassThrough() { 315 /* 316 * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, 317 * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that 318 * metrics get written to both namespaces. Both of those classes wrap a 319 * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics. 320 * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls 321 * down through the two layers of wrapping to the actual BaseSource. 322 */ 323 String id = "id"; 324 DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); 325 MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); 326 when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); 327 MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); 328 when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); 329 330 MetricsReplicationSourceSource singleSourceSource = 331 new MetricsReplicationSourceSourceImpl(singleRms, id); 332 MetricsReplicationGlobalSourceSource globalSourceSource = 333 new MetricsReplicationGlobalSourceSourceImpl(globalRms); 334 MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); 335 doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); 336 337 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>(); 338 MetricsSource source = 339 new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); 340 341 String gaugeName = "gauge"; 342 String singleGaugeName = "source.id." + gaugeName; 343 String globalGaugeName = "source." + gaugeName; 344 long delta = 1; 345 String counterName = "counter"; 346 String singleCounterName = "source.id." + counterName; 347 String globalCounterName = "source." + counterName; 348 long count = 2; 349 source.decGauge(gaugeName, delta); 350 source.getMetricsContext(); 351 source.getMetricsDescription(); 352 source.getMetricsJmxContext(); 353 source.getMetricsName(); 354 source.incCounters(counterName, count); 355 source.incGauge(gaugeName, delta); 356 source.init(); 357 source.removeMetric(gaugeName); 358 source.setGauge(gaugeName, delta); 359 source.updateHistogram(counterName, count); 360 source.incrFailedRecoveryQueue(); 361 362 verify(singleRms).decGauge(singleGaugeName, delta); 363 verify(globalRms).decGauge(globalGaugeName, delta); 364 verify(globalRms).getMetricsContext(); 365 verify(globalRms).getMetricsJmxContext(); 366 verify(globalRms).getMetricsName(); 367 verify(singleRms).incCounters(singleCounterName, count); 368 verify(globalRms).incCounters(globalCounterName, count); 369 verify(singleRms).incGauge(singleGaugeName, delta); 370 verify(globalRms).incGauge(globalGaugeName, delta); 371 verify(globalRms).init(); 372 verify(singleRms).removeMetric(singleGaugeName); 373 verify(globalRms).removeMetric(globalGaugeName); 374 verify(singleRms).setGauge(singleGaugeName, delta); 375 verify(globalRms).setGauge(globalGaugeName, delta); 376 verify(singleRms).updateHistogram(singleCounterName, count); 377 verify(globalRms).updateHistogram(globalCounterName, count); 378 verify(spyglobalSourceSource).incrFailedRecoveryQueue(); 379 380 // check singleSourceSourceByTable metrics. 381 // singleSourceSourceByTable map entry will be created only 382 // after calling #setAgeOfLastShippedOpByTable 383 boolean containsRandomNewTable = 384 source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 385 assertEquals(false, containsRandomNewTable); 386 source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); 387 containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 388 assertEquals(true, containsRandomNewTable); 389 MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable"); 390 391 // age should be greater than zero we created the entry with time in the past 392 assertTrue(msr.getLastShippedAge() > 0); 393 assertTrue(msr.getShippedBytes() > 0); 394 395 } 396 397 private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { 398 List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); 399 byte[] a = new byte[] { 'a' }; 400 Entry entry = createEntry(tableName, null, a); 401 walEntriesWithSize.add(new Pair<>(entry, 10L)); 402 return walEntriesWithSize; 403 } 404 405 private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) { 406 WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), 407 EnvironmentEdgeManager.currentTime() - 1L, scopes); 408 WALEdit edit1 = new WALEdit(); 409 410 for (byte[] kv : kvs) { 411 WALEditInternalHelper.addExtendedCell(edit1, new KeyValue(kv, kv, kv)); 412 } 413 return new Entry(key1, edit1); 414 } 415 416 private void doPut(byte[] row) throws IOException { 417 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 418 doPut(connection, row); 419 } 420 } 421 422 private void doPut(final Connection connection, final byte[] row) throws IOException { 423 try (Table t = connection.getTable(tableName)) { 424 Put put = new Put(row); 425 put.addColumn(famName, row, row); 426 t.put(put); 427 } 428 } 429 430 private static void doAssert(byte[] row) throws Exception { 431 if (ReplicationEndpointForTest.lastEntries == null) { 432 return; // first call 433 } 434 assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); 435 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); 436 assertEquals(1, cells.size()); 437 assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), 438 cells.get(0).getRowLength(), row, 0, row.length)); 439 } 440 441 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { 442 static UUID uuid = HBaseTestingUtil.getRandomUUID(); 443 static AtomicInteger contructedCount = new AtomicInteger(); 444 static AtomicInteger startedCount = new AtomicInteger(); 445 static AtomicInteger stoppedCount = new AtomicInteger(); 446 static AtomicInteger replicateCount = new AtomicInteger(); 447 static volatile List<Entry> lastEntries = null; 448 449 public ReplicationEndpointForTest() { 450 replicateCount.set(0); 451 contructedCount.incrementAndGet(); 452 } 453 454 @Override 455 public UUID getPeerUUID() { 456 return uuid; 457 } 458 459 @Override 460 public boolean replicate(ReplicateContext replicateContext) { 461 replicateCount.incrementAndGet(); 462 lastEntries = new ArrayList<>(replicateContext.entries); 463 return true; 464 } 465 466 @Override 467 public void start() { 468 startAsync(); 469 } 470 471 @Override 472 public void stop() { 473 stopAsync(); 474 } 475 476 @Override 477 protected void doStart() { 478 startedCount.incrementAndGet(); 479 notifyStarted(); 480 } 481 482 @Override 483 protected void doStop() { 484 stoppedCount.incrementAndGet(); 485 notifyStopped(); 486 } 487 488 @Override 489 public boolean canReplicateToSameCluster() { 490 return true; 491 } 492 } 493 494 /** 495 * Not used by unit tests, helpful for manual testing with replication. 496 * <p> 497 * Snippet for `hbase shell`: 498 * 499 * <pre> 500 * create 't', 'f' 501 * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \ 502 * 'ReplicationEndpointTestBase$SleepingReplicationEndpointForTest' 503 * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1} 504 * </pre> 505 */ 506 public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { 507 private long duration; 508 509 public SleepingReplicationEndpointForTest() { 510 super(); 511 } 512 513 @Override 514 public void init(Context context) throws IOException { 515 super.init(context); 516 if (this.ctx != null) { 517 duration = this.ctx.getConfiguration() 518 .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L); 519 } 520 } 521 522 @Override 523 public boolean replicate(ReplicateContext context) { 524 try { 525 Thread.sleep(duration); 526 } catch (InterruptedException e) { 527 Thread.currentThread().interrupt(); 528 return false; 529 } 530 return super.replicate(context); 531 } 532 } 533 534 public static class InterClusterReplicationEndpointForTest 535 extends HBaseInterClusterReplicationEndpoint { 536 537 static AtomicInteger replicateCount = new AtomicInteger(); 538 static boolean failedOnce; 539 540 public InterClusterReplicationEndpointForTest() { 541 replicateCount.set(0); 542 } 543 544 @Override 545 public boolean replicate(ReplicateContext replicateContext) { 546 boolean success = super.replicate(replicateContext); 547 if (success) { 548 replicateCount.addAndGet(replicateContext.entries.size()); 549 } 550 return success; 551 } 552 553 @Override 554 protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, 555 int timeout) { 556 // Fail only once, we don't want to slow down the test. 557 if (failedOnce) { 558 return CompletableFuture.completedFuture(ordinal); 559 } else { 560 failedOnce = true; 561 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 562 future.completeExceptionally(new IOException("Sample Exception: Failed to replicate.")); 563 return future; 564 } 565 } 566 } 567 568 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { 569 static int COUNT = 10; 570 static AtomicReference<Exception> ex = new AtomicReference<>(null); 571 static AtomicBoolean replicated = new AtomicBoolean(false); 572 573 @Override 574 public boolean replicate(ReplicateContext replicateContext) { 575 try { 576 // check row 577 doAssert(row); 578 } catch (Exception e) { 579 ex.set(e); 580 } 581 582 super.replicate(replicateContext); 583 LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); 584 585 replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false 586 return replicated.get(); 587 } 588 } 589 590 // return a WALEntry filter which only accepts "row", but not other rows 591 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest { 592 static AtomicReference<Exception> ex = new AtomicReference<>(null); 593 594 @Override 595 public boolean replicate(ReplicateContext replicateContext) { 596 try { 597 super.replicate(replicateContext); 598 doAssert(row); 599 } catch (Exception e) { 600 ex.set(e); 601 } 602 return true; 603 } 604 605 @Override 606 public WALEntryFilter getWALEntryfilter() { 607 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() { 608 @Override 609 public Entry filter(Entry entry) { 610 ArrayList<Cell> cells = entry.getEdit().getCells(); 611 int size = cells.size(); 612 for (int i = size - 1; i >= 0; i--) { 613 Cell cell = cells.get(i); 614 if ( 615 !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0, 616 row.length) 617 ) { 618 cells.remove(i); 619 } 620 } 621 return entry; 622 } 623 }); 624 } 625 } 626 627 public static class EverythingPassesWALEntryFilter implements WALEntryFilter { 628 private static boolean passedEntry = false; 629 630 @Override 631 public Entry filter(Entry entry) { 632 passedEntry = true; 633 return entry; 634 } 635 636 public static boolean hasPassedAnEntry() { 637 return passedEntry; 638 } 639 } 640 641 public static class EverythingPassesWALEntryFilterSubclass 642 extends EverythingPassesWALEntryFilter { 643 } 644}