001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.mockito.Mockito.doNothing; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.spy; 023import static org.mockito.Mockito.verify; 024import static org.mockito.Mockito.when; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.TreeMap; 031import java.util.UUID; 032import java.util.concurrent.Callable; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.concurrent.atomic.AtomicReference; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.KeyValue; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.regionserver.HRegion; 047import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 048import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; 049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; 050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; 051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; 052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; 053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; 054import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.testclassification.ReplicationTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 059import org.apache.hadoop.hbase.util.Pair; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.hbase.wal.WAL.Entry; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hbase.wal.WALKeyImpl; 064import org.apache.hadoop.hbase.zookeeper.ZKConfig; 065import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; 066import org.junit.AfterClass; 067import org.junit.Assert; 068import org.junit.Before; 069import org.junit.BeforeClass; 070import org.junit.ClassRule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076/** 077 * Tests ReplicationSource and ReplicationEndpoint interactions 078 */ 079@Category({ ReplicationTests.class, MediumTests.class }) 080public class TestReplicationEndpoint extends TestReplicationBase { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestReplicationEndpoint.class); 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class); 087 088 static int numRegionServers; 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 TestReplicationBase.setUpBeforeClass(); 093 numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 094 } 095 096 @AfterClass 097 public static void tearDownAfterClass() throws Exception { 098 TestReplicationBase.tearDownAfterClass(); 099 // check stop is called 100 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); 101 } 102 103 @Before 104 public void setup() throws Exception { 105 ReplicationEndpointForTest.contructedCount.set(0); 106 ReplicationEndpointForTest.startedCount.set(0); 107 ReplicationEndpointForTest.replicateCount.set(0); 108 ReplicationEndpointReturningFalse.replicated.set(false); 109 ReplicationEndpointForTest.lastEntries = null; 110 final List<RegionServerThread> rsThreads = 111 UTIL1.getMiniHBaseCluster().getRegionServerThreads(); 112 for (RegionServerThread rs : rsThreads) { 113 UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); 114 } 115 // Wait for all log roll to finish 116 UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() { 117 @Override 118 public boolean evaluate() throws Exception { 119 for (RegionServerThread rs : rsThreads) { 120 if (!rs.getRegionServer().walRollRequestFinished()) { 121 return false; 122 } 123 } 124 return true; 125 } 126 127 @Override 128 public String explainFailure() throws Exception { 129 List<String> logRollInProgressRsList = new ArrayList<>(); 130 for (RegionServerThread rs : rsThreads) { 131 if (!rs.getRegionServer().walRollRequestFinished()) { 132 logRollInProgressRsList.add(rs.getRegionServer().toString()); 133 } 134 } 135 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList; 136 } 137 }); 138 } 139 140 @Test 141 public void testCustomReplicationEndpoint() throws Exception { 142 // test installing a custom replication endpoint other than the default one. 143 admin.addPeer("testCustomReplicationEndpoint", 144 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 145 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); 146 147 // check whether the class has been constructed and started 148 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 149 @Override 150 public boolean evaluate() throws Exception { 151 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; 152 } 153 }); 154 155 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 156 @Override 157 public boolean evaluate() throws Exception { 158 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; 159 } 160 }); 161 162 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 163 164 // now replicate some data. 165 doPut(Bytes.toBytes("row42")); 166 167 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 168 @Override 169 public boolean evaluate() throws Exception { 170 return ReplicationEndpointForTest.replicateCount.get() >= 1; 171 } 172 }); 173 174 doAssert(Bytes.toBytes("row42")); 175 176 admin.removePeer("testCustomReplicationEndpoint"); 177 } 178 179 @Test 180 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { 181 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 182 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); 183 int peerCount = admin.getPeersCount(); 184 final String id = "testReplicationEndpointReturnsFalseOnReplicate"; 185 admin.addPeer(id, 186 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 187 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); 188 // This test is flakey and then there is so much stuff flying around in here its, hard to 189 // debug. Peer needs to be up for the edit to make it across. This wait on 190 // peer count seems to be a hack that has us not progress till peer is up. 191 if (admin.getPeersCount() <= peerCount) { 192 LOG.info("Waiting on peercount to go up from " + peerCount); 193 Threads.sleep(100); 194 } 195 // now replicate some data 196 doPut(row); 197 198 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 199 @Override 200 public boolean evaluate() throws Exception { 201 // Looks like replication endpoint returns false unless we put more than 10 edits. We 202 // only send over one edit. 203 int count = ReplicationEndpointForTest.replicateCount.get(); 204 LOG.info("count=" + count); 205 return ReplicationEndpointReturningFalse.replicated.get(); 206 } 207 }); 208 if (ReplicationEndpointReturningFalse.ex.get() != null) { 209 throw ReplicationEndpointReturningFalse.ex.get(); 210 } 211 212 admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); 213 } 214 215 @Test 216 public void testInterClusterReplication() throws Exception { 217 final String id = "testInterClusterReplication"; 218 219 List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName); 220 // This trick of waiting on peer to show up is taken from test above. 221 int peerCount = admin.getPeersCount(); 222 admin.addPeer(id, 223 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) 224 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), 225 null); 226 // This test is flakey and then there is so much stuff flying around in here its, hard to 227 // debug. Peer needs to be up for the edit to make it across. This wait on 228 // peer count seems to be a hack that has us not progress till peer is up. 229 if (admin.getPeersCount() <= peerCount) { 230 LOG.info("Waiting on peercount to go up from " + peerCount); 231 Threads.sleep(100); 232 } 233 234 int totEdits = 0; 235 236 // Make sure edits are spread across regions because we do region based batching 237 // before shipping edits. 238 for (HRegion region: regions) { 239 RegionInfo hri = region.getRegionInfo(); 240 byte[] row = hri.getStartKey(); 241 for (int i = 0; i < 100; i++) { 242 if (row.length > 0) { 243 Put put = new Put(row); 244 put.addColumn(famName, row, row); 245 region.put(put); 246 totEdits++; 247 } 248 } 249 } 250 251 final int numEdits = totEdits; 252 LOG.info("Waiting on replication of {}", numEdits); 253 Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() { 254 @Override 255 public boolean evaluate() throws Exception { 256 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; 257 } 258 259 @Override 260 public String explainFailure() throws Exception { 261 String failure = "Failed to replicate all edits, expected = " + numEdits 262 + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get(); 263 return failure; 264 } 265 }); 266 267 admin.removePeer("testInterClusterReplication"); 268 UTIL1.deleteTableData(tableName); 269 } 270 271 @Test 272 public void testWALEntryFilterFromReplicationEndpoint() throws Exception { 273 ReplicationPeerConfig rpc = 274 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 275 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 276 // test that we can create mutliple WALFilters reflectively 277 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 278 EverythingPassesWALEntryFilter.class.getName() + "," + 279 EverythingPassesWALEntryFilterSubclass.class.getName()); 280 admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); 281 // now replicate some data. 282 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 283 doPut(connection, Bytes.toBytes("row1")); 284 doPut(connection, row); 285 doPut(connection, Bytes.toBytes("row2")); 286 } 287 288 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 289 @Override 290 public boolean evaluate() throws Exception { 291 return ReplicationEndpointForTest.replicateCount.get() >= 1; 292 } 293 }); 294 295 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); 296 //make sure our reflectively created filter is in the filter chain 297 Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); 298 admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); 299 } 300 301 @Test(expected = IOException.class) 302 public void testWALEntryFilterAddValidation() throws Exception { 303 ReplicationPeerConfig rpc = 304 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 305 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 306 // test that we can create mutliple WALFilters reflectively 307 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 308 "IAmNotARealWalEntryFilter"); 309 admin.addPeer("testWALEntryFilterAddValidation", rpc); 310 } 311 312 @Test(expected = IOException.class) 313 public void testWALEntryFilterUpdateValidation() throws Exception { 314 ReplicationPeerConfig rpc = 315 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 316 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); 317 // test that we can create mutliple WALFilters reflectively 318 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 319 "IAmNotARealWalEntryFilter"); 320 admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); 321 } 322 323 @Test 324 public void testMetricsSourceBaseSourcePassThrough() { 325 /* 326 * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, 327 * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, 328 * so that metrics get written to both namespaces. Both of those classes wrap a 329 * MetricsReplicationSourceImpl that implements BaseSource, which allows 330 * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on 331 * MetricsSource actually calls down through the two layers of wrapping to the actual 332 * BaseSource. 333 */ 334 String id = "id"; 335 DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); 336 MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); 337 when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); 338 MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); 339 when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); 340 341 MetricsReplicationSourceSource singleSourceSource = 342 new MetricsReplicationSourceSourceImpl(singleRms, id); 343 MetricsReplicationGlobalSourceSource globalSourceSource = 344 new MetricsReplicationGlobalSourceSourceImpl(globalRms); 345 MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); 346 doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); 347 348 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = 349 new HashMap<>(); 350 MetricsSource source = new MetricsSource(id, singleSourceSource, 351 spyglobalSourceSource, singleSourceSourceByTable); 352 353 354 String gaugeName = "gauge"; 355 String singleGaugeName = "source.id." + gaugeName; 356 String globalGaugeName = "source." + gaugeName; 357 long delta = 1; 358 String counterName = "counter"; 359 String singleCounterName = "source.id." + counterName; 360 String globalCounterName = "source." + counterName; 361 long count = 2; 362 source.decGauge(gaugeName, delta); 363 source.getMetricsContext(); 364 source.getMetricsDescription(); 365 source.getMetricsJmxContext(); 366 source.getMetricsName(); 367 source.incCounters(counterName, count); 368 source.incGauge(gaugeName, delta); 369 source.init(); 370 source.removeMetric(gaugeName); 371 source.setGauge(gaugeName, delta); 372 source.updateHistogram(counterName, count); 373 source.incrFailedRecoveryQueue(); 374 375 376 verify(singleRms).decGauge(singleGaugeName, delta); 377 verify(globalRms).decGauge(globalGaugeName, delta); 378 verify(globalRms).getMetricsContext(); 379 verify(globalRms).getMetricsJmxContext(); 380 verify(globalRms).getMetricsName(); 381 verify(singleRms).incCounters(singleCounterName, count); 382 verify(globalRms).incCounters(globalCounterName, count); 383 verify(singleRms).incGauge(singleGaugeName, delta); 384 verify(globalRms).incGauge(globalGaugeName, delta); 385 verify(globalRms).init(); 386 verify(singleRms).removeMetric(singleGaugeName); 387 verify(globalRms).removeMetric(globalGaugeName); 388 verify(singleRms).setGauge(singleGaugeName, delta); 389 verify(globalRms).setGauge(globalGaugeName, delta); 390 verify(singleRms).updateHistogram(singleCounterName, count); 391 verify(globalRms).updateHistogram(globalCounterName, count); 392 verify(spyglobalSourceSource).incrFailedRecoveryQueue(); 393 394 //check singleSourceSourceByTable metrics. 395 // singleSourceSourceByTable map entry will be created only 396 // after calling #setAgeOfLastShippedOpByTable 397 boolean containsRandomNewTable = source.getSingleSourceSourceByTable() 398 .containsKey("RandomNewTable"); 399 Assert.assertEquals(false, containsRandomNewTable); 400 source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); 401 containsRandomNewTable = source.getSingleSourceSourceByTable() 402 .containsKey("RandomNewTable"); 403 Assert.assertEquals(true, containsRandomNewTable); 404 MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable() 405 .get("RandomNewTable"); 406 407 // age should be greater than zero we created the entry with time in the past 408 Assert.assertTrue(msr.getLastShippedAge() > 0); 409 Assert.assertTrue(msr.getShippedBytes() > 0); 410 411 } 412 413 private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { 414 List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); 415 byte[] a = new byte[] { 'a' }; 416 Entry entry = createEntry(tableName, null, a); 417 walEntriesWithSize.add(new Pair<>(entry, 10L)); 418 return walEntriesWithSize; 419 } 420 421 private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) { 422 WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), 423 System.currentTimeMillis() - 1L, 424 scopes); 425 WALEdit edit1 = new WALEdit(); 426 427 for (byte[] kv : kvs) { 428 edit1.add(new KeyValue(kv, kv, kv)); 429 } 430 return new Entry(key1, edit1); 431 } 432 433 private void doPut(byte[] row) throws IOException { 434 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 435 doPut(connection, row); 436 } 437 } 438 439 private void doPut(final Connection connection, final byte [] row) throws IOException { 440 try (Table t = connection.getTable(tableName)) { 441 Put put = new Put(row); 442 put.addColumn(famName, row, row); 443 t.put(put); 444 } 445 } 446 447 private static void doAssert(byte[] row) throws Exception { 448 if (ReplicationEndpointForTest.lastEntries == null) { 449 return; // first call 450 } 451 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); 452 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); 453 Assert.assertEquals(1, cells.size()); 454 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), 455 cells.get(0).getRowLength(), row, 0, row.length)); 456 } 457 458 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { 459 static UUID uuid = UTIL1.getRandomUUID(); 460 static AtomicInteger contructedCount = new AtomicInteger(); 461 static AtomicInteger startedCount = new AtomicInteger(); 462 static AtomicInteger stoppedCount = new AtomicInteger(); 463 static AtomicInteger replicateCount = new AtomicInteger(); 464 static volatile List<Entry> lastEntries = null; 465 466 public ReplicationEndpointForTest() { 467 replicateCount.set(0); 468 contructedCount.incrementAndGet(); 469 } 470 471 @Override 472 public UUID getPeerUUID() { 473 return uuid; 474 } 475 476 @Override 477 public boolean replicate(ReplicateContext replicateContext) { 478 replicateCount.incrementAndGet(); 479 lastEntries = new ArrayList<>(replicateContext.entries); 480 return true; 481 } 482 483 @Override 484 public void start() { 485 startAsync(); 486 } 487 488 @Override 489 public void stop() { 490 stopAsync(); 491 } 492 493 @Override 494 protected void doStart() { 495 startedCount.incrementAndGet(); 496 notifyStarted(); 497 } 498 499 @Override 500 protected void doStop() { 501 stoppedCount.incrementAndGet(); 502 notifyStopped(); 503 } 504 505 @Override 506 public boolean canReplicateToSameCluster() { 507 return true; 508 } 509 } 510 511 /** 512 * Not used by unit tests, helpful for manual testing with replication. 513 * <p> 514 * Snippet for `hbase shell`: 515 * <pre> 516 * create 't', 'f' 517 * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \ 518 * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest' 519 * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1} 520 * </pre> 521 */ 522 public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { 523 private long duration; 524 public SleepingReplicationEndpointForTest() { 525 super(); 526 } 527 528 @Override 529 public void init(Context context) throws IOException { 530 super.init(context); 531 if (this.ctx != null) { 532 duration = this.ctx.getConfiguration().getLong( 533 "hbase.test.sleep.replication.endpoint.duration.millis", 5000L); 534 } 535 } 536 537 @Override 538 public boolean replicate(ReplicateContext context) { 539 try { 540 Thread.sleep(duration); 541 } catch (InterruptedException e) { 542 Thread.currentThread().interrupt(); 543 return false; 544 } 545 return super.replicate(context); 546 } 547 } 548 549 public static class InterClusterReplicationEndpointForTest 550 extends HBaseInterClusterReplicationEndpoint { 551 552 static AtomicInteger replicateCount = new AtomicInteger(); 553 static boolean failedOnce; 554 555 public InterClusterReplicationEndpointForTest() { 556 replicateCount.set(0); 557 } 558 559 @Override 560 public boolean replicate(ReplicateContext replicateContext) { 561 boolean success = super.replicate(replicateContext); 562 if (success) { 563 replicateCount.addAndGet(replicateContext.entries.size()); 564 } 565 return success; 566 } 567 568 @Override 569 protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { 570 // Fail only once, we don't want to slow down the test. 571 if (failedOnce) { 572 return () -> ordinal; 573 } else { 574 failedOnce = true; 575 return () -> { 576 throw new IOException("Sample Exception: Failed to replicate."); 577 }; 578 } 579 } 580 } 581 582 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { 583 static int COUNT = 10; 584 static AtomicReference<Exception> ex = new AtomicReference<>(null); 585 static AtomicBoolean replicated = new AtomicBoolean(false); 586 @Override 587 public boolean replicate(ReplicateContext replicateContext) { 588 try { 589 // check row 590 doAssert(row); 591 } catch (Exception e) { 592 ex.set(e); 593 } 594 595 super.replicate(replicateContext); 596 LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); 597 598 replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false 599 return replicated.get(); 600 } 601 } 602 603 // return a WALEntry filter which only accepts "row", but not other rows 604 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest { 605 static AtomicReference<Exception> ex = new AtomicReference<>(null); 606 607 @Override 608 public boolean replicate(ReplicateContext replicateContext) { 609 try { 610 super.replicate(replicateContext); 611 doAssert(row); 612 } catch (Exception e) { 613 ex.set(e); 614 } 615 return true; 616 } 617 618 @Override 619 public WALEntryFilter getWALEntryfilter() { 620 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() { 621 @Override 622 public Entry filter(Entry entry) { 623 ArrayList<Cell> cells = entry.getEdit().getCells(); 624 int size = cells.size(); 625 for (int i = size-1; i >= 0; i--) { 626 Cell cell = cells.get(i); 627 if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), 628 row, 0, row.length)) { 629 cells.remove(i); 630 } 631 } 632 return entry; 633 } 634 }); 635 } 636 } 637 638 public static class EverythingPassesWALEntryFilter implements WALEntryFilter { 639 private static boolean passedEntry = false; 640 @Override 641 public Entry filter(Entry entry) { 642 passedEntry = true; 643 return entry; 644 } 645 646 public static boolean hasPassedAnEntry() { 647 return passedEntry; 648 } 649 } 650 651 public static class EverythingPassesWALEntryFilterSubclass 652 extends EverythingPassesWALEntryFilter { 653 } 654}