001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertNull; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.OptionalLong; 032import java.util.UUID; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CellBuilderType; 041import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.Server; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.Waiter; 052import org.apache.hadoop.hbase.client.Admin; 053import org.apache.hadoop.hbase.regionserver.HRegionServer; 054import org.apache.hadoop.hbase.regionserver.RegionServerServices; 055import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 056import org.apache.hadoop.hbase.replication.ReplicationPeer; 057import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 058import org.apache.hadoop.hbase.replication.ReplicationQueueData; 059import org.apache.hadoop.hbase.replication.ReplicationQueueId; 060import org.apache.hadoop.hbase.replication.WALEntryFilter; 061import org.apache.hadoop.hbase.testclassification.MediumTests; 062import org.apache.hadoop.hbase.testclassification.ReplicationTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 065import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 066import org.apache.hadoop.hbase.wal.WAL; 067import org.apache.hadoop.hbase.wal.WALEdit; 068import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 069import org.apache.hadoop.hbase.wal.WALFactory; 070import org.apache.hadoop.hbase.wal.WALKeyImpl; 071import org.apache.hadoop.hbase.wal.WALProvider; 072import org.apache.hadoop.hbase.wal.WALStreamReader; 073import org.junit.jupiter.api.AfterAll; 074import org.junit.jupiter.api.BeforeAll; 075import org.junit.jupiter.api.Tag; 076import org.junit.jupiter.api.Test; 077import org.mockito.Mockito; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 082 083@Tag(ReplicationTests.TAG) 084@Tag(MediumTests.TAG) 085public class TestReplicationSource { 086 087 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 088 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 089 private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil(); 090 private static FileSystem FS; 091 private static Path oldLogDir; 092 private static Path logDir; 093 private static Configuration conf = TEST_UTIL.getConfiguration(); 094 095 @BeforeAll 096 public static void setUpBeforeClass() throws Exception { 097 TEST_UTIL.startMiniDFSCluster(1); 098 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 099 Path rootDir = TEST_UTIL.createRootDir(); 100 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 101 if (FS.exists(oldLogDir)) { 102 FS.delete(oldLogDir, true); 103 } 104 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 105 if (FS.exists(logDir)) { 106 FS.delete(logDir, true); 107 } 108 } 109 110 @AfterAll 111 public static void tearDownAfterClass() throws Exception { 112 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 113 TEST_UTIL.shutdownMiniHBaseCluster(); 114 TEST_UTIL.shutdownMiniDFSCluster(); 115 } 116 117 /** 118 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 119 */ 120 @Test 121 public void testDefaultSkipsMetaWAL() throws IOException { 122 ReplicationSource rs = new ReplicationSource(); 123 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 124 conf.setInt("replication.source.maxretriesmultiplier", 1); 125 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 126 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 127 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 128 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 129 Mockito.when(peerConfig.getReplicationEndpointImpl()) 130 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 131 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 132 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 133 Mockito.when(manager.getGlobalMetrics()) 134 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 135 136 RegionServerServices rss = 137 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 138 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 139 rs.init(conf, null, manager, null, mockPeer, rss, 140 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 141 new MetricsSource(queueId.toString())); 142 try { 143 rs.startup(); 144 assertTrue(rs.isSourceActive()); 145 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 146 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 147 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 148 rs.enqueueLog(new Path("a.1")); 149 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 150 } finally { 151 rs.terminate("Done"); 152 rss.stop("Done"); 153 } 154 } 155 156 /** 157 * Test that we filter out meta edits, etc. 158 */ 159 @Test 160 public void testWALEntryFilter() throws IOException { 161 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 162 // instance and init it. 163 ReplicationSource rs = new ReplicationSource(); 164 UUID uuid = UUID.randomUUID(); 165 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 166 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 167 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 168 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 169 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 170 Mockito.when(peerConfig.getReplicationEndpointImpl()) 171 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 172 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 173 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 174 RegionServerServices rss = 175 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 176 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 177 rs.init(conf, null, manager, null, mockPeer, rss, 178 new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(), 179 new MetricsSource(queueId.toString())); 180 try { 181 rs.startup(); 182 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 183 WALEntryFilter wef = rs.getWalEntryFilter(); 184 // Test non-system WAL edit. 185 WALEdit we = WALEditInternalHelper.addExtendedCell(new WALEdit(), 186 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 187 .setRow(HConstants.EMPTY_START_ROW).setFamily(HConstants.CATALOG_FAMILY) 188 .setType(Cell.Type.Put).build()); 189 WAL.Entry e = new WAL.Entry( 190 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 191 assertTrue(wef.filter(e) == e); 192 // Test system WAL edit. 193 e = new WAL.Entry( 194 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 195 assertNull(wef.filter(e)); 196 } finally { 197 rs.terminate("Done"); 198 rss.stop("Done"); 199 } 200 } 201 202 /** 203 * Sanity check that we can move logs around while we are reading from them. Should this test 204 * fail, ReplicationSource would have a hard time reading logs that are being archived. 205 */ 206 // This tests doesn't belong in here... it is not about ReplicationSource. 207 @Test 208 public void testLogMoving() throws Exception { 209 Path logPath = new Path(logDir, "log"); 210 if (!FS.exists(logDir)) { 211 FS.mkdirs(logDir); 212 } 213 if (!FS.exists(oldLogDir)) { 214 FS.mkdirs(oldLogDir); 215 } 216 WALProvider.Writer writer = 217 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 218 for (int i = 0; i < 3; i++) { 219 byte[] b = Bytes.toBytes(Integer.toString(i)); 220 KeyValue kv = new KeyValue(b, b, b); 221 WALEdit edit = new WALEdit(); 222 WALEditInternalHelper.addExtendedCell(edit, kv); 223 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 224 writer.append(new WAL.Entry(key, edit)); 225 writer.sync(false); 226 } 227 writer.close(); 228 229 WALStreamReader reader = 230 WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration()); 231 WAL.Entry entry = reader.next(); 232 assertNotNull(entry); 233 234 Path oldLogPath = new Path(oldLogDir, "log"); 235 FS.rename(logPath, oldLogPath); 236 237 entry = reader.next(); 238 assertNotNull(entry); 239 240 reader.next(); 241 entry = reader.next(); 242 243 assertNull(entry); 244 reader.close(); 245 } 246 247 /** 248 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 249 * TestReplicationSource because doesn't need cluster. 250 */ 251 @Test 252 public void testTerminateTimeout() throws Exception { 253 ReplicationSource source = new ReplicationSource(); 254 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 255 try { 256 replicationEndpoint.start(); 257 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 258 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 259 Configuration testConf = HBaseConfiguration.create(); 260 testConf.setInt("replication.source.maxretriesmultiplier", 1); 261 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 262 ReplicationQueueId queueId = 263 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 264 source.init(testConf, null, manager, null, mockPeer, null, 265 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 266 null); 267 ExecutorService executor = Executors.newSingleThreadExecutor(); 268 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 269 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 270 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 271 } finally { 272 replicationEndpoint.stop(); 273 } 274 } 275 276 @Test 277 public void testTerminateClearsBuffer() throws Exception { 278 ReplicationSource source = new ReplicationSource(); 279 ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, 280 null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); 281 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 282 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 283 Configuration testConf = HBaseConfiguration.create(); 284 ReplicationQueueId queueId = 285 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 286 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), 287 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 288 mock(MetricsSource.class)); 289 ReplicationSourceWALReader reader = 290 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 291 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader); 292 source.workerThreads.put("testPeer", shipper); 293 WALEntryBatch batch = new WALEntryBatch(10, logDir); 294 WAL.Entry mockEntry = mock(WAL.Entry.class); 295 WALEdit mockEdit = mock(WALEdit.class); 296 WALKeyImpl mockKey = mock(WALKeyImpl.class); 297 when(mockEntry.getEdit()).thenReturn(mockEdit); 298 when(mockEdit.isEmpty()).thenReturn(false); 299 when(mockEntry.getKey()).thenReturn(mockKey); 300 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 301 when(mockEdit.heapSize()).thenReturn(10000L); 302 when(mockEdit.size()).thenReturn(0); 303 ArrayList<Cell> cells = new ArrayList<>(); 304 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 305 Bytes.toBytes("v1")); 306 cells.add(kv); 307 when(mockEdit.getCells()).thenReturn(cells); 308 reader.addEntryToBatch(batch, mockEntry); 309 reader.entryBatchQueue.put(batch); 310 source.terminate("test"); 311 assertEquals(0, source.getSourceManager().getTotalBufferUsed()); 312 } 313 314 /** 315 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 316 */ 317 @Test 318 public void testServerShutdownRecoveredQueue() throws Exception { 319 try { 320 // Ensure single-threaded WAL 321 conf.set("hbase.wal.provider", "defaultProvider"); 322 conf.setInt("replication.sleep.before.failover", 2000); 323 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 324 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 325 SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 326 TEST_UTIL_PEER.startMiniCluster(1); 327 328 HRegionServer serverA = cluster.getRegionServer(0); 329 final ReplicationSourceManager managerA = 330 serverA.getReplicationSourceService().getReplicationManager(); 331 HRegionServer serverB = cluster.getRegionServer(1); 332 final ReplicationSourceManager managerB = 333 serverB.getReplicationSourceService().getReplicationManager(); 334 final Admin admin = TEST_UTIL.getAdmin(); 335 336 final String peerId = "TestPeer"; 337 admin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder() 338 .setClusterKey(TEST_UTIL_PEER.getRpcConnnectionURI()).build()); 339 // Wait for replication sources to come up 340 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 341 @Override 342 public boolean evaluate() { 343 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 344 } 345 }); 346 // Disabling peer makes sure there is at least one log to claim when the server dies 347 // The recovered queue will also stay there until the peer is disabled even if the 348 // WALs it contains have no data. 349 admin.disableReplicationPeer(peerId); 350 351 // Stopping serverA 352 // It's queues should be claimed by the only other alive server i.e. serverB 353 cluster.stopRegionServer(serverA.getServerName()); 354 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 355 @Override 356 public boolean evaluate() throws Exception { 357 return managerB.getOldSources().size() == 1; 358 } 359 }); 360 361 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 362 serverC.waitForServerOnline(); 363 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 364 @Override 365 public boolean evaluate() throws Exception { 366 return serverC.getReplicationSourceService() != null; 367 } 368 }); 369 final ReplicationSourceManager managerC = 370 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 371 // Sanity check 372 assertEquals(0, managerC.getOldSources().size()); 373 374 // Stopping serverB 375 // Now serverC should have two recovered queues: 376 // 1. The serverB's normal queue 377 // 2. serverA's recovered queue on serverB 378 cluster.stopRegionServer(serverB.getServerName()); 379 Waiter.waitFor(conf, 20000, 380 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 381 admin.enableReplicationPeer(peerId); 382 Waiter.waitFor(conf, 20000, 383 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 384 } finally { 385 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 386 } 387 } 388 389 /** 390 * Regionserver implementation that adds a delay on the graceful shutdown. 391 */ 392 public static class ShutdownDelayRegionServer extends HRegionServer { 393 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 394 super(conf); 395 } 396 397 @Override 398 protected void stopServiceThreads() { 399 // Add a delay before service threads are shutdown. 400 // This will keep the zookeeper connection alive for the duration of the delay. 401 LOG.info("Adding a delay to the regionserver shutdown"); 402 try { 403 Thread.sleep(2000); 404 } catch (InterruptedException ex) { 405 LOG.error("Interrupted while sleeping"); 406 } 407 super.stopServiceThreads(); 408 } 409 } 410 411 /** 412 * Deadend Endpoint. Does nothing. 413 */ 414 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 415 private final UUID uuid = UUID.randomUUID(); 416 417 @Override 418 public void init(Context context) throws IOException { 419 this.ctx = context; 420 } 421 422 @Override 423 public WALEntryFilter getWALEntryfilter() { 424 return null; 425 } 426 427 @Override 428 public synchronized UUID getPeerUUID() { 429 return this.uuid; 430 } 431 432 @Override 433 protected void doStart() { 434 notifyStarted(); 435 } 436 437 @Override 438 protected void doStop() { 439 notifyStopped(); 440 } 441 442 @Override 443 public boolean canReplicateToSameCluster() { 444 return true; 445 } 446 } 447 448 /** 449 * Deadend Endpoint. Does nothing. 450 */ 451 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 452 453 static int count = 0; 454 455 @Override 456 public synchronized UUID getPeerUUID() { 457 if (count == 0) { 458 count++; 459 throw new RuntimeException(); 460 } else { 461 return super.getPeerUUID(); 462 } 463 } 464 465 } 466 467 /** 468 * Bad Endpoint with failing connection to peer on demand. 469 */ 470 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 471 static boolean failing = true; 472 473 @Override 474 public synchronized UUID getPeerUUID() { 475 return failing ? null : super.getPeerUUID(); 476 } 477 } 478 479 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 480 481 static int count = 0; 482 483 @Override 484 public synchronized UUID getPeerUUID() { 485 throw new RuntimeException(); 486 } 487 488 } 489 490 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 491 String endpointName) throws IOException { 492 conf.setInt("replication.source.maxretriesmultiplier", 1); 493 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 494 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 495 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 496 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 497 FaultyReplicationEndpoint.count = 0; 498 Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 499 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 500 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 501 Mockito.when(manager.getGlobalMetrics()) 502 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 503 RegionServerServices rss = 504 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 505 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 506 rs.init(conf, null, manager, null, mockPeer, rss, 507 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 508 new MetricsSource(queueId.toString())); 509 return rss; 510 } 511 512 /** 513 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 514 * and <b>eplication.source.regionserver.abort</b> is set to false. 515 */ 516 @Test 517 public void testAbortFalseOnError() throws IOException { 518 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 519 conf.setBoolean("replication.source.regionserver.abort", false); 520 ReplicationSource rs = new ReplicationSource(); 521 RegionServerServices rss = 522 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 523 try { 524 rs.startup(); 525 assertTrue(rs.isSourceActive()); 526 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 527 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 528 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 529 rs.enqueueLog(new Path("a.1")); 530 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 531 } finally { 532 rs.terminate("Done"); 533 rss.stop("Done"); 534 } 535 } 536 537 @Test 538 public void testReplicationSourceInitializingMetric() throws IOException { 539 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 540 conf.setBoolean("replication.source.regionserver.abort", false); 541 ReplicationSource rs = new ReplicationSource(); 542 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 543 try { 544 rs.startup(); 545 assertTrue(rs.isSourceActive()); 546 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 547 BadReplicationEndpoint.failing = false; 548 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 549 } finally { 550 rs.terminate("Done"); 551 rss.stop("Done"); 552 } 553 } 554 555 /** 556 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 557 * when <b>replication.source.regionserver.abort</b> is set to false. 558 */ 559 @Test 560 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 561 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 562 ReplicationSource rs = new ReplicationSource(); 563 RegionServerServices rss = 564 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 565 try { 566 rs.startup(); 567 assertTrue(true); 568 } finally { 569 rs.terminate("Done"); 570 rss.stop("Done"); 571 } 572 } 573 574 /** 575 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 576 * and <b>replication.source.regionserver.abort</b> is set to true. 577 */ 578 @Test 579 public void testAbortTrueOnError() throws IOException { 580 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 581 ReplicationSource rs = new ReplicationSource(); 582 RegionServerServices rss = 583 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 584 try { 585 rs.startup(); 586 assertTrue(rs.isSourceActive()); 587 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 588 assertTrue(rss.isAborted()); 589 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 590 assertFalse(rs.isSourceActive()); 591 } finally { 592 rs.terminate("Done"); 593 rss.stop("Done"); 594 } 595 } 596 597 /* 598 * Test age of oldest wal metric. 599 */ 600 @Test 601 public void testAgeOfOldestWal() throws Exception { 602 try { 603 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 604 EnvironmentEdgeManager.injectEdge(manualEdge); 605 606 String peerId = "1"; 607 MetricsSource metrics = new MetricsSource(peerId); 608 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 609 conf.setInt("replication.source.maxretriesmultiplier", 1); 610 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 611 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 612 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 613 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 614 Mockito.when(peerConfig.getReplicationEndpointImpl()) 615 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 616 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 617 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 618 Mockito.when(manager.getGlobalMetrics()) 619 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 620 RegionServerServices rss = 621 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 622 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId); 623 ReplicationSource source = new ReplicationSource(); 624 source.init(conf, null, manager, null, mockPeer, rss, 625 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 626 metrics); 627 628 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 629 manualEdge.setValue(10); 630 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 631 source.enqueueLog(log1); 632 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId); 633 assertEquals(2, metricsSource1.getOldestWalAge()); 634 635 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 636 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 637 source.enqueueLog(log2); 638 assertEquals(6, metricsSource1.getOldestWalAge()); 639 // Clear all metrics. 640 metrics.clear(); 641 } finally { 642 EnvironmentEdgeManager.reset(); 643 } 644 } 645 646 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 647 MetricsReplicationSourceFactoryImpl factory = 648 (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory 649 .getInstance(MetricsReplicationSourceFactory.class); 650 return factory.getSource(sourceId); 651 } 652}