001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.mockito.Mockito.doNothing; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.spy; 023import static org.mockito.Mockito.verify; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.UUID; 033import java.util.concurrent.Callable; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.KeyValue; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Waiter; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; 050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; 051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; 052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; 053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; 054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; 055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.testclassification.ReplicationTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 061import org.apache.hadoop.hbase.util.Pair; 062import org.apache.hadoop.hbase.util.Threads; 063import org.apache.hadoop.hbase.wal.WAL.Entry; 064import org.apache.hadoop.hbase.wal.WALEdit; 065import org.apache.hadoop.hbase.wal.WALKeyImpl; 066import org.apache.hadoop.hbase.zookeeper.ZKConfig; 067import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; 068import org.junit.AfterClass; 069import org.junit.Assert; 070import org.junit.Before; 071import org.junit.BeforeClass; 072import org.junit.ClassRule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.slf4j.Logger; 076import org.slf4j.LoggerFactory; 077 078/** 079 * Tests ReplicationSource and ReplicationEndpoint interactions 080 */ 081@Category({ ReplicationTests.class, MediumTests.class }) 082public class TestReplicationEndpoint extends TestReplicationBase { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestReplicationEndpoint.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class); 089 090 static int numRegionServers; 091 092 @BeforeClass 093 public static void setUpBeforeClass() throws Exception { 094 TestReplicationBase.setUpBeforeClass(); 095 numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 096 } 097 098 @AfterClass 099 public static void tearDownAfterClass() throws Exception { 100 TestReplicationBase.tearDownAfterClass(); 101 // check stop is called 102 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); 103 } 104 105 @Before 106 public void setup() throws Exception { 107 ReplicationEndpointForTest.contructedCount.set(0); 108 ReplicationEndpointForTest.startedCount.set(0); 109 ReplicationEndpointForTest.replicateCount.set(0); 110 ReplicationEndpointReturningFalse.replicated.set(false); 111 ReplicationEndpointForTest.lastEntries = null; 112 final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads(); 113 for (RegionServerThread rs : rsThreads) { 114 UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); 115 } 116 // Wait for all log roll to finish 117 UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() { 118 @Override 119 public boolean evaluate() throws Exception { 120 for (RegionServerThread rs : rsThreads) { 121 if (!rs.getRegionServer().walRollRequestFinished()) { 122 return false; 123 } 124 } 125 return true; 126 } 127 128 @Override 129 public String explainFailure() throws Exception { 130 List<String> logRollInProgressRsList = new ArrayList<>(); 131 for (RegionServerThread rs : rsThreads) { 132 if (!rs.getRegionServer().walRollRequestFinished()) { 133 logRollInProgressRsList.add(rs.getRegionServer().toString()); 134 } 135 } 136 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList; 137 } 138 }); 139 } 140 141 @Test 142 public void testCustomReplicationEndpoint() throws Exception { 143 // test installing a custom replication endpoint other than the default one. 144 admin.addPeer("testCustomReplicationEndpoint", 145 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 146 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), 147 null); 148 149 // check whether the class has been constructed and started 150 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 151 @Override 152 public boolean evaluate() throws Exception { 153 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; 154 } 155 }); 156 157 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 158 @Override 159 public boolean evaluate() throws Exception { 160 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; 161 } 162 }); 163 164 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 165 166 // now replicate some data. 167 doPut(Bytes.toBytes("row42")); 168 169 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 170 @Override 171 public boolean evaluate() throws Exception { 172 return ReplicationEndpointForTest.replicateCount.get() >= 1; 173 } 174 }); 175 176 doAssert(Bytes.toBytes("row42")); 177 178 admin.removePeer("testCustomReplicationEndpoint"); 179 } 180 181 @Test 182 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { 183 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 184 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); 185 int peerCount = admin.getPeersCount(); 186 final String id = "testReplicationEndpointReturnsFalseOnReplicate"; 187 admin.addPeer(id, 188 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 189 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), 190 null); 191 // This test is flakey and then there is so much stuff flying around in here its, hard to 192 // debug. Peer needs to be up for the edit to make it across. This wait on 193 // peer count seems to be a hack that has us not progress till peer is up. 194 if (admin.getPeersCount() <= peerCount) { 195 LOG.info("Waiting on peercount to go up from " + peerCount); 196 Threads.sleep(100); 197 } 198 // now replicate some data 199 doPut(row); 200 201 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 202 @Override 203 public boolean evaluate() throws Exception { 204 // Looks like replication endpoint returns false unless we put more than 10 edits. We 205 // only send over one edit. 206 int count = ReplicationEndpointForTest.replicateCount.get(); 207 LOG.info("count=" + count); 208 return ReplicationEndpointReturningFalse.replicated.get(); 209 } 210 }); 211 if (ReplicationEndpointReturningFalse.ex.get() != null) { 212 throw ReplicationEndpointReturningFalse.ex.get(); 213 } 214 215 admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); 216 } 217 218 @Test 219 public void testInterClusterReplication() throws Exception { 220 final String id = "testInterClusterReplication"; 221 222 List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName); 223 // This trick of waiting on peer to show up is taken from test above. 224 int peerCount = admin.getPeersCount(); 225 admin 226 .addPeer(id, 227 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) 228 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), 229 null); 230 // This test is flakey and then there is so much stuff flying around in here its, hard to 231 // debug. Peer needs to be up for the edit to make it across. This wait on 232 // peer count seems to be a hack that has us not progress till peer is up. 233 if (admin.getPeersCount() <= peerCount) { 234 LOG.info("Waiting on peercount to go up from " + peerCount); 235 Threads.sleep(100); 236 } 237 238 int totEdits = 0; 239 240 // Make sure edits are spread across regions because we do region based batching 241 // before shipping edits. 242 for (HRegion region : regions) { 243 RegionInfo hri = region.getRegionInfo(); 244 byte[] row = hri.getStartKey(); 245 for (int i = 0; i < 100; i++) { 246 if (row.length > 0) { 247 Put put = new Put(row); 248 put.addColumn(famName, row, row); 249 region.put(put); 250 totEdits++; 251 } 252 } 253 } 254 255 final int numEdits = totEdits; 256 LOG.info("Waiting on replication of {}", numEdits); 257 Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() { 258 @Override 259 public boolean evaluate() throws Exception { 260 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; 261 } 262 263 @Override 264 public String explainFailure() throws Exception { 265 String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " 266 + InterClusterReplicationEndpointForTest.replicateCount.get(); 267 return failure; 268 } 269 }); 270 271 admin.removePeer("testInterClusterReplication"); 272 UTIL1.deleteTableData(tableName); 273 } 274 275 @Test 276 public void testWALEntryFilterFromReplicationEndpoint() throws Exception { 277 ReplicationPeerConfig rpc = 278 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 279 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 280 // test that we can create mutliple WALFilters reflectively 281 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 282 EverythingPassesWALEntryFilter.class.getName() + "," 283 + EverythingPassesWALEntryFilterSubclass.class.getName()); 284 admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); 285 // now replicate some data. 286 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 287 doPut(connection, Bytes.toBytes("row1")); 288 doPut(connection, row); 289 doPut(connection, Bytes.toBytes("row2")); 290 } 291 292 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 293 @Override 294 public boolean evaluate() throws Exception { 295 return ReplicationEndpointForTest.replicateCount.get() >= 1; 296 } 297 }); 298 299 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); 300 // make sure our reflectively created filter is in the filter chain 301 Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); 302 admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); 303 } 304 305 @Test(expected = IOException.class) 306 public void testWALEntryFilterAddValidation() throws Exception { 307 ReplicationPeerConfig rpc = 308 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 309 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 310 // test that we can create mutliple WALFilters reflectively 311 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 312 "IAmNotARealWalEntryFilter"); 313 admin.addPeer("testWALEntryFilterAddValidation", rpc); 314 } 315 316 @Test(expected = IOException.class) 317 public void testWALEntryFilterUpdateValidation() throws Exception { 318 ReplicationPeerConfig rpc = 319 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 320 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 321 // test that we can create mutliple WALFilters reflectively 322 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 323 "IAmNotARealWalEntryFilter"); 324 admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); 325 } 326 327 @Test 328 public void testMetricsSourceBaseSourcePassThrough() { 329 /* 330 * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, 331 * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that 332 * metrics get written to both namespaces. Both of those classes wrap a 333 * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics. 334 * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls 335 * down through the two layers of wrapping to the actual BaseSource. 336 */ 337 String id = "id"; 338 DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); 339 MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); 340 when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); 341 MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); 342 when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); 343 344 MetricsReplicationSourceSource singleSourceSource = 345 new MetricsReplicationSourceSourceImpl(singleRms, id); 346 MetricsReplicationGlobalSourceSource globalSourceSource = 347 new MetricsReplicationGlobalSourceSourceImpl(globalRms); 348 MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); 349 doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); 350 351 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>(); 352 MetricsSource source = 353 new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); 354 355 String gaugeName = "gauge"; 356 String singleGaugeName = "source.id." + gaugeName; 357 String globalGaugeName = "source." + gaugeName; 358 long delta = 1; 359 String counterName = "counter"; 360 String singleCounterName = "source.id." + counterName; 361 String globalCounterName = "source." + counterName; 362 long count = 2; 363 source.decGauge(gaugeName, delta); 364 source.getMetricsContext(); 365 source.getMetricsDescription(); 366 source.getMetricsJmxContext(); 367 source.getMetricsName(); 368 source.incCounters(counterName, count); 369 source.incGauge(gaugeName, delta); 370 source.init(); 371 source.removeMetric(gaugeName); 372 source.setGauge(gaugeName, delta); 373 source.updateHistogram(counterName, count); 374 source.incrFailedRecoveryQueue(); 375 376 verify(singleRms).decGauge(singleGaugeName, delta); 377 verify(globalRms).decGauge(globalGaugeName, delta); 378 verify(globalRms).getMetricsContext(); 379 verify(globalRms).getMetricsJmxContext(); 380 verify(globalRms).getMetricsName(); 381 verify(singleRms).incCounters(singleCounterName, count); 382 verify(globalRms).incCounters(globalCounterName, count); 383 verify(singleRms).incGauge(singleGaugeName, delta); 384 verify(globalRms).incGauge(globalGaugeName, delta); 385 verify(globalRms).init(); 386 verify(singleRms).removeMetric(singleGaugeName); 387 verify(globalRms).removeMetric(globalGaugeName); 388 verify(singleRms).setGauge(singleGaugeName, delta); 389 verify(globalRms).setGauge(globalGaugeName, delta); 390 verify(singleRms).updateHistogram(singleCounterName, count); 391 verify(globalRms).updateHistogram(globalCounterName, count); 392 verify(spyglobalSourceSource).incrFailedRecoveryQueue(); 393 394 // check singleSourceSourceByTable metrics. 395 // singleSourceSourceByTable map entry will be created only 396 // after calling #setAgeOfLastShippedOpByTable 397 boolean containsRandomNewTable = 398 source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 399 Assert.assertEquals(false, containsRandomNewTable); 400 source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); 401 containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 402 Assert.assertEquals(true, containsRandomNewTable); 403 MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable"); 404 405 // age should be greater than zero we created the entry with time in the past 406 Assert.assertTrue(msr.getLastShippedAge() > 0); 407 Assert.assertTrue(msr.getShippedBytes() > 0); 408 409 } 410 411 private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { 412 List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); 413 byte[] a = new byte[] { 'a' }; 414 Entry entry = createEntry(tableName, null, a); 415 walEntriesWithSize.add(new Pair<>(entry, 10L)); 416 return walEntriesWithSize; 417 } 418 419 private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) { 420 WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), 421 EnvironmentEdgeManager.currentTime() - 1L, scopes); 422 WALEdit edit1 = new WALEdit(); 423 424 for (byte[] kv : kvs) { 425 edit1.add(new KeyValue(kv, kv, kv)); 426 } 427 return new Entry(key1, edit1); 428 } 429 430 private void doPut(byte[] row) throws IOException { 431 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 432 doPut(connection, row); 433 } 434 } 435 436 private void doPut(final Connection connection, final byte[] row) throws IOException { 437 try (Table t = connection.getTable(tableName)) { 438 Put put = new Put(row); 439 put.addColumn(famName, row, row); 440 t.put(put); 441 } 442 } 443 444 private static void doAssert(byte[] row) throws Exception { 445 if (ReplicationEndpointForTest.lastEntries == null) { 446 return; // first call 447 } 448 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); 449 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); 450 Assert.assertEquals(1, cells.size()); 451 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), 452 cells.get(0).getRowLength(), row, 0, row.length)); 453 } 454 455 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { 456 static UUID uuid = UTIL1.getRandomUUID(); 457 static AtomicInteger contructedCount = new AtomicInteger(); 458 static AtomicInteger startedCount = new AtomicInteger(); 459 static AtomicInteger stoppedCount = new AtomicInteger(); 460 static AtomicInteger replicateCount = new AtomicInteger(); 461 static volatile List<Entry> lastEntries = null; 462 463 public ReplicationEndpointForTest() { 464 replicateCount.set(0); 465 contructedCount.incrementAndGet(); 466 } 467 468 @Override 469 public UUID getPeerUUID() { 470 return uuid; 471 } 472 473 @Override 474 public boolean replicate(ReplicateContext replicateContext) { 475 replicateCount.incrementAndGet(); 476 lastEntries = new ArrayList<>(replicateContext.entries); 477 return true; 478 } 479 480 @Override 481 public void start() { 482 startAsync(); 483 } 484 485 @Override 486 public void stop() { 487 stopAsync(); 488 } 489 490 @Override 491 protected void doStart() { 492 startedCount.incrementAndGet(); 493 notifyStarted(); 494 } 495 496 @Override 497 protected void doStop() { 498 stoppedCount.incrementAndGet(); 499 notifyStopped(); 500 } 501 502 @Override 503 public boolean canReplicateToSameCluster() { 504 return true; 505 } 506 } 507 508 /** 509 * Not used by unit tests, helpful for manual testing with replication. 510 * <p> 511 * Snippet for `hbase shell`: 512 * 513 * <pre> 514 * create 't', 'f' 515 * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \ 516 * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest' 517 * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1} 518 * </pre> 519 */ 520 public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { 521 private long duration; 522 523 public SleepingReplicationEndpointForTest() { 524 super(); 525 } 526 527 @Override 528 public void init(Context context) throws IOException { 529 super.init(context); 530 if (this.ctx != null) { 531 duration = this.ctx.getConfiguration() 532 .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L); 533 } 534 } 535 536 @Override 537 public boolean replicate(ReplicateContext context) { 538 try { 539 Thread.sleep(duration); 540 } catch (InterruptedException e) { 541 Thread.currentThread().interrupt(); 542 return false; 543 } 544 return super.replicate(context); 545 } 546 } 547 548 public static class InterClusterReplicationEndpointForTest 549 extends HBaseInterClusterReplicationEndpoint { 550 551 static AtomicInteger replicateCount = new AtomicInteger(); 552 static boolean failedOnce; 553 554 public InterClusterReplicationEndpointForTest() { 555 replicateCount.set(0); 556 } 557 558 @Override 559 public boolean replicate(ReplicateContext replicateContext) { 560 boolean success = super.replicate(replicateContext); 561 if (success) { 562 replicateCount.addAndGet(replicateContext.entries.size()); 563 } 564 return success; 565 } 566 567 @Override 568 protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { 569 // Fail only once, we don't want to slow down the test. 570 if (failedOnce) { 571 return () -> ordinal; 572 } else { 573 failedOnce = true; 574 return () -> { 575 throw new IOException("Sample Exception: Failed to replicate."); 576 }; 577 } 578 } 579 } 580 581 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { 582 static int COUNT = 10; 583 static AtomicReference<Exception> ex = new AtomicReference<>(null); 584 static AtomicBoolean replicated = new AtomicBoolean(false); 585 586 @Override 587 public boolean replicate(ReplicateContext replicateContext) { 588 try { 589 // check row 590 doAssert(row); 591 } catch (Exception e) { 592 ex.set(e); 593 } 594 595 super.replicate(replicateContext); 596 LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); 597 598 replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false 599 return replicated.get(); 600 } 601 } 602 603 // return a WALEntry filter which only accepts "row", but not other rows 604 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest { 605 static AtomicReference<Exception> ex = new AtomicReference<>(null); 606 607 @Override 608 public boolean replicate(ReplicateContext replicateContext) { 609 try { 610 super.replicate(replicateContext); 611 doAssert(row); 612 } catch (Exception e) { 613 ex.set(e); 614 } 615 return true; 616 } 617 618 @Override 619 public WALEntryFilter getWALEntryfilter() { 620 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() { 621 @Override 622 public Entry filter(Entry entry) { 623 ArrayList<Cell> cells = entry.getEdit().getCells(); 624 int size = cells.size(); 625 for (int i = size - 1; i >= 0; i--) { 626 Cell cell = cells.get(i); 627 if ( 628 !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0, 629 row.length) 630 ) { 631 cells.remove(i); 632 } 633 } 634 return entry; 635 } 636 }); 637 } 638 } 639 640 public static class EverythingPassesWALEntryFilter implements WALEntryFilter { 641 private static boolean passedEntry = false; 642 643 @Override 644 public Entry filter(Entry entry) { 645 passedEntry = true; 646 return entry; 647 } 648 649 public static boolean hasPassedAnEntry() { 650 return passedEntry; 651 } 652 } 653 654 public static class EverythingPassesWALEntryFilterSubclass 655 extends EverythingPassesWALEntryFilter { 656 } 657}