001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.OptionalLong; 032import java.util.UUID; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CellBuilderFactory; 041import org.apache.hadoop.hbase.CellBuilderType; 042import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.Server; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.regionserver.HRegionServer; 055import org.apache.hadoop.hbase.regionserver.RegionServerServices; 056import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 057import org.apache.hadoop.hbase.replication.ReplicationPeer; 058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 059import org.apache.hadoop.hbase.replication.ReplicationQueueData; 060import org.apache.hadoop.hbase.replication.ReplicationQueueId; 061import org.apache.hadoop.hbase.replication.WALEntryFilter; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.ReplicationTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 067import org.apache.hadoop.hbase.wal.WAL; 068import org.apache.hadoop.hbase.wal.WALEdit; 069import org.apache.hadoop.hbase.wal.WALFactory; 070import org.apache.hadoop.hbase.wal.WALKeyImpl; 071import org.apache.hadoop.hbase.wal.WALProvider; 072import org.apache.hadoop.hbase.wal.WALStreamReader; 073import org.junit.AfterClass; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.mockito.Mockito; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 083 084@Category({ ReplicationTests.class, MediumTests.class }) 085public class TestReplicationSource { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestReplicationSource.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 092 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 093 private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil(); 094 private static FileSystem FS; 095 private static Path oldLogDir; 096 private static Path logDir; 097 private static Configuration conf = TEST_UTIL.getConfiguration(); 098 099 @BeforeClass 100 public static void setUpBeforeClass() throws Exception { 101 TEST_UTIL.startMiniDFSCluster(1); 102 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 103 Path rootDir = TEST_UTIL.createRootDir(); 104 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 105 if (FS.exists(oldLogDir)) { 106 FS.delete(oldLogDir, true); 107 } 108 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 109 if (FS.exists(logDir)) { 110 FS.delete(logDir, true); 111 } 112 } 113 114 @AfterClass 115 public static void tearDownAfterClass() throws Exception { 116 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 117 TEST_UTIL.shutdownMiniHBaseCluster(); 118 TEST_UTIL.shutdownMiniDFSCluster(); 119 } 120 121 /** 122 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 123 */ 124 @Test 125 public void testDefaultSkipsMetaWAL() throws IOException { 126 ReplicationSource rs = new ReplicationSource(); 127 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 128 conf.setInt("replication.source.maxretriesmultiplier", 1); 129 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 130 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 131 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 132 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 133 Mockito.when(peerConfig.getReplicationEndpointImpl()) 134 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 135 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 136 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 137 Mockito.when(manager.getGlobalMetrics()) 138 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 139 140 RegionServerServices rss = 141 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 142 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 143 rs.init(conf, null, manager, null, mockPeer, rss, 144 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 145 new MetricsSource(queueId.toString())); 146 try { 147 rs.startup(); 148 assertTrue(rs.isSourceActive()); 149 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 150 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 151 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 152 rs.enqueueLog(new Path("a.1")); 153 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 154 } finally { 155 rs.terminate("Done"); 156 rss.stop("Done"); 157 } 158 } 159 160 /** 161 * Test that we filter out meta edits, etc. 162 */ 163 @Test 164 public void testWALEntryFilter() throws IOException { 165 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 166 // instance and init it. 167 ReplicationSource rs = new ReplicationSource(); 168 UUID uuid = UUID.randomUUID(); 169 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 170 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 171 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 172 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 173 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 174 Mockito.when(peerConfig.getReplicationEndpointImpl()) 175 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 176 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 177 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 178 RegionServerServices rss = 179 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 180 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 181 rs.init(conf, null, manager, null, mockPeer, rss, 182 new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(), 183 new MetricsSource(queueId.toString())); 184 try { 185 rs.startup(); 186 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 187 WALEntryFilter wef = rs.getWalEntryFilter(); 188 // Test non-system WAL edit. 189 WALEdit we = new WALEdit() 190 .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW) 191 .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build()); 192 WAL.Entry e = new WAL.Entry( 193 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 194 assertTrue(wef.filter(e) == e); 195 // Test system WAL edit. 196 e = new WAL.Entry( 197 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 198 assertNull(wef.filter(e)); 199 } finally { 200 rs.terminate("Done"); 201 rss.stop("Done"); 202 } 203 } 204 205 /** 206 * Sanity check that we can move logs around while we are reading from them. Should this test 207 * fail, ReplicationSource would have a hard time reading logs that are being archived. 208 */ 209 // This tests doesn't belong in here... it is not about ReplicationSource. 210 @Test 211 public void testLogMoving() throws Exception { 212 Path logPath = new Path(logDir, "log"); 213 if (!FS.exists(logDir)) { 214 FS.mkdirs(logDir); 215 } 216 if (!FS.exists(oldLogDir)) { 217 FS.mkdirs(oldLogDir); 218 } 219 WALProvider.Writer writer = 220 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 221 for (int i = 0; i < 3; i++) { 222 byte[] b = Bytes.toBytes(Integer.toString(i)); 223 KeyValue kv = new KeyValue(b, b, b); 224 WALEdit edit = new WALEdit(); 225 edit.add(kv); 226 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 227 writer.append(new WAL.Entry(key, edit)); 228 writer.sync(false); 229 } 230 writer.close(); 231 232 WALStreamReader reader = 233 WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration()); 234 WAL.Entry entry = reader.next(); 235 assertNotNull(entry); 236 237 Path oldLogPath = new Path(oldLogDir, "log"); 238 FS.rename(logPath, oldLogPath); 239 240 entry = reader.next(); 241 assertNotNull(entry); 242 243 reader.next(); 244 entry = reader.next(); 245 246 assertNull(entry); 247 reader.close(); 248 } 249 250 /** 251 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 252 * TestReplicationSource because doesn't need cluster. 253 */ 254 @Test 255 public void testTerminateTimeout() throws Exception { 256 ReplicationSource source = new ReplicationSource(); 257 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 258 try { 259 replicationEndpoint.start(); 260 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 261 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 262 Configuration testConf = HBaseConfiguration.create(); 263 testConf.setInt("replication.source.maxretriesmultiplier", 1); 264 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 265 ReplicationQueueId queueId = 266 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 267 source.init(testConf, null, manager, null, mockPeer, null, 268 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 269 null); 270 ExecutorService executor = Executors.newSingleThreadExecutor(); 271 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 272 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 273 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 274 } finally { 275 replicationEndpoint.stop(); 276 } 277 } 278 279 @Test 280 public void testTerminateClearsBuffer() throws Exception { 281 ReplicationSource source = new ReplicationSource(); 282 ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, 283 null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); 284 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 285 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 286 Configuration testConf = HBaseConfiguration.create(); 287 ReplicationQueueId queueId = 288 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 289 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), 290 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 291 mock(MetricsSource.class)); 292 ReplicationSourceWALReader reader = 293 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 294 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader); 295 source.workerThreads.put("testPeer", shipper); 296 WALEntryBatch batch = new WALEntryBatch(10, logDir); 297 WAL.Entry mockEntry = mock(WAL.Entry.class); 298 WALEdit mockEdit = mock(WALEdit.class); 299 WALKeyImpl mockKey = mock(WALKeyImpl.class); 300 when(mockEntry.getEdit()).thenReturn(mockEdit); 301 when(mockEdit.isEmpty()).thenReturn(false); 302 when(mockEntry.getKey()).thenReturn(mockKey); 303 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 304 when(mockEdit.heapSize()).thenReturn(10000L); 305 when(mockEdit.size()).thenReturn(0); 306 ArrayList<Cell> cells = new ArrayList<>(); 307 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 308 Bytes.toBytes("v1")); 309 cells.add(kv); 310 when(mockEdit.getCells()).thenReturn(cells); 311 reader.addEntryToBatch(batch, mockEntry); 312 reader.entryBatchQueue.put(batch); 313 source.terminate("test"); 314 assertEquals(0, source.getSourceManager().getTotalBufferUsed()); 315 } 316 317 /** 318 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 319 */ 320 @Test 321 public void testServerShutdownRecoveredQueue() throws Exception { 322 try { 323 // Ensure single-threaded WAL 324 conf.set("hbase.wal.provider", "defaultProvider"); 325 conf.setInt("replication.sleep.before.failover", 2000); 326 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 327 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 328 SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 329 TEST_UTIL_PEER.startMiniCluster(1); 330 331 HRegionServer serverA = cluster.getRegionServer(0); 332 final ReplicationSourceManager managerA = 333 serverA.getReplicationSourceService().getReplicationManager(); 334 HRegionServer serverB = cluster.getRegionServer(1); 335 final ReplicationSourceManager managerB = 336 serverB.getReplicationSourceService().getReplicationManager(); 337 final Admin admin = TEST_UTIL.getAdmin(); 338 339 final String peerId = "TestPeer"; 340 admin.addReplicationPeer(peerId, 341 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); 342 // Wait for replication sources to come up 343 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 344 @Override 345 public boolean evaluate() { 346 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 347 } 348 }); 349 // Disabling peer makes sure there is at least one log to claim when the server dies 350 // The recovered queue will also stay there until the peer is disabled even if the 351 // WALs it contains have no data. 352 admin.disableReplicationPeer(peerId); 353 354 // Stopping serverA 355 // It's queues should be claimed by the only other alive server i.e. serverB 356 cluster.stopRegionServer(serverA.getServerName()); 357 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 358 @Override 359 public boolean evaluate() throws Exception { 360 return managerB.getOldSources().size() == 1; 361 } 362 }); 363 364 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 365 serverC.waitForServerOnline(); 366 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 367 @Override 368 public boolean evaluate() throws Exception { 369 return serverC.getReplicationSourceService() != null; 370 } 371 }); 372 final ReplicationSourceManager managerC = 373 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 374 // Sanity check 375 assertEquals(0, managerC.getOldSources().size()); 376 377 // Stopping serverB 378 // Now serverC should have two recovered queues: 379 // 1. The serverB's normal queue 380 // 2. serverA's recovered queue on serverB 381 cluster.stopRegionServer(serverB.getServerName()); 382 Waiter.waitFor(conf, 20000, 383 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 384 admin.enableReplicationPeer(peerId); 385 Waiter.waitFor(conf, 20000, 386 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 387 } finally { 388 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 389 } 390 } 391 392 /** 393 * Regionserver implementation that adds a delay on the graceful shutdown. 394 */ 395 public static class ShutdownDelayRegionServer extends HRegionServer { 396 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 397 super(conf); 398 } 399 400 @Override 401 protected void stopServiceThreads() { 402 // Add a delay before service threads are shutdown. 403 // This will keep the zookeeper connection alive for the duration of the delay. 404 LOG.info("Adding a delay to the regionserver shutdown"); 405 try { 406 Thread.sleep(2000); 407 } catch (InterruptedException ex) { 408 LOG.error("Interrupted while sleeping"); 409 } 410 super.stopServiceThreads(); 411 } 412 } 413 414 /** 415 * Deadend Endpoint. Does nothing. 416 */ 417 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 418 private final UUID uuid = UUID.randomUUID(); 419 420 @Override 421 public void init(Context context) throws IOException { 422 this.ctx = context; 423 } 424 425 @Override 426 public WALEntryFilter getWALEntryfilter() { 427 return null; 428 } 429 430 @Override 431 public synchronized UUID getPeerUUID() { 432 return this.uuid; 433 } 434 435 @Override 436 protected void doStart() { 437 notifyStarted(); 438 } 439 440 @Override 441 protected void doStop() { 442 notifyStopped(); 443 } 444 445 @Override 446 public boolean canReplicateToSameCluster() { 447 return true; 448 } 449 } 450 451 /** 452 * Deadend Endpoint. Does nothing. 453 */ 454 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 455 456 static int count = 0; 457 458 @Override 459 public synchronized UUID getPeerUUID() { 460 if (count == 0) { 461 count++; 462 throw new RuntimeException(); 463 } else { 464 return super.getPeerUUID(); 465 } 466 } 467 468 } 469 470 /** 471 * Bad Endpoint with failing connection to peer on demand. 472 */ 473 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 474 static boolean failing = true; 475 476 @Override 477 public synchronized UUID getPeerUUID() { 478 return failing ? null : super.getPeerUUID(); 479 } 480 } 481 482 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 483 484 static int count = 0; 485 486 @Override 487 public synchronized UUID getPeerUUID() { 488 throw new RuntimeException(); 489 } 490 491 } 492 493 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 494 String endpointName) throws IOException { 495 conf.setInt("replication.source.maxretriesmultiplier", 1); 496 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 497 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 498 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 499 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 500 FaultyReplicationEndpoint.count = 0; 501 Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 502 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 503 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 504 Mockito.when(manager.getGlobalMetrics()) 505 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 506 RegionServerServices rss = 507 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 508 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 509 rs.init(conf, null, manager, null, mockPeer, rss, 510 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 511 new MetricsSource(queueId.toString())); 512 return rss; 513 } 514 515 /** 516 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 517 * and <b>eplication.source.regionserver.abort</b> is set to false. 518 */ 519 @Test 520 public void testAbortFalseOnError() throws IOException { 521 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 522 conf.setBoolean("replication.source.regionserver.abort", false); 523 ReplicationSource rs = new ReplicationSource(); 524 RegionServerServices rss = 525 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 526 try { 527 rs.startup(); 528 assertTrue(rs.isSourceActive()); 529 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 530 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 531 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 532 rs.enqueueLog(new Path("a.1")); 533 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 534 } finally { 535 rs.terminate("Done"); 536 rss.stop("Done"); 537 } 538 } 539 540 @Test 541 public void testReplicationSourceInitializingMetric() throws IOException { 542 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 543 conf.setBoolean("replication.source.regionserver.abort", false); 544 ReplicationSource rs = new ReplicationSource(); 545 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 546 try { 547 rs.startup(); 548 assertTrue(rs.isSourceActive()); 549 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 550 BadReplicationEndpoint.failing = false; 551 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 552 } finally { 553 rs.terminate("Done"); 554 rss.stop("Done"); 555 } 556 } 557 558 /** 559 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 560 * when <b>replication.source.regionserver.abort</b> is set to false. 561 */ 562 @Test 563 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 564 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 565 ReplicationSource rs = new ReplicationSource(); 566 RegionServerServices rss = 567 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 568 try { 569 rs.startup(); 570 assertTrue(true); 571 } finally { 572 rs.terminate("Done"); 573 rss.stop("Done"); 574 } 575 } 576 577 /** 578 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 579 * and <b>replication.source.regionserver.abort</b> is set to true. 580 */ 581 @Test 582 public void testAbortTrueOnError() throws IOException { 583 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 584 ReplicationSource rs = new ReplicationSource(); 585 RegionServerServices rss = 586 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 587 try { 588 rs.startup(); 589 assertTrue(rs.isSourceActive()); 590 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 591 assertTrue(rss.isAborted()); 592 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 593 assertFalse(rs.isSourceActive()); 594 } finally { 595 rs.terminate("Done"); 596 rss.stop("Done"); 597 } 598 } 599 600 /* 601 * Test age of oldest wal metric. 602 */ 603 @Test 604 public void testAgeOfOldestWal() throws Exception { 605 try { 606 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 607 EnvironmentEdgeManager.injectEdge(manualEdge); 608 609 String peerId = "1"; 610 MetricsSource metrics = new MetricsSource(peerId); 611 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 612 conf.setInt("replication.source.maxretriesmultiplier", 1); 613 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 614 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 615 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 616 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 617 Mockito.when(peerConfig.getReplicationEndpointImpl()) 618 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 619 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 620 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 621 Mockito.when(manager.getGlobalMetrics()) 622 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 623 RegionServerServices rss = 624 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 625 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId); 626 ReplicationSource source = new ReplicationSource(); 627 source.init(conf, null, manager, null, mockPeer, rss, 628 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 629 metrics); 630 631 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 632 manualEdge.setValue(10); 633 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 634 source.enqueueLog(log1); 635 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId); 636 assertEquals(2, metricsSource1.getOldestWalAge()); 637 638 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 639 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 640 source.enqueueLog(log2); 641 assertEquals(6, metricsSource1.getOldestWalAge()); 642 // Clear all metrics. 643 metrics.clear(); 644 } finally { 645 EnvironmentEdgeManager.reset(); 646 } 647 } 648 649 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 650 MetricsReplicationSourceFactoryImpl factory = 651 (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory 652 .getInstance(MetricsReplicationSourceFactory.class); 653 return factory.getSource(sourceId); 654 } 655}