/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests ReplicationSource and ReplicationEndpoint interactions
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

  /** Number of region server threads in UTIL1's cluster, captured once in the class setup. */
  static int numRegionServers;

  /** How many times {@link #waitForPeerCountAbove(int)} polls before giving up. */
  private static final int PEER_COUNT_WAIT_ATTEMPTS = 100;

  /** Argument passed to {@code Threads.sleep} between two polls of the peer count. */
  private static final long PEER_COUNT_WAIT_SLEEP = 100;

  /**
   * Runs the base class setup and records how many region server threads the source cluster has.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
  }

  /**
   * Runs the base class teardown and then verifies that at least one test endpoint was stopped.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
    // check stop is called
    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
  }

  /**
   * Resets the static counters of the test endpoints, then requests a WAL roll on every region
   * server of UTIL1 and waits until all roll requests report as finished.
   */
  @Before
  public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    ReplicationEndpointForTest.lastEntries = null;
    final List<RegionServerThread> rsThreads =
        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
    for (RegionServerThread rs : rsThreads) {
      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
    // Wait for all log roll to finish
    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        List<String> logRollInProgressRsList = new ArrayList<>();
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            logRollInProgressRsList.add(rs.getRegionServer().toString());
          }
        }
        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
      }
    });
  }

  /**
   * Installs {@link ReplicationEndpointForTest} as a peer and checks that it gets constructed and
   * started once per region server, and that a single put reaches {@code replicate}.
   */
  @Test
  public void testCustomReplicationEndpoint() throws Exception {
    final String id = "testCustomReplicationEndpoint";
    // test installing a custom replication endpoint other than the default one.
    admin.addPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);

    try {
      // check whether the class has been constructed and started
      Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
        }
      });

      Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
        }
      });

      Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

      // now replicate some data.
      doPut(Bytes.toBytes("row42"));

      Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.replicateCount.get() >= 1;
        }
      });

      doAssert(Bytes.toBytes("row42"));
    } finally {
      // Remove the peer even when a wait or assertion above fails, so it cannot leak into the
      // tests that run afterwards.
      admin.removePeer(id);
    }
  }

  /**
   * Installs {@link ReplicationEndpointReturningFalse}, puts one row and waits until the endpoint
   * finally reports success. Any exception the endpoint captured while checking the replicated
   * row is rethrown so that it fails the test.
   */
  @Test
  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
    Assert.assertFalse(ReplicationEndpointReturningFalse.replicated.get());
    int peerCount = admin.getPeersCount();
    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
    admin.addPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
    try {
      // This test is flakey and then there is so much stuff flying around in here its, hard to
      // debug. Peer needs to be up for the edit to make it across. This wait on
      // peer count seems to be a hack that has us not progress till peer is up.
      waitForPeerCountAbove(peerCount);
      // now replicate some data
      doPut(row);

      Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          // Looks like replication endpoint returns false unless we put more than 10 edits. We
          // only send over one edit.
          int count = ReplicationEndpointForTest.replicateCount.get();
          LOG.info("count=" + count);
          return ReplicationEndpointReturningFalse.replicated.get();
        }
      });
      Exception captured = ReplicationEndpointReturningFalse.ex.get();
      if (captured != null) {
        throw captured;
      }
    } finally {
      // Always clean up the peer, also when the wait above times out.
      admin.removePeer(id);
    }
  }

  /**
   * Installs {@link InterClusterReplicationEndpointForTest} towards the second cluster, writes
   * edits directly into every region that has a non-empty start key and waits until the endpoint
   * has counted exactly that many successfully replicated entries.
   */
  @Test
  public void testInterClusterReplication() throws Exception {
    final String id = "testInterClusterReplication";

    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
    // This trick of waiting on peer to show up is taken from test above.
    int peerCount = admin.getPeersCount();
    admin.addPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
      null);
    try {
      // This test is flakey and then there is so much stuff flying around in here its, hard to
      // debug. Peer needs to be up for the edit to make it across. This wait on
      // peer count seems to be a hack that has us not progress till peer is up.
      waitForPeerCountAbove(peerCount);

      int totEdits = 0;

      // Make sure edits are spread across regions because we do region based batching
      // before shipping edits.
      for (HRegion region : regions) {
        RegionInfo hri = region.getRegionInfo();
        byte[] startKey = hri.getStartKey();
        if (startKey.length == 0) {
          // A Put needs a non-empty row, so the region with the empty start key is skipped.
          continue;
        }
        for (int i = 0; i < 100; i++) {
          Put put = new Put(startKey);
          put.addColumn(famName, startKey, startKey);
          region.put(put);
          totEdits++;
        }
      }

      final int numEdits = totEdits;
      LOG.info("Waiting on replication of {}", numEdits);
      Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
        }

        @Override
        public String explainFailure() throws Exception {
          String failure = "Failed to replicate all edits, expected = " + numEdits
              + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
          return failure;
        }
      });
    } finally {
      // Clean up in the same order as before, but also when the wait above fails.
      admin.removePeer(id);
      UTIL1.deleteTableData(tableName);
    }
  }

  /**
   * Installs {@link ReplicationEndpointWithWALEntryFilter} together with two filters configured
   * by class name, puts three rows and checks that the endpoint saw no assertion failure and that
   * the reflectively created filter was actually invoked.
   */
  @Test
  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
    final String id = "testWALEntryFilterFromReplicationEndpoint";
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that we can create multiple WALFilters reflectively
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      EverythingPassesWALEntryFilter.class.getName() + "," +
        EverythingPassesWALEntryFilterSubclass.class.getName());
    admin.addPeer(id, rpc);
    try {
      // now replicate some data.
      try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
        doPut(connection, Bytes.toBytes("row1"));
        doPut(connection, row);
        doPut(connection, Bytes.toBytes("row2"));
      }

      Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ReplicationEndpointForTest.replicateCount.get() >= 1;
        }
      });

      Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
      //make sure our reflectively created filter is in the filter chain
      Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
    } finally {
      // Always clean up the peer, also when a wait or assertion above fails.
      admin.removePeer(id);
    }
  }

  /**
   * Adding a peer whose configured WAL entry filter is not a loadable class is expected to fail
   * with an {@link IOException}.
   */
  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // configure a filter class name that does not exist; adding the peer must be rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      "IAmNotARealWalEntryFilter");
    admin.addPeer("testWALEntryFilterAddValidation", rpc);
  }

  /**
   * Updating a peer config whose configured WAL entry filter is not a loadable class is expected
   * to fail with an {@link IOException}.
   */
  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // configure a filter class name that does not exist; updating the peer must be rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      "IAmNotARealWalEntryFilter");
    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }

  @Test
  public void testMetricsSourceBaseSourcePassthrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
     * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
     * MetricsSource actually calls down through the two layers of wrapping to the actual
     * BaseSource.
     */
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSource(globalRms);
    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source =
      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);

    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();

    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    //check singleSourceSourceByTable metrics.
    // singleSourceSourceByTable map entry will be created only
    // after calling #setAgeOfLastShippedOpByTable
    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertFalse(containsRandomNewTable);
    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
    containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertTrue(containsRandomNewTable);
    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
        .get("RandomNewTable");
    // cannot put more concrete value here to verify because the age is arbitrary.
    // as long as it's greater than 0, we see it as correct answer.
    Assert.assertTrue(msr.getLastShippedAge() > 0);
  }

  /**
   * Best-effort wait for a freshly added peer to show up in the peer count. Polls a bounded
   * number of times and then returns regardless of the outcome, so a peer count that never moves
   * does not by itself fail the calling test.
   *
   * @param peerCount the peer count observed before the peer was added
   */
  private void waitForPeerCountAbove(int peerCount) throws Exception {
    for (int attempt = 0; attempt < PEER_COUNT_WAIT_ATTEMPTS
        && admin.getPeersCount() <= peerCount; attempt++) {
      LOG.info("Waiting on peercount to go up from " + peerCount);
      Threads.sleep(PEER_COUNT_WAIT_SLEEP);
    }
  }

  /**
   * Writes a single cell for {@code row} over a connection that is opened and closed just for
   * this put.
   */
  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
      doPut(connection, row);
    }
  }

  /**
   * Writes a single cell to the test table over the given connection, using {@code row} as row
   * key, qualifier and value.
   */
  private void doPut(final Connection connection, final byte [] row) throws IOException {
    try (Table t = connection.getTable(tableName)) {
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      t.put(put);
    }
  }

  /**
   * Asserts that the entries most recently handed to {@link ReplicationEndpointForTest} consist
   * of exactly one entry holding exactly one cell whose row equals {@code row}. Does nothing when
   * no entries have been recorded yet.
   */
  private static void doAssert(byte[] row) throws Exception {
    // Read the volatile field once: replicate() may replace the list concurrently, and all
    // checks below must look at the same snapshot.
    List<Entry> entries = ReplicationEndpointForTest.lastEntries;
    if (entries == null) {
      return; // first call
    }
    Assert.assertEquals(1, entries.size());
    List<Cell> cells = entries.get(0).getEdit().getCells();
    Assert.assertEquals(1, cells.size());
    Cell cell = cells.get(0);
    Assert.assertTrue(Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
      cell.getRowLength(), row, 0, row.length));
  }

  /**
   * Minimal endpoint that only counts constructions, starts, stops and {@code replicate} calls in
   * static counters and remembers the entries of the most recent {@code replicate} call.
   */
  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = UTIL1.getRandomUUID();
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    static volatile List<Entry> lastEntries = null;

    /** Resets the shared replicate counter and bumps the construction counter. */
    public ReplicationEndpointForTest() {
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    /** Counts the call, stores a copy of the shipped entries and reports success. */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }
  }

  /**
   * Inter-cluster endpoint whose first replicator fails with an {@link IOException}; afterwards
   * replicators succeed without doing any work. Successfully replicated entries are summed up in
   * {@link #replicateCount}.
   */
  public static class InterClusterReplicationEndpointForTest
      extends HBaseInterClusterReplicationEndpoint {

    static AtomicInteger replicateCount = new AtomicInteger();
    // volatile: static state that is presumably touched by more than one replication thread;
    // guarantees that the "already failed" marker is visible to all of them.
    static volatile boolean failedOnce;

    /** Resets the shared replicate counter. */
    public InterClusterReplicationEndpointForTest() {
      replicateCount.set(0);
    }

    /** Delegates to the parent and, on success, adds the number of shipped entries. */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      boolean success = super.replicate(replicateContext);
      if (success) {
        replicateCount.addAndGet(replicateContext.entries.size());
      }
      return success;
    }

    @Override
    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
      // Fail only once, we don't want to slow down the test.
      if (failedOnce) {
        return () -> ordinal;
      } else {
        failedOnce = true;
        return () -> {
          throw new IOException("Sample Exception: Failed to replicate.");
        };
      }
    }
  }

  /**
   * Endpoint that keeps returning {@code false} from {@code replicate} until the shared replicate
   * counter exceeds {@link #COUNT}. Assertion failures seen while checking the previously
   * recorded entries are stored in {@link #ex}.
   */
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    static AtomicBoolean replicated = new AtomicBoolean(false);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      // first 10 times, we return false
      boolean done = replicateCount.get() > COUNT;
      replicated.set(done);
      // Return the locally computed value so a concurrent reset of the shared flag cannot
      // change what this call reports.
      return done;
    }
  }

  // return a WALEntry filter which only accepts "row", but not other rows
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    /** Records the entries, checks them against "row" and always reports success. */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }
      return true;
    }

    /** Chains the parent's filter with one that strips every cell whose row is not "row". */
    @Override
    public WALEntryFilter getWALEntryfilter() {
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          List<Cell> cells = entry.getEdit().getCells();
          cells.removeIf(cell -> !Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
            cell.getRowLength(), row, 0, row.length));
          return entry;
        }
      });
    }
  }

  /**
   * Filter that lets every entry through unchanged and remembers that it was invoked at all.
   */
  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
    // volatile: set from inside filter() and read by the test thread via hasPassedAnEntry().
    private static volatile boolean passedEntry = false;

    @Override
    public Entry filter(Entry entry) {
      passedEntry = true;
      return entry;
    }

    public static boolean hasPassedAnEntry() {
      return passedEntry;
    }
  }

  /** Subclass used to configure a second, distinct filter class name. */
  public static class EverythingPassesWALEntryFilterSubclass
      extends EverythingPassesWALEntryFilter {
  }
}