001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.doNothing; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.OptionalLong; 033import java.util.UUID; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import java.util.concurrent.atomic.AtomicLong; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellBuilderFactory; 043import org.apache.hadoop.hbase.CellBuilderType; 044import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HBaseTestingUtil; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.Server; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Waiter; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.regionserver.HRegionServer; 057import org.apache.hadoop.hbase.regionserver.RegionServerServices; 058import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 059import org.apache.hadoop.hbase.replication.ReplicationPeer; 060import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 061import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 062import org.apache.hadoop.hbase.replication.WALEntryFilter; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.testclassification.ReplicationTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 068import org.apache.hadoop.hbase.wal.WAL; 069import org.apache.hadoop.hbase.wal.WALEdit; 070import org.apache.hadoop.hbase.wal.WALFactory; 071import org.apache.hadoop.hbase.wal.WALKeyImpl; 072import org.apache.hadoop.hbase.wal.WALProvider; 073import org.apache.hadoop.hbase.wal.WALStreamReader; 074import org.junit.AfterClass; 075import org.junit.BeforeClass; 076import org.junit.ClassRule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.mockito.Mockito; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083@Category({ ReplicationTests.class, MediumTests.class }) 084public class TestReplicationSource { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestReplicationSource.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 091 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 092 private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil(); 093 private static FileSystem FS; 094 private static Path oldLogDir; 095 private static Path logDir; 096 private static Configuration conf = TEST_UTIL.getConfiguration(); 097 098 @BeforeClass 099 public static void setUpBeforeClass() throws Exception { 100 TEST_UTIL.startMiniDFSCluster(1); 101 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 102 Path rootDir = TEST_UTIL.createRootDir(); 103 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 104 if (FS.exists(oldLogDir)) { 105 FS.delete(oldLogDir, true); 106 } 107 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 108 if (FS.exists(logDir)) { 109 FS.delete(logDir, true); 110 } 111 } 112 113 @AfterClass 114 public static void tearDownAfterClass() throws Exception { 115 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 116 TEST_UTIL.shutdownMiniHBaseCluster(); 117 TEST_UTIL.shutdownMiniDFSCluster(); 118 } 119 120 /** 121 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 122 */ 123 @Test 124 public void testDefaultSkipsMetaWAL() throws IOException { 125 ReplicationSource rs = new ReplicationSource(); 126 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 127 conf.setInt("replication.source.maxretriesmultiplier", 1); 128 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 129 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 130 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 131 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 132 Mockito.when(peerConfig.getReplicationEndpointImpl()) 133 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 134 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 135 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 136 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 137 Mockito.when(manager.getGlobalMetrics()) 138 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 139 String queueId = "qid"; 140 RegionServerServices rss = 141 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 142 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 143 new MetricsSource(queueId)); 144 try { 145 rs.startup(); 146 assertTrue(rs.isSourceActive()); 147 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 148 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 149 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 150 rs.enqueueLog(new Path("a.1")); 151 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 152 } finally { 153 rs.terminate("Done"); 154 rss.stop("Done"); 155 } 156 } 157 158 /** 159 * Test that we filter out meta edits, etc. 160 */ 161 @Test 162 public void testWALEntryFilter() throws IOException { 163 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 164 // instance and init it. 165 ReplicationSource rs = new ReplicationSource(); 166 UUID uuid = UUID.randomUUID(); 167 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 168 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 169 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 170 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 171 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 172 Mockito.when(peerConfig.getReplicationEndpointImpl()) 173 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 174 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 175 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 176 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 177 String queueId = "qid"; 178 RegionServerServices rss = 179 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 180 rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(), 181 new MetricsSource(queueId)); 182 try { 183 rs.startup(); 184 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 185 WALEntryFilter wef = rs.getWalEntryFilter(); 186 // Test non-system WAL edit. 187 WALEdit we = new WALEdit() 188 .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW) 189 .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build()); 190 WAL.Entry e = new WAL.Entry( 191 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 192 assertTrue(wef.filter(e) == e); 193 // Test system WAL edit. 194 e = new WAL.Entry( 195 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 196 assertNull(wef.filter(e)); 197 } finally { 198 rs.terminate("Done"); 199 rss.stop("Done"); 200 } 201 } 202 203 /** 204 * Sanity check that we can move logs around while we are reading from them. Should this test 205 * fail, ReplicationSource would have a hard time reading logs that are being archived. 206 */ 207 // This tests doesn't belong in here... it is not about ReplicationSource. 208 @Test 209 public void testLogMoving() throws Exception { 210 Path logPath = new Path(logDir, "log"); 211 if (!FS.exists(logDir)) { 212 FS.mkdirs(logDir); 213 } 214 if (!FS.exists(oldLogDir)) { 215 FS.mkdirs(oldLogDir); 216 } 217 WALProvider.Writer writer = 218 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 219 for (int i = 0; i < 3; i++) { 220 byte[] b = Bytes.toBytes(Integer.toString(i)); 221 KeyValue kv = new KeyValue(b, b, b); 222 WALEdit edit = new WALEdit(); 223 edit.add(kv); 224 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 225 writer.append(new WAL.Entry(key, edit)); 226 writer.sync(false); 227 } 228 writer.close(); 229 230 WALStreamReader reader = 231 WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration()); 232 WAL.Entry entry = reader.next(); 233 assertNotNull(entry); 234 235 Path oldLogPath = new Path(oldLogDir, "log"); 236 FS.rename(logPath, oldLogPath); 237 238 entry = reader.next(); 239 assertNotNull(entry); 240 241 reader.next(); 242 entry = reader.next(); 243 244 assertNull(entry); 245 reader.close(); 246 } 247 248 /** 249 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 250 * TestReplicationSource because doesn't need cluster. 251 */ 252 @Test 253 public void testTerminateTimeout() throws Exception { 254 ReplicationSource source = new ReplicationSource(); 255 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 256 try { 257 replicationEndpoint.start(); 258 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 259 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 260 Configuration testConf = HBaseConfiguration.create(); 261 testConf.setInt("replication.source.maxretriesmultiplier", 1); 262 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 263 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 264 source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, 265 p -> OptionalLong.empty(), null); 266 ExecutorService executor = Executors.newSingleThreadExecutor(); 267 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 268 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 269 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 270 } finally { 271 replicationEndpoint.stop(); 272 } 273 } 274 275 @Test 276 public void testTerminateClearsBuffer() throws Exception { 277 ReplicationSource source = new ReplicationSource(); 278 ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); 279 MetricsReplicationGlobalSourceSource mockMetrics = 280 mock(MetricsReplicationGlobalSourceSource.class); 281 AtomicLong buffer = new AtomicLong(); 282 Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); 283 Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); 284 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 285 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 286 Configuration testConf = HBaseConfiguration.create(); 287 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer", 288 null, p -> OptionalLong.empty(), mock(MetricsSource.class)); 289 ReplicationSourceWALReader reader = 290 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 291 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); 292 shipper.entryReader = reader; 293 source.workerThreads.put("testPeer", shipper); 294 WALEntryBatch batch = new WALEntryBatch(10, logDir); 295 WAL.Entry mockEntry = mock(WAL.Entry.class); 296 WALEdit mockEdit = mock(WALEdit.class); 297 WALKeyImpl mockKey = mock(WALKeyImpl.class); 298 when(mockEntry.getEdit()).thenReturn(mockEdit); 299 when(mockEdit.isEmpty()).thenReturn(false); 300 when(mockEntry.getKey()).thenReturn(mockKey); 301 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 302 when(mockEdit.heapSize()).thenReturn(10000L); 303 when(mockEdit.size()).thenReturn(0); 304 ArrayList<Cell> cells = new ArrayList<>(); 305 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 306 Bytes.toBytes("v1")); 307 cells.add(kv); 308 when(mockEdit.getCells()).thenReturn(cells); 309 reader.addEntryToBatch(batch, mockEntry); 310 reader.entryBatchQueue.put(batch); 311 source.terminate("test"); 312 assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); 313 } 314 315 /** 316 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 317 */ 318 @Test 319 public void testServerShutdownRecoveredQueue() throws Exception { 320 try { 321 // Ensure single-threaded WAL 322 conf.set("hbase.wal.provider", "defaultProvider"); 323 conf.setInt("replication.sleep.before.failover", 2000); 324 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 325 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 326 SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 327 TEST_UTIL_PEER.startMiniCluster(1); 328 329 HRegionServer serverA = cluster.getRegionServer(0); 330 final ReplicationSourceManager managerA = 331 serverA.getReplicationSourceService().getReplicationManager(); 332 HRegionServer serverB = cluster.getRegionServer(1); 333 final ReplicationSourceManager managerB = 334 serverB.getReplicationSourceService().getReplicationManager(); 335 final Admin admin = TEST_UTIL.getAdmin(); 336 337 final String peerId = "TestPeer"; 338 admin.addReplicationPeer(peerId, 339 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); 340 // Wait for replication sources to come up 341 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 342 @Override 343 public boolean evaluate() { 344 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 345 } 346 }); 347 // Disabling peer makes sure there is at least one log to claim when the server dies 348 // The recovered queue will also stay there until the peer is disabled even if the 349 // WALs it contains have no data. 350 admin.disableReplicationPeer(peerId); 351 352 // Stopping serverA 353 // It's queues should be claimed by the only other alive server i.e. serverB 354 cluster.stopRegionServer(serverA.getServerName()); 355 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 356 @Override 357 public boolean evaluate() throws Exception { 358 return managerB.getOldSources().size() == 1; 359 } 360 }); 361 362 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 363 serverC.waitForServerOnline(); 364 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 365 @Override 366 public boolean evaluate() throws Exception { 367 return serverC.getReplicationSourceService() != null; 368 } 369 }); 370 final ReplicationSourceManager managerC = 371 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 372 // Sanity check 373 assertEquals(0, managerC.getOldSources().size()); 374 375 // Stopping serverB 376 // Now serverC should have two recovered queues: 377 // 1. The serverB's normal queue 378 // 2. serverA's recovered queue on serverB 379 cluster.stopRegionServer(serverB.getServerName()); 380 Waiter.waitFor(conf, 20000, 381 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 382 admin.enableReplicationPeer(peerId); 383 Waiter.waitFor(conf, 20000, 384 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 385 } finally { 386 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 387 } 388 } 389 390 /** 391 * Regionserver implementation that adds a delay on the graceful shutdown. 392 */ 393 public static class ShutdownDelayRegionServer extends HRegionServer { 394 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 395 super(conf); 396 } 397 398 @Override 399 protected void stopServiceThreads() { 400 // Add a delay before service threads are shutdown. 401 // This will keep the zookeeper connection alive for the duration of the delay. 402 LOG.info("Adding a delay to the regionserver shutdown"); 403 try { 404 Thread.sleep(2000); 405 } catch (InterruptedException ex) { 406 LOG.error("Interrupted while sleeping"); 407 } 408 super.stopServiceThreads(); 409 } 410 } 411 412 /** 413 * Deadend Endpoint. Does nothing. 414 */ 415 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 416 private final UUID uuid = UUID.randomUUID(); 417 418 @Override 419 public void init(Context context) throws IOException { 420 this.ctx = context; 421 } 422 423 @Override 424 public WALEntryFilter getWALEntryfilter() { 425 return null; 426 } 427 428 @Override 429 public synchronized UUID getPeerUUID() { 430 return this.uuid; 431 } 432 433 @Override 434 protected void doStart() { 435 notifyStarted(); 436 } 437 438 @Override 439 protected void doStop() { 440 notifyStopped(); 441 } 442 443 @Override 444 public boolean canReplicateToSameCluster() { 445 return true; 446 } 447 } 448 449 /** 450 * Deadend Endpoint. Does nothing. 451 */ 452 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 453 454 static int count = 0; 455 456 @Override 457 public synchronized UUID getPeerUUID() { 458 if (count == 0) { 459 count++; 460 throw new RuntimeException(); 461 } else { 462 return super.getPeerUUID(); 463 } 464 } 465 466 } 467 468 /** 469 * Bad Endpoint with failing connection to peer on demand. 470 */ 471 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 472 static boolean failing = true; 473 474 @Override 475 public synchronized UUID getPeerUUID() { 476 return failing ? null : super.getPeerUUID(); 477 } 478 } 479 480 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 481 482 static int count = 0; 483 484 @Override 485 public synchronized UUID getPeerUUID() { 486 throw new RuntimeException(); 487 } 488 489 } 490 491 /** 492 * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster. 493 */ 494 @Test 495 public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { 496 String walGroupId = "fake-wal-group-id"; 497 ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); 498 ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); 499 RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); 500 Server server = mock(Server.class); 501 Mockito.when(server.getServerName()).thenReturn(serverName); 502 Mockito.when(source.getServer()).thenReturn(server); 503 Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); 504 ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); 505 Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) 506 .thenReturn(1001L); 507 Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) 508 .thenReturn(-1L); 509 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 510 conf.setInt("replication.source.maxretriesmultiplier", -1); 511 MetricsSource metricsSource = mock(MetricsSource.class); 512 doNothing().when(metricsSource).incrSizeOfLogQueue(); 513 ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source); 514 logQueue.enqueueLog(new Path("/www/html/test"), walGroupId); 515 RecoveredReplicationSourceShipper shipper = 516 new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage); 517 assertEquals(1001L, shipper.getStartPosition()); 518 } 519 520 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 521 String endpointName) throws IOException { 522 conf.setInt("replication.source.maxretriesmultiplier", 1); 523 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 524 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 525 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 526 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 527 FaultyReplicationEndpoint.count = 0; 528 Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 529 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 530 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 531 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 532 Mockito.when(manager.getGlobalMetrics()) 533 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 534 String queueId = "qid"; 535 RegionServerServices rss = 536 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 537 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 538 new MetricsSource(queueId)); 539 return rss; 540 } 541 542 /** 543 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 544 * and <b>eplication.source.regionserver.abort</b> is set to false. 545 */ 546 @Test 547 public void testAbortFalseOnError() throws IOException { 548 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 549 conf.setBoolean("replication.source.regionserver.abort", false); 550 ReplicationSource rs = new ReplicationSource(); 551 RegionServerServices rss = 552 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 553 try { 554 rs.startup(); 555 assertTrue(rs.isSourceActive()); 556 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 557 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 558 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 559 rs.enqueueLog(new Path("a.1")); 560 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 561 } finally { 562 rs.terminate("Done"); 563 rss.stop("Done"); 564 } 565 } 566 567 @Test 568 public void testReplicationSourceInitializingMetric() throws IOException { 569 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 570 conf.setBoolean("replication.source.regionserver.abort", false); 571 ReplicationSource rs = new ReplicationSource(); 572 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 573 try { 574 rs.startup(); 575 assertTrue(rs.isSourceActive()); 576 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 577 BadReplicationEndpoint.failing = false; 578 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 579 } finally { 580 rs.terminate("Done"); 581 rss.stop("Done"); 582 } 583 } 584 585 /** 586 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 587 * when <b>replication.source.regionserver.abort</b> is set to false. 588 */ 589 @Test 590 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 591 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 592 ReplicationSource rs = new ReplicationSource(); 593 RegionServerServices rss = 594 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 595 try { 596 rs.startup(); 597 assertTrue(true); 598 } finally { 599 rs.terminate("Done"); 600 rss.stop("Done"); 601 } 602 } 603 604 /** 605 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 606 * and <b>replication.source.regionserver.abort</b> is set to true. 607 */ 608 @Test 609 public void testAbortTrueOnError() throws IOException { 610 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 611 ReplicationSource rs = new ReplicationSource(); 612 RegionServerServices rss = 613 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 614 try { 615 rs.startup(); 616 assertTrue(rs.isSourceActive()); 617 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 618 assertTrue(rss.isAborted()); 619 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 620 assertFalse(rs.isSourceActive()); 621 } finally { 622 rs.terminate("Done"); 623 rss.stop("Done"); 624 } 625 } 626 627 /* 628 * Test age of oldest wal metric. 629 */ 630 @Test 631 public void testAgeOfOldestWal() throws Exception { 632 try { 633 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 634 EnvironmentEdgeManager.injectEdge(manualEdge); 635 636 String id = "1"; 637 MetricsSource metrics = new MetricsSource(id); 638 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 639 conf.setInt("replication.source.maxretriesmultiplier", 1); 640 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 641 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 642 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 643 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 644 Mockito.when(peerConfig.getReplicationEndpointImpl()) 645 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 646 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 647 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 648 Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); 649 Mockito.when(manager.getGlobalMetrics()) 650 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 651 RegionServerServices rss = 652 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 653 654 ReplicationSource source = new ReplicationSource(); 655 source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(), 656 metrics); 657 658 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 659 manualEdge.setValue(10); 660 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 661 source.enqueueLog(log1); 662 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); 663 assertEquals(2, metricsSource1.getOldestWalAge()); 664 665 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 666 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 667 source.enqueueLog(log2); 668 assertEquals(6, metricsSource1.getOldestWalAge()); 669 // Clear all metrics. 670 metrics.clear(); 671 } finally { 672 EnvironmentEdgeManager.reset(); 673 } 674 } 675 676 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 677 MetricsReplicationSourceFactoryImpl factory = 678 (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory 679 .getInstance(MetricsReplicationSourceFactory.class); 680 return factory.getSource(sourceId); 681 } 682}