/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests ReplicationSource and ReplicationEndpoint interactions.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

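  // Number of region servers in the source cluster; every region server is expected to
  // construct and start its own ReplicationEndpointForTest instance once a peer is added.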
  static int numRegionServers;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    admin.removePeer("2");
    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
    // check stop is called
    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
  }

  @Before
  public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    ReplicationEndpointForTest.lastEntries = null;
    final List<RegionServerThread> rsThreads =
        utility1.getMiniHBaseCluster().getRegionServerThreads();
    for (RegionServerThread rs : rsThreads) {
      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
    // Wait for all log rolls to finish
    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        List<String> logRollInProgressRsList = new ArrayList<>();
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            logRollInProgressRsList.add(rs.getRegionServer().toString());
          }
        }
        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
      }
    });
  }

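  // The replication tests below install their own peer backed by one of the test endpoint
  // implementations at the bottom of this class; the endpoints' static counters are reset in
  // setup() so the assertions are per-test.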
  @Test
  public void testCustomReplicationEndpoint() throws Exception {
    // test installing a custom replication endpoint other than the default one.
    admin.addPeer("testCustomReplicationEndpoint",
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);

    // check whether the class has been constructed and started
    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
      }
    });

    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
      }
    });

    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

    // now replicate some data.
    doPut(Bytes.toBytes("row42"));

    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.replicateCount.get() >= 1;
      }
    });

    doAssert(Bytes.toBytes("row42"));

    admin.removePeer("testCustomReplicationEndpoint");
  }

  @Test
  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
    Assert.assertFalse(ReplicationEndpointReturningFalse.replicated.get());
    int peerCount = admin.getPeersCount();
    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
    admin.addPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
    // This test is flaky and there is so much stuff flying around in here that it's hard to
    // debug. The peer needs to be up for the edit to make it across. This wait on peer count
    // is a hack that keeps us from progressing until the peer is up.
    while (admin.getPeersCount() <= peerCount) {
      LOG.info("Waiting on peercount to go up from " + peerCount);
      Threads.sleep(100);
    }
    // now replicate some data
    doPut(row);

    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        // Looks like the replication endpoint returns false unless we put more than 10 edits.
        // We only send over one edit.
        int count = ReplicationEndpointForTest.replicateCount.get();
        LOG.info("count=" + count);
        return ReplicationEndpointReturningFalse.replicated.get();
      }
    });
    if (ReplicationEndpointReturningFalse.ex.get() != null) {
      throw ReplicationEndpointReturningFalse.ex.get();
    }

    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
  }

  @Test
  public void testInterClusterReplication() throws Exception {
    final String id = "testInterClusterReplication";

    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
    int totEdits = 0;

    // Make sure edits are spread across regions because we do region based batching
    // before shipping edits.
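    // Each region with a non-empty start key gets 100 cells keyed by that start key, written
    // straight through the HRegion so every region's WAL picks up its own edits; totEdits is
    // the number of entries the endpoint is expected to ship.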
    for (HRegion region : regions) {
      RegionInfo hri = region.getRegionInfo();
      byte[] row = hri.getStartKey();
      for (int i = 0; i < 100; i++) {
        if (row.length > 0) {
          Put put = new Put(row);
          put.addColumn(famName, row, row);
          region.put(put);
          totEdits++;
        }
      }
    }

    admin.addPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
      null);

    final int numEdits = totEdits;
    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
      }

      @Override
      public String explainFailure() throws Exception {
        String failure = "Failed to replicate all edits, expected = " + numEdits
            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
        return failure;
      }
    });

    admin.removePeer("testInterClusterReplication");
    utility1.deleteTableData(tableName);
  }

  @Test
  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that we can create multiple WALFilters reflectively
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        EverythingPassesWALEntryFilter.class.getName() + ","
            + EverythingPassesWALEntryFilterSubclass.class.getName());
    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
    // now replicate some data.
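    // Only the edit for the shared test "row" should survive the row-matching WAL entry filter
    // installed by ReplicationEndpointWithWALEntryFilter; the "row1" and "row2" cells are
    // stripped before replicate() sees them (doAssert checks this).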
    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
      doPut(connection, Bytes.toBytes("row1"));
      doPut(connection, row);
      doPut(connection, Bytes.toBytes("row2"));
    }

    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.replicateCount.get() >= 1;
      }
    });

    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
    // make sure our reflectively created filter is in the filter chain
    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
  }

  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that adding a peer with a bogus WALEntryFilter class name is rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter");
    admin.addPeer("testWALEntryFilterAddValidation", rpc);
  }

  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc =
        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
            .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that updating a peer config with a bogus WALEntryFilter class name is rejected
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter");
    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }

  @Test
  public void testMetricsSourceBaseSourcePassthrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
     * Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource,
     * which allows for custom JMX metrics. This test checks to make sure the BaseSource
     * decorator logic on MetricsSource actually calls down through the two layers of wrapping
     * to the actual BaseSource.
     */
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
        new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationSourceSource globalSourceSource =
        new MetricsReplicationGlobalSourceSource(globalRms);
    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
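    // The per-peer source prefixes metric names with "source.<id>." while the global source
    // prefixes them with "source."; every operation invoked on MetricsSource below should be
    // observable on both underlying BaseSources.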
    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);

    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
  }

  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
      doPut(connection, row);
    }
  }

  private void doPut(final Connection connection, final byte[] row) throws IOException {
    try (Table t = connection.getTable(tableName)) {
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      t.put(put);
    }
  }

  private static void doAssert(byte[] row) throws Exception {
    if (ReplicationEndpointForTest.lastEntries == null) {
      return; // first call
    }
    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
    Assert.assertEquals(1, cells.size());
    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
      cells.get(0).getRowLength(), row, 0, row.length));
  }

  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = UUID.randomUUID();
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    static volatile List<Entry> lastEntries = null;

    public ReplicationEndpointForTest() {
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }
  }

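  /**
   * {@link HBaseInterClusterReplicationEndpoint} subclass that counts successfully replicated
   * entries and makes the first replicator it creates fail, so the retry path of the real
   * endpoint is exercised as well.
   */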
  public static class InterClusterReplicationEndpointForTest
      extends HBaseInterClusterReplicationEndpoint {

    static AtomicInteger replicateCount = new AtomicInteger();
    static boolean failedOnce;

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      boolean success = super.replicate(replicateContext);
      if (success) {
        replicateCount.addAndGet(replicateContext.entries.size());
      }
      return success;
    }

    @Override
    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
      // Fail only once; we don't want to slow down the test.
      if (failedOnce) {
        return new DummyReplicator(entries, ordinal);
      } else {
        failedOnce = true;
        return new FailingDummyReplicator(entries, ordinal);
      }
    }

    protected class DummyReplicator extends Replicator {

      private int ordinal;

      public DummyReplicator(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
        this.ordinal = ordinal;
      }

      @Override
      public Integer call() throws IOException {
        return ordinal;
      }
    }

    protected class FailingDummyReplicator extends DummyReplicator {

      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
      }

      @Override
      public Integer call() throws IOException {
        throw new IOException("Sample Exception: Failed to replicate.");
      }
    }
  }

  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    static AtomicBoolean replicated = new AtomicBoolean(false);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
      return replicated.get();
    }
  }

  // Installs a WAL entry filter which only accepts "row", dropping cells for any other row
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }
      return true;
    }

    @Override
    public WALEntryFilter getWALEntryfilter() {
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          ArrayList<Cell> cells = entry.getEdit().getCells();
          int size = cells.size();
          for (int i = size - 1; i >= 0; i--) {
            Cell cell = cells.get(i);
            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
                row, 0, row.length)) {
              cells.remove(i);
            }
          }
          return entry;
        }
      });
    }
  }

  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
    private static boolean passedEntry = false;

    @Override
    public Entry filter(Entry entry) {
      passedEntry = true;
      return entry;
    }

    public static boolean hasPassedAnEntry() {
      return passedEntry;
    }
  }

  // Empty subclass used only to verify that more than one WALEntryFilter class can be
  // instantiated reflectively from the peer configuration.
  public static class EverythingPassesWALEntryFilterSubclass
      extends EverythingPassesWALEntryFilter {
  }
}