/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests ReplicationSource and ReplicationEndpoint interactions.
 * <p>
 * The nested endpoint and filter classes record what they observe in static fields; the tests
 * poll those fields from the JUnit thread, so every such field must be safe to read from a thread
 * other than the one that writes it.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

  /** Number of region server threads in the source cluster, captured once at class setup. */
  static int numRegionServers;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
    // check stop is called
    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
  }

  /**
   * Resets the static state shared with the test endpoints and rolls the WAL on every region
   * server, waiting (up to 3s) until all roll requests have finished.
   */
  @Before
  public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    // Clear failures recorded by a previous test so they cannot leak into this one.
    ReplicationEndpointReturningFalse.ex.set(null);
    ReplicationEndpointWithWALEntryFilter.ex.set(null);
    ReplicationEndpointForTest.lastEntries = null;
    final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
    for (RegionServerThread rs : rsThreads) {
      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
    // Wait for all log roll to finish
    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        List<String> logRollInProgressRsList = new ArrayList<>();
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            logRollInProgressRsList.add(rs.getRegionServer().toString());
          }
        }
        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
      }
    });
  }

  /**
   * Installs {@link ReplicationEndpointForTest} as a peer and checks that it is constructed and
   * started at least once per region server, and that it is handed an edit after a put.
   */
  @Test
  public void testCustomReplicationEndpoint() throws Exception {
    final String id = "testCustomReplicationEndpoint";
    // test installing a custom replication endpoint other than the default one.
    hbaseAdmin.addReplicationPeer(id,
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
    try {
      // check whether the class has been constructed and started
      Waiter.waitFor(CONF1, 60000,
        () -> ReplicationEndpointForTest.contructedCount.get() >= numRegionServers);
      Waiter.waitFor(CONF1, 60000,
        () -> ReplicationEndpointForTest.startedCount.get() >= numRegionServers);

      Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

      // now replicate some data.
      doPut(Bytes.toBytes("row42"));

      Waiter.waitFor(CONF1, 60000, () -> ReplicationEndpointForTest.replicateCount.get() >= 1);

      doAssert(Bytes.toBytes("row42"));
    } finally {
      // Always remove the peer: a leaked endpoint keeps mutating the shared static counters.
      hbaseAdmin.removeReplicationPeer(id);
    }
  }

  /**
   * Installs {@link ReplicationEndpointReturningFalse}, puts one row and waits until the endpoint
   * finally reports success; any failure the endpoint recorded while checking the row is rethrown.
   */
  @Test
  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
    Assert.assertFalse(ReplicationEndpointReturningFalse.replicated.get());
    int peerCount = hbaseAdmin.listReplicationPeers().size();
    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
    hbaseAdmin.addReplicationPeer(id,
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build());
    try {
      // This test is flakey and then there is so much stuff flying around in here its, hard to
      // debug. Peer needs to be up for the edit to make it across. Poll the peer count until it
      // has gone up (bounded, so a peer that never shows up fails the test instead of hanging).
      long deadline = EnvironmentEdgeManager.currentTime() + 60000;
      while (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
        Assert.assertTrue("Peer " + id + " did not show up within 60s",
          EnvironmentEdgeManager.currentTime() < deadline);
        LOG.info("Waiting on peercount to go up from " + peerCount);
        Threads.sleep(100);
      }
      // now replicate some data
      doPut(row);

      Waiter.waitFor(CONF1, 60000, () -> {
        // Looks like replication endpoint returns false unless we put more than 10 edits. We
        // only send over one edit.
        int count = ReplicationEndpointForTest.replicateCount.get();
        LOG.info("count=" + count);
        return ReplicationEndpointReturningFalse.replicated.get();
      });
      Exception failure = ReplicationEndpointReturningFalse.ex.get();
      if (failure != null) {
        throw failure;
      }
    } finally {
      hbaseAdmin.removeReplicationPeer(id);
    }
  }

  /**
   * Writes 100 edits into every region that has a non-empty start key, installs
   * {@link InterClusterReplicationEndpointForTest} and waits until the endpoint has counted
   * exactly that many successfully replicated entries.
   */
  @Test
  public void testInterClusterReplication() throws Exception {
    final String id = "testInterClusterReplication";

    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
    int totEdits = 0;

    // Make sure edits are spread across regions because we do region based batching
    // before shipping edits.
    for (HRegion region : regions) {
      RegionInfo hri = region.getRegionInfo();
      byte[] startKey = hri.getStartKey();
      if (startKey.length == 0) {
        // No usable row key for this region (its start key is empty); skip it.
        continue;
      }
      for (int i = 0; i < 100; i++) {
        Put put = new Put(startKey);
        put.addColumn(famName, startKey, startKey);
        region.put(put);
        totEdits++;
      }
    }

    hbaseAdmin.addReplicationPeer(id,
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())
        .build());
    try {
      final int numEdits = totEdits;
      Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Failed to replicate all edits, expected = " + numEdits + " replicated = "
            + InterClusterReplicationEndpointForTest.replicateCount.get();
        }
      });
    } finally {
      hbaseAdmin.removeReplicationPeer(id);
      UTIL1.deleteTableData(tableName);
    }
  }

  /**
   * Installs {@link ReplicationEndpointWithWALEntryFilter} together with two filters configured by
   * class name, puts three rows and checks that the endpoint recorded no failure and that the
   * configured filter saw at least one entry.
   */
  @Test
  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
    final String id = "testWALEntryFilterFromReplicationEndpoint";
    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
        // test that we can create multiple WALFilters reflectively
        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
          EverythingPassesWALEntryFilter.class.getName() + ","
            + EverythingPassesWALEntryFilterSubclass.class.getName())
        .build();

    hbaseAdmin.addReplicationPeer(id, rpc);
    try {
      // now replicate some data.
      try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
        doPut(connection, Bytes.toBytes("row1"));
        doPut(connection, row);
        doPut(connection, Bytes.toBytes("row2"));
      }

      Waiter.waitFor(CONF1, 60000, () -> ReplicationEndpointForTest.replicateCount.get() >= 1);

      Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
      // make sure our reflectively created filter is in the filter chain
      Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
    } finally {
      hbaseAdmin.removeReplicationPeer(id);
    }
  }

  /** Adding a peer whose configured WAL entry filter is not a real class must be rejected. */
  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
        // a filter class name that does not exist
        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
          "IAmNotARealWalEntryFilter")
        .build();
    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
  }

  /**
   * Updating a peer config with a WAL entry filter that is not a real class must be rejected.
   * <p>
   * NOTE(review): no peer with this id is ever added in this class, so the expected
   * {@link IOException} may also be raised for the missing peer rather than for the bad filter
   * name; confirm which check the server performs first.
   */
  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
        // a filter class name that does not exist
        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
          "IAmNotARealWalEntryFilter")
        .build();
    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }

  @Test
  public void testMetricsSourceBaseSourcePassThrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that
     * metrics get written to both namespaces. Both of those classes wrap a
     * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics.
     * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls
     * down through the two layers of wrapping to the actual BaseSource.
     */
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationGlobalSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source =
      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);

    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();

    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    // check singleSourceSourceByTable metrics.
    // The map entry for a table is expected to appear only after
    // MetricsSource#updateTableLevelMetrics has been called with an entry for that table.
    Assert.assertFalse(source.getSingleSourceSourceByTable().containsKey("RandomNewTable"));
    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
    Assert.assertTrue(source.getSingleSourceSourceByTable().containsKey("RandomNewTable"));
    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable");

    // age should be greater than zero we created the entry with time in the past
    Assert.assertTrue(msr.getLastShippedAge() > 0);
    Assert.assertTrue(msr.getShippedBytes() > 0);
  }

  /**
   * Builds a single-element list pairing one WAL entry for {@code tableName} with a size of 10.
   */
  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
    byte[] a = new byte[] { 'a' };
    Entry entry = createEntry(tableName, null, a);
    walEntriesWithSize.add(new Pair<>(entry, 10L));
    return walEntriesWithSize;
  }

  /**
   * Creates a WAL entry for {@code tableName} whose key carries a write time one millisecond
   * before "now" and whose edit holds one KeyValue per element of {@code kvs} (each byte array is
   * used as the three KeyValue constructor arguments).
   */
  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
      EnvironmentEdgeManager.currentTime() - 1L, scopes);
    WALEdit edit1 = new WALEdit();

    for (byte[] kv : kvs) {
      edit1.add(new KeyValue(kv, kv, kv));
    }
    return new Entry(key1, edit1);
  }

  /** Puts {@code row} into the test table over a short-lived connection to the source cluster. */
  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
      doPut(connection, row);
    }
  }

  /**
   * Puts one cell into the test table using {@code row} as row key, qualifier and value.
   */
  private void doPut(final Connection connection, final byte[] row) throws IOException {
    try (Table t = connection.getTable(tableName)) {
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      t.put(put);
    }
  }

  /**
   * Asserts that the batch most recently recorded by {@link ReplicationEndpointForTest} consists
   * of exactly one entry with exactly one cell whose row equals {@code row}. Does nothing when no
   * batch has been recorded yet.
   */
  private static void doAssert(byte[] row) throws Exception {
    // Snapshot the volatile field once so a concurrent replicate() call cannot swap the list
    // between the null check and the assertions below.
    List<Entry> entries = ReplicationEndpointForTest.lastEntries;
    if (entries == null) {
      return; // first call
    }
    Assert.assertEquals(1, entries.size());
    List<Cell> cells = entries.get(0).getEdit().getCells();
    Assert.assertEquals(1, cells.size());
    Cell cell = cells.get(0);
    Assert.assertTrue(Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
      row, 0, row.length));
  }

  /**
   * Endpoint that counts constructions, starts, stops and replicate calls in static counters and
   * remembers a copy of the last batch it was handed. It always reports success.
   */
  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = UTIL1.getRandomUUID();
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    static volatile List<Entry> lastEntries = null;

    public ReplicationEndpointForTest() {
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }

    @Override
    public boolean canReplicateToSameCluster() {
      return true;
    }
  }

  /**
   * Not used by unit tests, helpful for manual testing with replication.
   * <p>
   * Snippet for `hbase shell`:
   *
   * <pre>
   * create 't', 'f'
   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
   * </pre>
   */
  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
    /** Sleep before each replicate call, in milliseconds; stays 0 if no context is available. */
    private long duration;

    public SleepingReplicationEndpointForTest() {
      super();
    }

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      if (this.ctx != null) {
        duration = this.ctx.getConfiguration()
          .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
      }
    }

    @Override
    public boolean replicate(ReplicateContext context) {
      try {
        Thread.sleep(duration);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and report the batch as not replicated.
        Thread.currentThread().interrupt();
        return false;
      }
      return super.replicate(context);
    }
  }

  /**
   * Inter-cluster endpoint whose first {@code asyncReplicate} call fails; every successfully
   * replicated batch adds its entry count to {@link #replicateCount}.
   */
  public static class InterClusterReplicationEndpointForTest
    extends HBaseInterClusterReplicationEndpoint {

    static AtomicInteger replicateCount = new AtomicInteger();
    // volatile so a write made by one thread is visible to any other thread reading the flag.
    // The check-then-set in asyncReplicate is still not atomic.
    static volatile boolean failedOnce;

    public InterClusterReplicationEndpointForTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      boolean success = super.replicate(replicateContext);
      if (success) {
        replicateCount.addAndGet(replicateContext.entries.size());
      }
      return success;
    }

    @Override
    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
      int timeout) {
      // Fail only once, we don't want to slow down the test.
      if (failedOnce) {
        return CompletableFuture.completedFuture(ordinal);
      }
      failedOnce = true;
      CompletableFuture<Integer> future = new CompletableFuture<>();
      future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
      return future;
    }
  }

  /**
   * Endpoint that reports failure until it has been called more than {@link #COUNT} times. Before
   * recording each batch it checks the previously recorded batch against the shared test row and
   * stores any failure in {@link #ex}.
   */
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    static AtomicBoolean replicated = new AtomicBoolean(false);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      } catch (AssertionError e) {
        // JUnit assertion failures are Errors, so "catch (Exception)" alone never sees them.
        // Wrap the error so it fits the Exception-typed holder and reaches the test thread.
        ex.set(new IllegalStateException("Assertion failed inside replicate()", e));
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
      return replicated.get();
    }
  }

  /**
   * Endpoint that adds a WALEntry filter which only keeps cells for the shared test row and
   * removes cells for any other row. After recording each batch it checks the batch against that
   * row and stores any failure in {@link #ex}.
   */
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      } catch (AssertionError e) {
        // JUnit assertion failures are Errors, so "catch (Exception)" alone never sees them.
        // Wrap the error so it fits the Exception-typed holder and reaches the test thread.
        ex.set(new IllegalStateException("Assertion failed inside replicate()", e));
      }
      return true;
    }

    @Override
    public WALEntryFilter getWALEntryfilter() {
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          List<Cell> cells = entry.getEdit().getCells();
          // Drop, in place, every cell whose row differs from the shared test row.
          cells.removeIf(cell -> !Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
            cell.getRowLength(), row, 0, row.length));
          return entry;
        }
      });
    }
  }

  /** Filter that lets every entry through and remembers that it was invoked at least once. */
  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
    // volatile: written by whichever thread runs the filter, read by the JUnit thread.
    private static volatile boolean passedEntry = false;

    @Override
    public Entry filter(Entry entry) {
      passedEntry = true;
      return entry;
    }

    public static boolean hasPassedAnEntry() {
      return passedEntry;
    }
  }

  /** Second filter class, used to configure more than one filter by class name. */
  public static class EverythingPassesWALEntryFilterSubclass
    extends EverythingPassesWALEntryFilter {
  }
}