/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set
 * up the proper config for this class (the static {@link #conf} and {@link #utility} fields) and
 * initialize the proper cluster using {@link HBaseTestingUtil}.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public abstract class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);

  protected static Configuration conf;

  protected static HBaseTestingUtil utility;

  protected static Replication replication;

  /** The manager obtained from the {@link Replication} instance created by this test. */
  protected static ReplicationSourceManager manager;

  /** The manager of a region server of the mini cluster, or null when no cluster is running. */
  protected static ReplicationSourceManager managerOfCluster;

  protected static ZKWatcher zkw;

  protected static TableDescriptor htd;

  protected static RegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test = TableName.valueOf("test");

  protected static final String slaveId = "1";

  protected static FileSystem fs;

  protected static Path oldLogDir;

  protected static Path logDir;

  protected static Path remoteLogDir;

  protected static CountDownLatch latch;

  /** WAL names queued by {@link #testClaimQueues()} and checked by the failover workers. */
  protected static List<String> files = new ArrayList<>();
  protected static NavigableMap<byte[], Integer> scopes;

  /**
   * Creates the replication znodes for peer {@link #slaveId}, initializes a {@link Replication}
   * instance backed by a {@link DummyServer}, registers the peer with the manager(s) and builds
   * the shared table descriptor, scopes and region info used by the tests.
   * <p>
   * The implementing class must have assigned {@link #conf} (asserted below) and
   * {@link #utility} before calling this method.
   */
  protected static void setupZkAndReplication() throws Exception {
    // The implementing class should set up the conf
    assertNotNull(conf);
    zkw = new ZKWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
    remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
    replication = new Replication();
    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
      new WALFactory(conf, "test", null, false));
    managerOfCluster = getManagerFromCluster();
    if (managerOfCluster != null) {
      // After replication procedure, we need to add peer by hand (other than by receiving
      // notification from zk)
      managerOfCluster.addPeer(slaveId);
    }

    manager = replication.getReplicationManager();
    manager.addSource(slaveId);
    if (managerOfCluster != null) {
      waitPeer(slaveId, managerOfCluster, true);
    }
    waitPeer(slaveId, manager, true);

    htd = TableDescriptorBuilder.newBuilder(test)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();

    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
  }

  /**
   * Returns the {@link ReplicationSourceManager} of any region server in the mini cluster.
   * @return the manager, or null when no mini cluster has been started
   * @throws IllegalStateException if a mini cluster is running but no manager can be obtained
   *                               from any of its region servers
   */
  private static ReplicationSourceManager getManagerFromCluster() {
    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
    if (utility.getMiniHBaseCluster() == null) {
      return null;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny()
      .map(RegionServerServices::getReplicationSourceService).map(r -> (Replication) r)
      .map(Replication::getReplicationManager)
      // Fail with a descriptive message instead of a bare NoSuchElementException.
      .orElseThrow(() -> new IllegalStateException(
        "Mini cluster is running but no region server exposes a ReplicationSourceManager"));
  }

  /**
   * Stops the manager (if one was created) and shuts down the mini cluster. The shutdown runs
   * even when stopping the manager fails, so the cluster is not left behind.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      if (manager != null) {
        manager.join();
      }
    } finally {
      utility.shutdownMiniCluster();
    }
  }

  @Rule
  public TestName testName = new TestName();

  /** Recursively deletes the WAL, old WAL and remote WAL directories. */
  private void cleanLogDir() throws IOException {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
    fs.delete(remoteLogDir, true);
  }

  @Before
  public void setUp() throws Exception {
    LOG.info("Start {}", testName.getMethodName());
    cleanLogDir();
  }

  /** Cleans the log directories and removes every peer other than {@link #slaveId}. */
  @After
  public void tearDown() throws Exception {
    LOG.info("End {}", testName.getMethodName());
    cleanLogDir();
    List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId)
      .collect(Collectors.toList());
    for (String id : ids) {
      if (slaveId.equals(id)) {
        continue;
      }
      removePeerAndWait(id);
    }
  }

  /**
   * Appends edits to a WAL while rolling the writer periodically, then checks the number of WALs
   * the manager tracks for {@link #slaveId} and the size of the manager's WAL map after a
   * position has been logged and old logs cleaned.
   */
  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    WALFactory wals =
      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
    ReplicationSourceManager replicationManager = replication.getReplicationManager();
    wals.getWALProvider()
      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    final WAL wal = wals.getWAL(hri);
    manager.init();
    // Local descriptor/scopes are named differently so they do not shadow the static fields.
    TableDescriptor tableDescriptor =
      TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
    NavigableMap<byte[], Integer> familyScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
      familyScopes.put(fam, 0);
    }
    // Testing normal log rolling every 20
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(Long.toString(i));
      final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
        EnvironmentEdgeManager.currentTime(), mvcc, familyScopes), edit);
      wal.sync(txid);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info("{} and {}", baseline, time);
    baseline += 101;
    time = baseline;
    LOG.info("{} and {}", baseline, time);

    for (int i = 0; i < 3; i++) {
      wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
        EnvironmentEdgeManager.currentTime(), mvcc, familyScopes), edit);
    }
    wal.sync();

    int logNumber = 0;
    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
      .entrySet()) {
      logNumber += entry.getValue().size();
    }
    assertEquals(6, logNumber);

    wal.rollWriter();

    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getQueueId()).thenReturn("1");
    when(source.isRecovered()).thenReturn(false);
    when(source.isSyncReplication()).thenReturn(false);
    manager.logPositionAndCleanOldLogs(source,
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));

    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
      EnvironmentEdgeManager.currentTime(), mvcc, familyScopes), edit);
    wal.sync();

    assertEquals(1, manager.getWALs().size());

    // TODO Need a case with only 2 WALs and we only want to delete the first one
  }

  /**
   * Queues two WALs for a "dead" server and lets three workers race to claim its queues; exactly
   * one worker must end up with all the queued WALs.
   */
  @Test
  public void testClaimQueues() throws Exception {
    Server server = new DummyServer("hostname0.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // The list is static and shared with the workers; reset it so a re-run starts clean.
    files.clear();
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);

    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // wait for result now... till all the workers are done.
    latch.await();
    // make sure only one is successful
    int populatedMap =
      w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

  /**
   * Claims the queue of another server and verifies that cleaning old logs up to the second WAL
   * leaves only that WAL in the recovered queue.
   */
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    Server server = new DummyServer("hostname1.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // Named walNames so it does not shadow the static 'files' list.
    SortedSet<String> walNames = new TreeSet<>();
    String group = "testgroup";
    String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
    String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
    walNames.add(file1);
    walNames.add(file2);
    for (String file : walNames) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationPeers rp1 =
      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
    rp1.init();
    manager.claimQueue(server.getServerName(), "1");
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(walNames, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getQueueId()).thenReturn(id);
    when(source.isRecovered()).thenReturn(true);
    when(source.isSyncReplication()).thenReturn(false);
    manager.cleanOldLogs(file2, false, source);
    // log1 should be deleted
    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

  /**
   * Claims a queue that belongs to a peer id ("2") which was never added, and verifies that all
   * queues known to the manager afterwards still start with "1".
   */
  @Test
  public void testCleanupUnknownPeerZNode() throws Exception {
    Server server = new DummyServer("hostname2.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // add log to an unknown peer
    String group = "testgroup";
    rq.addWAL(server.getServerName(), "2", group + ".log1");
    rq.addWAL(server.getServerName(), "2", group + ".log2");

    manager.claimQueue(server.getServerName(), "2");

    // The log of the unknown peer should be removed from zk
    for (String peer : manager.getAllQueues()) {
      assertTrue(peer.startsWith("1"));
    }
  }

  /**
   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
   * compaction WALEdit.
   */
  @Test
  public void testCompactionWALEdits() throws Exception {
    TableName tableName = TableName.valueOf("testCompactionWALEdits");
    WALProtos.CompactionDescriptor compactionDescriptor =
      WALProtos.CompactionDescriptor.getDefaultInstance();
    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
      .setEndKey(HConstants.EMPTY_END_ROW).build();
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
  }

  /**
   * Scopes a bulk load WAL edit using the shared {@link #conf} and expects the key to end up with
   * no replication scopes.
   */
  @Test
  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // 1. Get the bulk load wal edit event
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);

    // 3. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);

    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
      logKey.getReplicationScopes());
  }

  /**
   * Scopes a bulk load WAL edit with bulk load replication enabled and expects only the family
   * that was put into the scope map (f1) to be part of the key's replication scopes.
   */
  @Test
  public void testBulkLoadWALEdits() throws Exception {
    // 1. Get the bulk load wal edit event
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);
    // 3. Enable bulk load hfile replication
    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);

    // 4. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);

    NavigableMap<byte[], Integer> keyScopes = logKey.getReplicationScopes();
    // Assert family with replication scope global is present in the key scopes
    assertTrue("This family scope is set to global, should be part of replication key scopes.",
      keyScopes.containsKey(f1));
    // Assert family with replication scope local is not present in the key scopes
    assertFalse("This family scope is set to local, should not be part of replication key scopes",
      keyScopes.containsKey(f2));
  }

  /**
   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
   * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication
   * queue and ReplicationPeer. See HBASE-16096.
   */
  @Test
  public void testPeerRemovalCleanup() throws Exception {
    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
    final String peerId = "FakePeer";
    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
    try {
      DummyServer server = new DummyServer();
      ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
      // initialization to throw an exception.
      conf.set("replication.replicationsource.implementation",
        FailInitializeDummyReplicationSource.class.getName());
      manager.getReplicationPeers();
      // Set up the znode and ReplicationPeer for the fake peer
      // Don't wait for replication source to initialize, we know it won't.
      addPeerAndWait(peerId, peerConfig, false);

      // Sanity check
      assertNull(manager.getSource(peerId));

      // Create a replication queue for the fake peer
      rq.addWAL(server.getServerName(), peerId, "FakeFile");
      // Unregister peer, this should remove the peer and clear all queues associated with it
      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
      removePeerAndWait(peerId);
      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
    } finally {
      // Restore the original source implementation before cleaning up the peer.
      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
      removePeerAndWait(peerId);
    }
  }

  /**
   * Reads the private {@code globalSourceSource} field of the {@link MetricsSource} belonging to
   * the source of {@link #slaveId} via reflection.
   */
  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
    ReplicationSourceInterface source = manager.getSource(slaveId);
    // Retrieve the global replication metrics source
    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
    f.setAccessible(true);
    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
  }

  /**
   * Sums {@link ReplicationSourceManager#getSizeOfLatestPath()} over the managers of all region
   * servers of the mini cluster.
   * @return the sum, or 0 when no mini cluster is running
   */
  private static long getSizeOfLatestPath() {
    // Without a mini cluster there are no extra replication managers influencing the metrics.
    if (utility.getMiniHBaseCluster() == null) {
      return 0;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
      .map(RegionServerServices::getReplicationSourceService).map(r -> (Replication) r)
      .map(Replication::getReplicationManager)
      .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum();
  }

  /**
   * Verifies that the global log queue size metric follows a peer's source metric when a log is
   * enqueued, returns to its initial value when the peer is removed, and is consistent again
   * after the same peer is re-added.
   */
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, peerConfig, true);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Verifies that refreshing a peer's sources brings the global and the per-source log queue size
   * metrics back to the values observed before a log was enqueued.
   */
  @Test
  public void testDisablePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Refreshing the peer should decrement the global and single source metrics
      manager.refreshSources(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Builds a mocked, non-recovered, sync-replication source for the given peer whose peer config
   * reports {@link #remoteLogDir} (qualified against {@link #fs}) as its remote WAL directory.
   * @param peerId returned as both the peer id and the queue id of the mock
   */
  private ReplicationSourceInterface mockReplicationSource(String peerId) {
    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getPeerId()).thenReturn(peerId);
    when(source.getQueueId()).thenReturn(peerId);
    when(source.isRecovered()).thenReturn(false);
    when(source.isSyncReplication()).thenReturn(true);
    ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
    when(config.getRemoteWALDir())
      .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
    ReplicationPeer peer = mock(ReplicationPeer.class);
    when(peer.getPeerConfig()).thenReturn(config);
    when(source.getPeer()).thenReturn(peer);
    return source;
  }

  /**
   * Verifies that cleaning old logs for a sync replication source deletes the remote WAL only
   * when the source's peer id matches the peer the remote WAL belongs to.
   */
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId2 = slaveId + "_2";
    addPeerAndWait(peerId2,
      ReplicationPeerConfig.newBuilder()
        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
      true);
    try {
      // make sure that we can deal with files which do not exist
      String walNameNotExists =
        "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
      Path wal = new Path(logDir, walNameNotExists);
      manager.preLogRoll(wal);
      manager.postLogRoll(wal);

      Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
      fs.mkdirs(remoteLogDirForPeer);
      String walName = "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
      Path remoteWAL =
        new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
      fs.create(remoteWAL).close();
      wal = new Path(logDir, walName);
      manager.preLogRoll(wal);
      manager.postLogRoll(wal);

      ReplicationSourceInterface source = mockReplicationSource(peerId2);
      manager.cleanOldLogs(walName, true, source);
      // still there if peer id does not match
      assertTrue(fs.exists(remoteWAL));

      source = mockReplicationSource(slaveId);
      manager.cleanOldLogs(walName, true, source);
      assertFalse(fs.exists(remoteWAL));
    } finally {
      removePeerAndWait(peerId2);
    }
  }

  /**
   * Rolls two WALs whose names share a common leading part and verifies that both are reported
   * as new latest paths by the manager.
   */
  @Test
  public void testSameWALPrefix() throws IOException {
    Set<String> latestWalsBefore =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.preLogRoll(new Path(walName1));
    manager.preLogRoll(new Path(walName2));

    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
    assertEquals(2, latestWals.size());
    assertTrue(latestWals.contains(walName1));
    assertTrue(latestWals.contains(walName2));
  }

  /**
   * Add a peer and wait for it to initialize
   * @param peerId        id of the peer to add
   * @param peerConfig    configuration stored for the peer
   * @param waitForSource Whether to wait for replication source to initialize
   */
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
    try {
      manager.addPeer(peerId);
    } catch (Exception e) {
      // ignore the failed exception, because we'll test both success & failed case.
      LOG.debug("Ignoring failure while adding peer {}", peerId, e);
    }
    waitPeer(peerId, manager, waitForSource);
    if (managerOfCluster != null) {
      managerOfCluster.addPeer(peerId);
      waitPeer(peerId, managerOfCluster, waitForSource);
    }
  }

  /**
   * Waits up to 20 seconds for a peer to show up in the given manager.
   * @param peerId        id of the peer to wait for
   * @param manager       the manager to poll
   * @param waitForSource when true, wait until the manager has a source for the peer (and, for a
   *                      {@link ReplicationSourceDummy}, until it reports startup); otherwise
   *                      only wait until the peer itself is known
   */
  private static void waitPeer(final String peerId, ReplicationSourceManager manager,
    final boolean waitForSource) {
    ReplicationPeers rp = manager.getReplicationPeers();
    Waiter.waitFor(conf, 20000, () -> {
      if (waitForSource) {
        ReplicationSourceInterface rs = manager.getSource(peerId);
        if (rs == null) {
          return false;
        }
        if (rs instanceof ReplicationSourceDummy) {
          return ((ReplicationSourceDummy) rs).isStartup();
        }
        return true;
      } else {
        return (rp.getPeer(peerId) != null);
      }
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up
   * @param peerId id of the peer to remove; nothing is removed if the peer storage does not list
   *               it, but the wait for a clean state still happens
   */
  private void removePeerAndWait(final String peerId) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
      rp.getPeerStorage().removePeer(peerId);
      try {
        manager.removePeer(peerId);
      } catch (Exception e) {
        // ignore the failed exception and continue.
        LOG.debug("Ignoring failure while removing peer {}", peerId, e);
      }
    }
    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        Collection<String> peers = rp.getPeerStorage().listPeerIds();
        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
          && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
      }
    });
  }

  /**
   * Looks up the length of the given file on {@link #fs}.
   * @return the file length, or 0 if the file status cannot be read
   */
  private long hfileSizeOrZero(Path hfilePath) {
    try {
      return fs.getFileStatus(hfilePath).getLen();
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile " + hfilePath, e);
      return 0L;
    }
  }

  /**
   * Builds a bulk load WAL edit for region {@link #hri} with one store file per family (f1 and
   * f2).
   * @param scope scope map to populate; only f1 is added, with scope 1
   * @return the bulk load event edit
   */
  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
    // 1. Create store files for the families
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    Map<String, Long> storeFilesSize = new HashMap<>(1);
    List<Path> p = new ArrayList<>(1);
    Path hfilePath1 = new Path(Bytes.toString(f1));
    p.add(hfilePath1);
    storeFilesSize.put(hfilePath1.getName(), hfileSizeOrZero(hfilePath1));
    storeFiles.put(f1, p);
    scope.put(f1, 1);
    p = new ArrayList<>(1);
    Path hfilePath2 = new Path(Bytes.toString(f2));
    p.add(hfilePath2);
    storeFilesSize.put(hfilePath2.getName(), hfileSizeOrZero(hfilePath2));
    storeFiles.put(f2, p);
    // 2. Create bulk load descriptor
    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);

    // 3. create bulk load wal edit event
    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
    return logEdit;
  }

  /**
   * Thread that tries to claim every replication queue of a dead region server on behalf of the
   * given server, recording what it managed to claim. Counts down the shared {@link #latch} when
   * done.
   */
  static class DummyNodeFailoverWorker extends Thread {
    private Map<String, Set<String>> logZnodesMap;
    Server server;
    private ServerName deadRS;
    ReplicationQueueStorage rq;

    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
      this.deadRS = deadRS;
      this.server = s;
      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
        server.getConfiguration());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = new HashMap<>();
        List<String> queues = rq.getAllQueues(deadRS);
        for (String queue : queues) {
          Pair<String, SortedSet<String>> pair =
            rq.claimQueue(deadRS, queue, server.getServerName());
          if (pair != null) {
            logZnodesMap.put(pair.getFirst(), pair.getSecond());
          }
        }
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        // Always release the waiting test thread, even if claiming failed.
        latch.countDown();
      }
    }

    /**
     * Returns 1 when exactly one queue was claimed and it contains every name in
     * {@link #files}; returns 0 when nothing was claimed or a file is missing.
     * @throws RuntimeException if more than one queue was claimed
     */
    private int isLogZnodesMapPopulated() {
      Collection<Set<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        Set<String> s = sets.iterator().next();
        for (String file : files) {
          // at least one file was missing
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1; // we found all the files
      }
      return 0;
    }
  }

  /** A replication source whose {@code init} always throws, used to test failed initialization. */
  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
      throws IOException {
      throw new IOException("Failing deliberately");
    }
  }

  /**
   * A {@link MockServer} that exposes the test's shared {@link #conf} and {@link #zkw} and
   * reports a server name built from a configurable hostname.
   */
  static class DummyServer extends MockServer {
    String hostname;

    DummyServer() {
      this("hostname.example.org");
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public Connection getConnection() {
      return null;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }
  }
}