001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.doNothing; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.OptionalLong; 033import java.util.UUID; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import java.util.concurrent.atomic.AtomicLong; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellBuilderFactory; 043import org.apache.hadoop.hbase.CellBuilderType; 044import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.MiniHBaseCluster; 051import org.apache.hadoop.hbase.Server; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Waiter; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.regionserver.HRegionServer; 057import org.apache.hadoop.hbase.regionserver.RegionServerServices; 058import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 059import org.apache.hadoop.hbase.replication.ReplicationPeer; 060import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 061import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 062import org.apache.hadoop.hbase.replication.WALEntryFilter; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.testclassification.ReplicationTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 068import org.apache.hadoop.hbase.wal.WAL; 069import org.apache.hadoop.hbase.wal.WALEdit; 070import org.apache.hadoop.hbase.wal.WALFactory; 071import org.apache.hadoop.hbase.wal.WALKeyImpl; 072import org.apache.hadoop.hbase.wal.WALProvider; 073import org.junit.AfterClass; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.mockito.Mockito; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082@Category({ ReplicationTests.class, MediumTests.class }) 083public class TestReplicationSource { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestReplicationSource.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 090 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 091 private final static HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility(); 092 private static FileSystem FS; 093 private static Path oldLogDir; 094 private static Path logDir; 095 private static Configuration conf = TEST_UTIL.getConfiguration(); 096 097 @BeforeClass 098 public static void setUpBeforeClass() throws Exception { 099 TEST_UTIL.startMiniDFSCluster(1); 100 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 101 Path rootDir = TEST_UTIL.createRootDir(); 102 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 103 if (FS.exists(oldLogDir)) { 104 FS.delete(oldLogDir, true); 105 } 106 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 107 if (FS.exists(logDir)) { 108 FS.delete(logDir, true); 109 } 110 } 111 112 @AfterClass 113 public static void tearDownAfterClass() throws Exception { 114 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 115 TEST_UTIL.shutdownMiniHBaseCluster(); 116 TEST_UTIL.shutdownMiniDFSCluster(); 117 } 118 119 /** 120 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 121 */ 122 @Test 123 public void testDefaultSkipsMetaWAL() throws IOException { 124 ReplicationSource rs = new ReplicationSource(); 125 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 126 conf.setInt("replication.source.maxretriesmultiplier", 1); 127 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 128 when(mockPeer.getConfiguration()).thenReturn(conf); 129 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 130 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 131 when(peerConfig.getReplicationEndpointImpl()) 132 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 133 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 134 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 135 when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 136 Mockito.when(manager.getGlobalMetrics()) 137 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 138 String queueId = "qid"; 139 RegionServerServices rss = 140 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 141 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 142 new MetricsSource(queueId)); 143 try { 144 rs.startup(); 145 assertTrue(rs.isSourceActive()); 146 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 147 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 148 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 149 rs.enqueueLog(new Path("a.1")); 150 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 151 } finally { 152 rs.terminate("Done"); 153 rss.stop("Done"); 154 } 155 } 156 157 /** 158 * Test that we filter out meta edits, etc. 159 */ 160 @Test 161 public void testWALEntryFilter() throws IOException { 162 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 163 // instance and init it. 164 ReplicationSource rs = new ReplicationSource(); 165 UUID uuid = UUID.randomUUID(); 166 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 167 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 168 when(mockPeer.getConfiguration()).thenReturn(conf); 169 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 170 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 171 when(peerConfig.getReplicationEndpointImpl()) 172 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 173 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 174 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 175 when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 176 String queueId = "qid"; 177 RegionServerServices rss = 178 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 179 rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(), 180 new MetricsSource(queueId)); 181 try { 182 rs.startup(); 183 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 184 WALEntryFilter wef = rs.getWalEntryFilter(); 185 // Test non-system WAL edit. 186 WALEdit we = new WALEdit() 187 .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW) 188 .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build()); 189 WAL.Entry e = new WAL.Entry( 190 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 191 assertTrue(wef.filter(e) == e); 192 // Test system WAL edit. 193 e = new WAL.Entry( 194 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 195 assertNull(wef.filter(e)); 196 } finally { 197 rs.terminate("Done"); 198 rss.stop("Done"); 199 } 200 } 201 202 /** 203 * Sanity check that we can move logs around while we are reading from them. Should this test 204 * fail, ReplicationSource would have a hard time reading logs that are being archived. 205 */ 206 // This tests doesn't belong in here... it is not about ReplicationSource. 207 @Test 208 public void testLogMoving() throws Exception { 209 Path logPath = new Path(logDir, "log"); 210 if (!FS.exists(logDir)) { 211 FS.mkdirs(logDir); 212 } 213 if (!FS.exists(oldLogDir)) { 214 FS.mkdirs(oldLogDir); 215 } 216 WALProvider.Writer writer = 217 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 218 for (int i = 0; i < 3; i++) { 219 byte[] b = Bytes.toBytes(Integer.toString(i)); 220 KeyValue kv = new KeyValue(b, b, b); 221 WALEdit edit = new WALEdit(); 222 edit.add(kv); 223 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 224 writer.append(new WAL.Entry(key, edit)); 225 writer.sync(false); 226 } 227 writer.close(); 228 229 WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration()); 230 WAL.Entry entry = reader.next(); 231 assertNotNull(entry); 232 233 Path oldLogPath = new Path(oldLogDir, "log"); 234 FS.rename(logPath, oldLogPath); 235 236 entry = reader.next(); 237 assertNotNull(entry); 238 239 reader.next(); 240 entry = reader.next(); 241 242 assertNull(entry); 243 reader.close(); 244 } 245 246 /** 247 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 248 * TestReplicationSource because doesn't need cluster. 249 */ 250 @Test 251 public void testTerminateTimeout() throws Exception { 252 ReplicationSource source = new ReplicationSource(); 253 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 254 try { 255 replicationEndpoint.start(); 256 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 257 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 258 Configuration testConf = HBaseConfiguration.create(); 259 testConf.setInt("replication.source.maxretriesmultiplier", 1); 260 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 261 when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 262 source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, 263 p -> OptionalLong.empty(), null); 264 ExecutorService executor = Executors.newSingleThreadExecutor(); 265 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 266 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 267 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 268 } finally { 269 replicationEndpoint.stop(); 270 } 271 } 272 273 @Test 274 public void testTerminateClearsBuffer() throws Exception { 275 ReplicationSource source = new ReplicationSource(); 276 ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); 277 MetricsReplicationGlobalSourceSource mockMetrics = 278 mock(MetricsReplicationGlobalSourceSource.class); 279 AtomicLong buffer = new AtomicLong(); 280 Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); 281 Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); 282 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 283 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 284 Configuration testConf = HBaseConfiguration.create(); 285 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer", 286 null, p -> OptionalLong.empty(), mock(MetricsSource.class)); 287 ReplicationSourceWALReader reader = 288 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 289 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); 290 shipper.entryReader = reader; 291 source.workerThreads.put("testPeer", shipper); 292 WALEntryBatch batch = new WALEntryBatch(10, logDir); 293 WAL.Entry mockEntry = mock(WAL.Entry.class); 294 WALEdit mockEdit = mock(WALEdit.class); 295 WALKeyImpl mockKey = mock(WALKeyImpl.class); 296 when(mockEntry.getEdit()).thenReturn(mockEdit); 297 when(mockEdit.isEmpty()).thenReturn(false); 298 when(mockEntry.getKey()).thenReturn(mockKey); 299 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 300 when(mockEdit.heapSize()).thenReturn(10000L); 301 when(mockEdit.size()).thenReturn(0); 302 ArrayList<Cell> cells = new ArrayList<>(); 303 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 304 Bytes.toBytes("v1")); 305 cells.add(kv); 306 when(mockEdit.getCells()).thenReturn(cells); 307 reader.addEntryToBatch(batch, mockEntry); 308 reader.entryBatchQueue.put(batch); 309 source.terminate("test"); 310 assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); 311 } 312 313 /** 314 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 315 */ 316 @Test 317 public void testServerShutdownRecoveredQueue() throws Exception { 318 try { 319 // Ensure single-threaded WAL 320 conf.set("hbase.wal.provider", "defaultProvider"); 321 conf.setInt("replication.sleep.before.failover", 2000); 322 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 323 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 324 MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 325 TEST_UTIL_PEER.startMiniCluster(1); 326 327 HRegionServer serverA = cluster.getRegionServer(0); 328 final ReplicationSourceManager managerA = 329 serverA.getReplicationSourceService().getReplicationManager(); 330 HRegionServer serverB = cluster.getRegionServer(1); 331 final ReplicationSourceManager managerB = 332 serverB.getReplicationSourceService().getReplicationManager(); 333 final Admin admin = TEST_UTIL.getAdmin(); 334 335 final String peerId = "TestPeer"; 336 admin.addReplicationPeer(peerId, 337 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); 338 // Wait for replication sources to come up 339 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 340 @Override 341 public boolean evaluate() { 342 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 343 } 344 }); 345 // Disabling peer makes sure there is at least one log to claim when the server dies 346 // The recovered queue will also stay there until the peer is disabled even if the 347 // WALs it contains have no data. 348 admin.disableReplicationPeer(peerId); 349 350 // Stopping serverA 351 // It's queues should be claimed by the only other alive server i.e. serverB 352 cluster.stopRegionServer(serverA.getServerName()); 353 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 354 @Override 355 public boolean evaluate() throws Exception { 356 return managerB.getOldSources().size() == 1; 357 } 358 }); 359 360 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 361 serverC.waitForServerOnline(); 362 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 363 @Override 364 public boolean evaluate() throws Exception { 365 return serverC.getReplicationSourceService() != null; 366 } 367 }); 368 final ReplicationSourceManager managerC = 369 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 370 // Sanity check 371 assertEquals(0, managerC.getOldSources().size()); 372 373 // Stopping serverB 374 // Now serverC should have two recovered queues: 375 // 1. The serverB's normal queue 376 // 2. serverA's recovered queue on serverB 377 cluster.stopRegionServer(serverB.getServerName()); 378 Waiter.waitFor(conf, 20000, 379 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 380 admin.enableReplicationPeer(peerId); 381 Waiter.waitFor(conf, 20000, 382 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 383 } finally { 384 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 385 } 386 } 387 388 /** 389 * Regionserver implementation that adds a delay on the graceful shutdown. 390 */ 391 public static class ShutdownDelayRegionServer extends HRegionServer { 392 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 393 super(conf); 394 } 395 396 @Override 397 protected void stopServiceThreads() { 398 // Add a delay before service threads are shutdown. 399 // This will keep the zookeeper connection alive for the duration of the delay. 400 LOG.info("Adding a delay to the regionserver shutdown"); 401 try { 402 Thread.sleep(2000); 403 } catch (InterruptedException ex) { 404 LOG.error("Interrupted while sleeping"); 405 } 406 super.stopServiceThreads(); 407 } 408 } 409 410 /** 411 * Deadend Endpoint. Does nothing. 412 */ 413 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 414 private final UUID uuid = UUID.randomUUID(); 415 416 @Override 417 public void init(Context context) throws IOException { 418 this.ctx = context; 419 } 420 421 @Override 422 public WALEntryFilter getWALEntryfilter() { 423 return null; 424 } 425 426 @Override 427 public synchronized UUID getPeerUUID() { 428 return this.uuid; 429 } 430 431 @Override 432 protected void doStart() { 433 notifyStarted(); 434 } 435 436 @Override 437 protected void doStop() { 438 notifyStopped(); 439 } 440 441 @Override 442 public boolean canReplicateToSameCluster() { 443 return true; 444 } 445 } 446 447 /** 448 * Deadend Endpoint. Does nothing. 449 */ 450 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 451 452 static int count = 0; 453 454 @Override 455 public synchronized UUID getPeerUUID() { 456 if (count == 0) { 457 count++; 458 throw new RuntimeException(); 459 } else { 460 return super.getPeerUUID(); 461 } 462 } 463 464 } 465 466 /** 467 * Bad Endpoint with failing connection to peer on demand. 468 */ 469 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 470 static boolean failing = true; 471 472 @Override 473 public synchronized UUID getPeerUUID() { 474 return failing ? null : super.getPeerUUID(); 475 } 476 } 477 478 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 479 480 static int count = 0; 481 482 @Override 483 public synchronized UUID getPeerUUID() { 484 throw new RuntimeException(); 485 } 486 487 } 488 489 /** 490 * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster. 491 */ 492 @Test 493 public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { 494 String walGroupId = "fake-wal-group-id"; 495 ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); 496 ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); 497 RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); 498 Server server = mock(Server.class); 499 when(server.getServerName()).thenReturn(serverName); 500 when(source.getServer()).thenReturn(server); 501 when(source.getServerWALsBelongTo()).thenReturn(deadServer); 502 ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); 503 when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) 504 .thenReturn(1001L); 505 when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) 506 .thenReturn(-1L); 507 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 508 conf.setInt("replication.source.maxretriesmultiplier", -1); 509 MetricsSource metricsSource = mock(MetricsSource.class); 510 doNothing().when(metricsSource).incrSizeOfLogQueue(); 511 ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source); 512 logQueue.enqueueLog(new Path("/www/html/test"), walGroupId); 513 RecoveredReplicationSourceShipper shipper = 514 new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage); 515 assertEquals(1001L, shipper.getStartPosition()); 516 } 517 518 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 519 String endpointName) throws IOException { 520 conf.setInt("replication.source.maxretriesmultiplier", 1); 521 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 522 when(mockPeer.getConfiguration()).thenReturn(conf); 523 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 524 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 525 FaultyReplicationEndpoint.count = 0; 526 when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 527 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 528 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 529 when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 530 Mockito.when(manager.getGlobalMetrics()) 531 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 532 String queueId = "qid"; 533 RegionServerServices rss = 534 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 535 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 536 new MetricsSource(queueId)); 537 return rss; 538 } 539 540 /** 541 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 542 * and <b>eplication.source.regionserver.abort</b> is set to false. 543 */ 544 @Test 545 public void testAbortFalseOnError() throws IOException { 546 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 547 conf.setBoolean("replication.source.regionserver.abort", false); 548 ReplicationSource rs = new ReplicationSource(); 549 RegionServerServices rss = 550 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 551 try { 552 rs.startup(); 553 assertTrue(rs.isSourceActive()); 554 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 555 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 556 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 557 rs.enqueueLog(new Path("a.1")); 558 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 559 } finally { 560 rs.terminate("Done"); 561 rss.stop("Done"); 562 } 563 } 564 565 @Test 566 public void testReplicationSourceInitializingMetric() throws IOException { 567 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 568 conf.setBoolean("replication.source.regionserver.abort", false); 569 ReplicationSource rs = new ReplicationSource(); 570 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 571 try { 572 rs.startup(); 573 assertTrue(rs.isSourceActive()); 574 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 575 BadReplicationEndpoint.failing = false; 576 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 577 } finally { 578 rs.terminate("Done"); 579 rss.stop("Done"); 580 } 581 } 582 583 /** 584 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 585 * when <b>replication.source.regionserver.abort</b> is set to false. 586 */ 587 @Test 588 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 589 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 590 ReplicationSource rs = new ReplicationSource(); 591 RegionServerServices rss = 592 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 593 try { 594 rs.startup(); 595 assertTrue(true); 596 } finally { 597 rs.terminate("Done"); 598 rss.stop("Done"); 599 } 600 } 601 602 /** 603 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 604 * and <b>replication.source.regionserver.abort</b> is set to true. 605 */ 606 @Test 607 public void testAbortTrueOnError() throws IOException { 608 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 609 ReplicationSource rs = new ReplicationSource(); 610 RegionServerServices rss = 611 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 612 try { 613 rs.startup(); 614 assertTrue(rs.isSourceActive()); 615 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 616 assertTrue(rss.isAborted()); 617 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 618 assertFalse(rs.isSourceActive()); 619 } finally { 620 rs.terminate("Done"); 621 rss.stop("Done"); 622 } 623 } 624 625 /* 626 * Test age of oldest wal metric. 627 */ 628 @Test 629 public void testAgeOfOldestWal() throws Exception { 630 try { 631 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 632 EnvironmentEdgeManager.injectEdge(manualEdge); 633 634 String id = "1"; 635 MetricsSource metrics = new MetricsSource(id); 636 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 637 conf.setInt("replication.source.maxretriesmultiplier", 1); 638 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 639 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 640 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 641 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 642 Mockito.when(peerConfig.getReplicationEndpointImpl()) 643 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 644 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 645 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 646 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 647 Mockito.when(manager.getGlobalMetrics()) 648 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 649 RegionServerServices rss = 650 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 651 652 ReplicationSource source = new ReplicationSource(); 653 source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(), 654 metrics); 655 656 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 657 manualEdge.setValue(10); 658 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 659 source.enqueueLog(log1); 660 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); 661 assertEquals(2, metricsSource1.getOldestWalAge()); 662 663 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 664 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 665 source.enqueueLog(log2); 666 assertEquals(6, metricsSource1.getOldestWalAge()); 667 // Clear all metrics. 668 metrics.clear(); 669 } finally { 670 EnvironmentEdgeManager.reset(); 671 } 672 } 673 674 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 675 MetricsReplicationSourceFactory factory = 676 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class); 677 return factory.getSource(sourceId); 678 } 679}