/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests ReplicationSource and ReplicationEndpoint interactions
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

  /** Number of region server threads in the source cluster, captured once at class setup. */
  static int numRegionServers;

  /**
   * Runs the base class setup and records how many region servers the source cluster runs.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
  }

  /**
   * Runs the base class teardown, then checks that at least one test endpoint was stopped.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
    // check stop is called
    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
  }

  /**
   * Resets the static counters shared with the test endpoints, rolls the WAL on every region
   * server and waits (up to 3 seconds) for all roll requests to finish.
   */
  @Before
  public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    ReplicationEndpointForTest.lastEntries = null;
    final List<RegionServerThread> rsThreads =
        utility1.getMiniHBaseCluster().getRegionServerThreads();
    for (RegionServerThread rs : rsThreads) {
      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
    // Wait for all log roll to finish
    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        List<String> logRollInProgressRsList = new ArrayList<>();
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            logRollInProgressRsList.add(rs.getRegionServer().toString());
          }
        }
        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
      }
    });
  }

  /**
   * Installs {@link ReplicationEndpointForTest} as a peer, waits until it has been constructed
   * and started at least once per region server, then writes one row and waits for it to be
   * handed to the endpoint.
   */
  @Test
  public void testCustomReplicationEndpoint() throws Exception {
    final String id = "testCustomReplicationEndpoint";
    // test installing a custom replication endpoint other than the default one.
    admin.addPeer(id,
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
    try {
      // check whether the class has been constructed and started
      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
        }
      });

      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
        }
      });

      Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

      // now replicate some data.
      doPut(Bytes.toBytes("row42"));

      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.replicateCount.get() >= 1;
        }
      });

      doAssert(Bytes.toBytes("row42"));
    } finally {
      // Always drop the peer so a failure here does not leak it into the other tests.
      admin.removePeer(id);
    }
  }

  /**
   * Installs {@link ReplicationEndpointReturningFalse}, writes one row and waits until the
   * endpoint finally reports success. Any exception the endpoint captured while checking the
   * shipped row is rethrown so that it fails the test.
   */
  @Test
  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
    Assert.assertFalse(ReplicationEndpointReturningFalse.replicated.get());
    final int peerCount = admin.getPeersCount();
    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
    admin.addPeer(id,
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
    try {
      // This test is flaky, and there is so much going on in here that it is hard to debug.
      // The peer needs to be up for the edit to make it across, so do not progress until the
      // peer count has gone up. Poll instead of checking once, but give up after
      // 600 * 100ms = 60s so a peer that never shows up fails the test rather than hanging it.
      for (int attempt = 0; admin.getPeersCount() <= peerCount; attempt++) {
        Assert.assertTrue("Peer count did not go up from " + peerCount, attempt < 600);
        LOG.info("Waiting on peercount to go up from " + peerCount);
        Threads.sleep(100);
      }
      // now replicate some data
      doPut(row);

      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          // The endpoint returns false until it has been called more than COUNT (10) times;
          // we only send over one edit, so it has to be offered to the endpoint repeatedly.
          int count = ReplicationEndpointForTest.replicateCount.get();
          LOG.info("count=" + count);
          return ReplicationEndpointReturningFalse.replicated.get();
        }
      });
      // Read the reference once so the null check and the throw see the same value.
      Exception failure = ReplicationEndpointReturningFalse.ex.get();
      if (failure != null) {
        throw failure;
      }
    } finally {
      // Always drop the peer so a failure here does not leak it into the other tests.
      admin.removePeer(id);
    }
  }

  /**
   * Writes 100 edits into every region that has a non-empty start key, installs
   * {@link InterClusterReplicationEndpointForTest} and waits until the endpoint has counted
   * exactly that many successfully replicated entries.
   */
  @Test
  public void testInterClusterReplication() throws Exception {
    final String id = "testInterClusterReplication";

    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
    int totEdits = 0;

    // Make sure edits are spread across regions because we do region based batching
    // before shipping edits.
    for (HRegion region : regions) {
      RegionInfo hri = region.getRegionInfo();
      byte[] startKey = hri.getStartKey();
      if (startKey.length == 0) {
        // An empty start key cannot be used as a row key, so this region gets no edits.
        continue;
      }
      for (int i = 0; i < 100; i++) {
        Put put = new Put(startKey);
        put.addColumn(famName, startKey, startKey);
        region.put(put);
        totEdits++;
      }
    }

    admin.addPeer(id,
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
        null);
    try {
      final int numEdits = totEdits;
      Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Failed to replicate all edits, expected = " + numEdits
              + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
        }
      });
    } finally {
      // Always clean up so a failure here does not leak the peer or its data into other tests.
      admin.removePeer(id);
      utility1.deleteTableData(tableName);
    }
  }

  /**
   * Installs {@link ReplicationEndpointWithWALEntryFilter} together with two filters that are
   * configured by class name, writes three rows and checks that the endpoint recorded no
   * failure and that the reflectively created filter was actually invoked.
   */
  @Test
  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
    final String id = "testWALEntryFilterFromReplicationEndpoint";
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that we can create multiple WALFilters reflectively
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        EverythingPassesWALEntryFilter.class.getName()
            + "," + EverythingPassesWALEntryFilterSubclass.class.getName());
    admin.addPeer(id, rpc);
    try {
      // now replicate some data.
      try (Connection connection = ConnectionFactory.createConnection(conf1)) {
        doPut(connection, Bytes.toBytes("row1"));
        doPut(connection, row);
        doPut(connection, Bytes.toBytes("row2"));
      }

      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.replicateCount.get() >= 1;
        }
      });

      Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
      // make sure our reflectively created filter is in the filter chain
      Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
    } finally {
      // Always drop the peer so a failure here does not leak it into the other tests.
      admin.removePeer(id);
    }
  }

  /**
   * Adding a peer whose configured WAL entry filter is not a real class name is expected to
   * fail with an {@link IOException}.
   */
  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // configure a filter class name that does not exist; adding the peer must be rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter");
    admin.addPeer("testWALEntryFilterAddValidation", rpc);
  }

  /**
   * Updating a peer config whose configured WAL entry filter is not a real class name is
   * expected to fail with an {@link IOException}.
   */
  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // configure a filter class name that does not exist; updating the peer must be rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter");
    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }

  /**
   * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
   * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both
   * of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
   * allows for custom JMX metrics. This test checks to make sure the BaseSource decorator logic
   * on MetricsSource actually calls down through the two layers of wrapping to the actual
   * BaseSource.
   */
  @Test
  public void testMetricsSourceBaseSourcePassthrough() {
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
        new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationSourceSource globalSourceSource =
        new MetricsReplicationGlobalSourceSource(globalRms);
    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
        singleSourceSourceByTable);

    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();

    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    // check singleSourceSourceByTable metrics.
    // singleSourceSourceByTable map entry will be created only
    // after calling #setAgeOfLastShippedOpByTable
    boolean containsRandomNewTable =
        source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
    Assert.assertFalse(containsRandomNewTable);
    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
    containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
    Assert.assertTrue(containsRandomNewTable);
    MetricsReplicationSourceSource msr =
        source.getSingleSourceSourceByTable().get("RandomNewTable");
    // cannot put a more concrete value here to verify because the age is arbitrary.
    // as long as it's greater than 0, we see it as the correct answer.
    Assert.assertTrue(msr.getLastShippedAge() > 0);
  }

  /**
   * Writes a single cell for {@code row} using a short-lived connection to the source cluster.
   * @param row row key; also used as the qualifier and the value of the cell
   * @throws IOException if the connection, table lookup or put fails
   */
  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
      doPut(connection, row);
    }
  }

  /**
   * Writes a single cell for {@code row} through the given connection.
   * @param connection open connection to the source cluster; not closed by this method
   * @param row row key; also used as the qualifier and the value of the cell
   * @throws IOException if the table lookup or put fails
   */
  private void doPut(final Connection connection, final byte[] row) throws IOException {
    try (Table t = connection.getTable(tableName)) {
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      t.put(put);
    }
  }

  /**
   * Asserts that the batch most recently seen by {@link ReplicationEndpointForTest} holds
   * exactly one entry with exactly one cell, and that the cell belongs to {@code row}. Does
   * nothing if no batch has been recorded yet.
   * @param row expected row key of the single replicated cell
   */
  private static void doAssert(byte[] row) throws Exception {
    // lastEntries is written by replication threads and reset by setup(); work on a single
    // snapshot so the null check and the reads below cannot observe different values.
    List<Entry> entries = ReplicationEndpointForTest.lastEntries;
    if (entries == null) {
      return; // first call
    }
    Assert.assertEquals(1, entries.size());
    List<Cell> cells = entries.get(0).getEdit().getCells();
    Assert.assertEquals(1, cells.size());
    Cell cell = cells.get(0);
    Assert.assertTrue(Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
        row, 0, row.length));
  }

  /**
   * Endpoint that ships nothing; it only counts lifecycle and replicate calls in static
   * counters and remembers a copy of the last batch it was handed.
   */
  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = utility1.getRandomUUID();
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    static volatile List<Entry> lastEntries = null;

    /** Resets {@link #replicateCount} to zero and bumps {@link #contructedCount}. */
    public ReplicationEndpointForTest() {
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    /**
     * Counts the call, stores a copy of the batch in {@link #lastEntries} and reports success.
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }
  }

  /**
   * Inter-cluster endpoint whose first replicator throws, and which counts the entries of
   * every batch that the parent class reports as successfully replicated.
   */
  public static class InterClusterReplicationEndpointForTest
      extends HBaseInterClusterReplicationEndpoint {

    static AtomicInteger replicateCount = new AtomicInteger();
    // volatile: createReplicator may run on a different thread than the one that last wrote it.
    static volatile boolean failedOnce;

    /** Resets {@link #replicateCount} to zero. */
    public InterClusterReplicationEndpointForTest() {
      replicateCount.set(0);
    }

    /**
     * Delegates to the parent and, only on success, adds the batch size to
     * {@link #replicateCount}.
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      boolean success = super.replicate(replicateContext);
      if (success) {
        replicateCount.addAndGet(replicateContext.entries.size());
      }
      return success;
    }

    /**
     * Returns a replicator that throws until {@link #failedOnce} has been set, and one that
     * simply returns {@code ordinal} afterwards.
     */
    @Override
    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
      // Fail only once, we don't want to slow down the test.
      if (failedOnce) {
        return () -> ordinal;
      } else {
        failedOnce = true;
        return () -> {
          throw new IOException("Sample Exception: Failed to replicate.");
        };
      }
    }
  }

  /**
   * Endpoint that reports failure until it has been called more than {@link #COUNT} times.
   */
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    static AtomicBoolean replicated = new AtomicBoolean(false);

    /**
     * Checks the previously recorded batch against {@code row} (capturing any failure in
     * {@link #ex}), records the new batch via the parent, and returns whether the call count
     * has exceeded {@link #COUNT}.
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        // Hand the failure to the test thread; throwing here would not fail the test.
        ex.set(e);
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      // first 10 times, we return false
      boolean done = replicateCount.get() > COUNT;
      replicated.set(done);
      return done;
    }
  }

  /**
   * Endpoint that returns a WALEntry filter which only accepts "row", but not other rows.
   */
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    /**
     * Records the batch via the parent and checks it against {@code row}; any failure is
     * captured in {@link #ex}. Always reports success.
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        // Hand the failure to the test thread; throwing here would not fail the test.
        ex.set(e);
      }
      return true;
    }

    /**
     * Chains the parent's filter with one that strips every cell whose row is not {@code row}.
     */
    @Override
    public WALEntryFilter getWALEntryfilter() {
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          List<Cell> cells = entry.getEdit().getCells();
          cells.removeIf(cell -> !Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
              cell.getRowLength(), row, 0, row.length));
          return entry;
        }
      });
    }
  }

  /**
   * Filter that lets every entry through and remembers that it was invoked.
   */
  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
    // volatile: set by whichever thread runs the filter, read by the test thread.
    private static volatile boolean passedEntry = false;

    @Override
    public Entry filter(Entry entry) {
      passedEntry = true;
      return entry;
    }

    /**
     * @return true once any instance of this filter has been asked to filter an entry
     */
    public static boolean hasPassedAnEntry() {
      return passedEntry;
    }
  }

  /**
   * Subclass with no behavior of its own; used to configure a second filter class name.
   */
  public static class EverythingPassesWALEntryFilterSubclass
      extends EverythingPassesWALEntryFilter {
  }
}