/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set
 * up the proper config for this class and initialize the proper cluster using HBaseTestingUtility.
 * <p>
 * Subclasses are expected to assign {@link #conf} and {@link #utility} and then call
 * {@link #setupZkAndReplication()}, which populates the remaining static fixtures.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public abstract class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);

  /** Configuration key selecting the ReplicationSourceInterface implementation. */
  private static final String REPLICATION_SOURCE_IMPL_KEY =
    "replication.replicationsource.implementation";

  /** Timeout, in milliseconds, used when waiting for a peer to appear or disappear. */
  private static final long PEER_WAIT_TIMEOUT_MS = 20000;

  protected static Configuration conf;

  protected static HBaseTestingUtility utility;

  protected static Replication replication;

  /** The manager owned by {@link #replication}, i.e. the one under test. */
  protected static ReplicationSourceManager manager;

  /** The manager of a mini cluster region server, or null when no mini cluster is running. */
  protected static ReplicationSourceManager managerOfCluster;

  protected static ZKWatcher zkw;

  protected static HTableDescriptor htd;

  protected static HRegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test = TableName.valueOf("test");

  protected static final String slaveId = "1";

  protected static FileSystem fs;

  protected static Path oldLogDir;

  protected static Path logDir;

  protected static CountDownLatch latch;

  protected static List<String> files = new ArrayList<>();
  protected static NavigableMap<byte[], Integer> scopes;

  /**
   * Creates the replication znodes for peer {@link #slaveId}, initializes {@link #replication} and
   * {@link #manager}, registers the peer, and builds the table/region fixtures shared by the tests.
   * Requires {@link #conf} and {@link #utility} to have been set by the implementing class.
   */
  protected static void setupZkAndReplication() throws Exception {
    // The implementing class should set up the conf
    assertNotNull(conf);
    zkw = new ZKWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication();
    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
      new WALFactory(conf, "test", null));
    managerOfCluster = getManagerFromCluster();
    if (managerOfCluster != null) {
      // After replication procedure, we need to add peer by hand (other than by receiving
      // notification from zk)
      managerOfCluster.addPeer(slaveId);
    }

    manager = replication.getReplicationManager();
    manager.addSource(slaveId);
    if (managerOfCluster != null) {
      waitPeer(slaveId, managerOfCluster, true);
    }
    waitPeer(slaveId, manager, true);

    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor(f1);
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor(f2);
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  /**
   * Returns the ReplicationSourceManager of an arbitrary region server of the mini cluster, or null
   * when no mini cluster has been started.
   * @throws IllegalStateException if a mini cluster is running but no manager can be obtained
   */
  private static ReplicationSourceManager getManagerFromCluster() {
    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
    if (utility.getMiniHBaseCluster() == null) {
      return null;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny()
      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
      .map(Replication::getReplicationManager)
      // Fail with an explicit message instead of a bare NoSuchElementException.
      .orElseThrow(() -> new IllegalStateException(
        "No region server with a replication source manager found in the mini cluster"));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (manager != null) {
      manager.join();
    }
    utility.shutdownMiniCluster();
  }

  @Rule
  public TestName testName = new TestName();

  /** Recursively deletes both the WAL directory and the old WAL directory. */
  private void cleanLogDir() throws IOException {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @Before
  public void setUp() throws Exception {
    LOG.info("Start {}", testName.getMethodName());
    cleanLogDir();
  }

  /** Cleans the log directories and removes every peer except {@link #slaveId}. */
  @After
  public void tearDown() throws Exception {
    LOG.info("End {}", testName.getMethodName());
    cleanLogDir();
    List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId)
      .collect(Collectors.toList());
    for (String id : ids) {
      if (slaveId.equals(id)) {
        continue;
      }
      removePeerAndWait(id);
    }
  }

  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    // NOTE(review): this WALFactory is never closed; confirm whether closing it here is safe.
    WALFactory wals =
      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
    ReplicationSourceManager replicationManager = replication.getReplicationManager();
    wals.getWALProvider()
      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    final WAL wal = wals.getWAL(hri);
    manager.init();
    // Local descriptor and scopes, deliberately distinct from the class-level htd/scopes fixtures.
    HTableDescriptor localHtd = new HTableDescriptor(TableName.valueOf("tableame"));
    localHtd.addFamily(new HColumnDescriptor(f1));
    NavigableMap<byte[], Integer> localScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : localHtd.getFamiliesKeys()) {
      localScopes.put(fam, 0);
    }
    // Testing normal log rolling every 20
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(Long.toString(i));
      final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
        EnvironmentEdgeManager.currentTime(), mvcc, localScopes), edit);
      wal.sync(txid);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info("{} and {}", baseline, time);
    baseline += 101;
    time = baseline;
    LOG.info("{} and {}", baseline, time);

    for (int i = 0; i < 3; i++) {
      wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
        EnvironmentEdgeManager.currentTime(), mvcc, localScopes), edit);
    }
    wal.sync();

    // Count the WALs tracked for the peer across all WAL groups.
    int logNumber = 0;
    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
      .entrySet()) {
      logNumber += entry.getValue().size();
    }
    assertEquals(6, logNumber);

    wal.rollWriter();

    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));

    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
      EnvironmentEdgeManager.currentTime(), mvcc, localScopes), edit);
    wal.sync();

    assertEquals(1, manager.getWALs().size());

    // TODO Need a case with only 2 WALs and we only want to delete the first one
  }

  /**
   * Starts three workers that concurrently try to claim the queues of the same dead server and
   * asserts that exactly one of them ends up with all the WAL files.
   */
  @Test
  public void testClaimQueues() throws Exception {
    Server server = new DummyServer("hostname0.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // 'files' is a shared static list; reset it so repeated runs do not accumulate entries.
    files.clear();
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);

    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // wait for result now... till all the workers are done.
    latch.await();
    // make sure only one is successful
    int populatedMap =
      w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

  @Test
  public void testCleanupFailoverQueues() throws Exception {
    Server server = new DummyServer("hostname1.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    SortedSet<String> walFiles = new TreeSet<>();
    String group = "testgroup";
    String file1 = group + ".log1";
    String file2 = group + ".log2";
    walFiles.add(file1);
    walFiles.add(file2);
    for (String file : walFiles) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationPeers rp1 =
      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
    rp1.init();
    manager.claimQueue(server.getServerName(), "1");
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(walFiles, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    manager.cleanOldLogs(file2, false, id, true);
    // log1 should be deleted
    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

  @Test
  public void testCleanupUnknownPeerZNode() throws Exception {
    Server server = new DummyServer("hostname2.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // add log to an unknown peer
    String group = "testgroup";
    rq.addWAL(server.getServerName(), "2", group + ".log1");
    rq.addWAL(server.getServerName(), "2", group + ".log2");

    manager.claimQueue(server.getServerName(), "2");

    // The log of the unknown peer should be removed from zk
    for (String peer : manager.getAllQueues()) {
      assertTrue(peer.startsWith("1"));
    }
  }

  /**
   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
   * compaction WALEdit.
   */
  @Test
  public void testCompactionWALEdits() throws Exception {
    TableName tableName = TableName.valueOf("testCompactionWALEdits");
    WALProtos.CompactionDescriptor compactionDescriptor =
      WALProtos.CompactionDescriptor.getDefaultInstance();
    RegionInfo compactionRegion =
      RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
        .setEndKey(HConstants.EMPTY_END_ROW).build();
    WALEdit edit = WALEdit.createCompaction(compactionRegion, compactionDescriptor);
    // Passes as long as no exception is thrown.
    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
  }

  @Test
  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // 1. Get the bulk load wal edit event
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);

    // 3. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);

    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
      logKey.getReplicationScopes());
  }

  @Test
  public void testBulkLoadWALEdits() throws Exception {
    // 1. Get the bulk load wal edit event
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);
    // 3. Enable bulk load hfile replication
    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);

    // 4. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);

    NavigableMap<byte[], Integer> keyScopes = logKey.getReplicationScopes();
    // Assert family with replication scope global is present in the key scopes
    assertTrue("This family scope is set to global, should be part of replication key scopes.",
      keyScopes.containsKey(f1));
    // Assert family with replication scope local is not present in the key scopes
    assertFalse("This family scope is set to local, should not be part of replication key scopes",
      keyScopes.containsKey(f2));
  }

  /**
   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
   * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication
   * queue and ReplicationPeer. See HBASE-16096.
   */
  @Test
  public void testPeerRemovalCleanup() throws Exception {
    String replicationSourceImplName = conf.get(REPLICATION_SOURCE_IMPL_KEY);
    final String peerId = "FakePeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
    try {
      DummyServer server = new DummyServer();
      ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
      // initialization to throw an exception.
      conf.set(REPLICATION_SOURCE_IMPL_KEY, FailInitializeDummyReplicationSource.class.getName());
      // Set up the znode and ReplicationPeer for the fake peer
      // Don't wait for replication source to initialize, we know it won't.
      addPeerAndWait(peerId, peerConfig, false);

      // Sanity check
      assertNull(manager.getSource(peerId));

      // Create a replication queue for the fake peer
      rq.addWAL(server.getServerName(), peerId, "FakeFile");
      // Unregister peer, this should remove the peer and clear all queues associated with it
      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
      removePeerAndWait(peerId);
      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
    } finally {
      // Configuration.set rejects null values, so restore "was not set" by unsetting the key;
      // otherwise the restore itself would throw and skip the peer cleanup below.
      if (replicationSourceImplName != null) {
        conf.set(REPLICATION_SOURCE_IMPL_KEY, replicationSourceImplName);
      } else {
        conf.unset(REPLICATION_SOURCE_IMPL_KEY);
      }
      removePeerAndWait(peerId);
    }
  }

  /**
   * Reads, via reflection, the private {@code globalSourceSource} field of the MetricsSource that
   * belongs to the source of peer {@link #slaveId}.
   */
  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
    ReplicationSourceInterface source = manager.getSource(slaveId);
    // Retrieve the global replication metrics source
    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
    f.setAccessible(true);
    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
  }

  /**
   * Sums {@code getSizeOfLatestPath()} over the replication managers of all mini cluster region
   * servers; returns 0 when no mini cluster is running.
   */
  private static long getSizeOfLatestPath() {
    // Without a mini cluster there is no extra replication manager influencing the metrics.
    if (utility.getMiniHBaseCluster() == null) {
      return 0;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
      .map(Replication::getReplicationManager)
      .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum();
  }

  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, peerConfig, true);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Add a peer and wait for it to initialize.
   * @param peerId        id of the peer to add
   * @param peerConfig    configuration stored for the peer
   * @param waitForSource Whether to wait for replication source to initialize
   */
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
    try {
      manager.addPeer(peerId);
    } catch (Exception e) {
      // Deliberately best-effort: both the success and the failure case are tested.
      LOG.debug("Ignoring failure while adding peer {}", peerId, e);
    }
    waitPeer(peerId, manager, waitForSource);
    if (managerOfCluster != null) {
      managerOfCluster.addPeer(peerId);
      waitPeer(peerId, managerOfCluster, waitForSource);
    }
  }

  /**
   * Waits up to {@link #PEER_WAIT_TIMEOUT_MS} for a peer to become visible on the given manager.
   * @param peerId        id of the peer to wait for
   * @param sourceManager manager to poll
   * @param waitForSource when true, wait until the manager has a source for the peer (and, for a
   *                      ReplicationSourceDummy, until it reports startup); when false, only wait
   *                      until the ReplicationPeer object exists
   */
  private static void waitPeer(final String peerId, ReplicationSourceManager sourceManager,
    final boolean waitForSource) {
    ReplicationPeers rp = sourceManager.getReplicationPeers();
    Waiter.waitFor(conf, PEER_WAIT_TIMEOUT_MS, () -> {
      if (!waitForSource) {
        return rp.getPeer(peerId) != null;
      }
      ReplicationSourceInterface rs = sourceManager.getSource(peerId);
      if (rs == null) {
        return false;
      }
      if (rs instanceof ReplicationSourceDummy) {
        return ((ReplicationSourceDummy) rs).isStartup();
      }
      return true;
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up.
   * @param peerId id of the peer to remove
   */
  private void removePeerAndWait(final String peerId) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
      rp.getPeerStorage().removePeer(peerId);
      try {
        manager.removePeer(peerId);
      } catch (Exception e) {
        // Deliberately best-effort: the wait below verifies the actual cleanup.
        LOG.debug("Ignoring failure while removing peer {}", peerId, e);
      }
    }
    // The peer is gone once it has no queue, no ReplicationPeer, no stored id and no source.
    Waiter.Predicate<Exception> peerCleanedUp = () -> {
      Collection<String> peers = rp.getPeerStorage().listPeerIds();
      return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
        && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
    };
    Waiter.waitFor(conf, PEER_WAIT_TIMEOUT_MS, peerCleanedUp);
  }

  /**
   * Rolls two WALs where one name is a prefix of the other and checks that both are reported as
   * latest paths.
   */
  @Test
  public void testSameWALPrefix() throws IOException {
    Set<String> latestWalsBefore =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.preLogRoll(new Path(walName1));
    manager.preLogRoll(new Path(walName2));

    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
    assertEquals(2, latestWals.size());
    assertTrue(latestWals.contains(walName1));
    assertTrue(latestWals.contains(walName2));
  }

  /**
   * Builds a bulk load WALEdit for region {@link #hri} with one store file per family f1 and f2.
   * @param scope map that receives the replication scope entry for f1 (value 1); f2 is not added
   * @return the bulk load event edit
   */
  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
    // 1. Create store files for the families
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    Map<String, Long> storeFilesSize = new HashMap<>(1);

    Path hfilePath1 = new Path(Bytes.toString(f1));
    List<Path> f1Files = new ArrayList<>(1);
    f1Files.add(hfilePath1);
    storeFilesSize.put(hfilePath1.getName(), hfileSizeOrZero(hfilePath1));
    storeFiles.put(f1, f1Files);
    scope.put(f1, 1);

    Path hfilePath2 = new Path(Bytes.toString(f2));
    List<Path> f2Files = new ArrayList<>(1);
    f2Files.add(hfilePath2);
    storeFilesSize.put(hfilePath2.getName(), hfileSizeOrZero(hfilePath2));
    storeFiles.put(f2, f2Files);

    // 2. Create bulk load descriptor
    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);

    // 3. create bulk load wal edit event
    return WALEdit.createBulkLoadEvent(hri, desc);
  }

  /** Returns the length of the given file on {@link #fs}, or 0 when it cannot be determined. */
  private static long hfileSizeOrZero(Path hfilePath) {
    try {
      return fs.getFileStatus(hfilePath).getLen();
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile {}", hfilePath, e);
      return 0L;
    }
  }

  /**
   * Thread that tries to claim every replication queue of a dead region server on behalf of the
   * given server, recording what it claimed. Counts down {@link #latch} when finished.
   */
  static class DummyNodeFailoverWorker extends Thread {
    /** Claimed queue id to its WAL names; filled by {@link #run()}. */
    private final Map<String, Set<String>> logZnodesMap = new HashMap<>();
    final Server server;
    private final ServerName deadRS;
    final ReplicationQueueStorage rq;

    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
      this.deadRS = deadRS;
      this.server = s;
      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
        server.getConfiguration());
    }

    @Override
    public void run() {
      try {
        List<String> queues = rq.getAllQueues(deadRS);
        for (String queue : queues) {
          Pair<String, SortedSet<String>> pair =
            rq.claimQueue(deadRS, queue, server.getServerName());
          if (pair != null) {
            logZnodesMap.put(pair.getFirst(), pair.getSecond());
          }
        }
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * Returns 1 when exactly one queue was claimed and it contains every entry of {@link #files};
     * returns 0 otherwise.
     * @throws RuntimeException if more than one queue was claimed
     */
    private int isLogZnodesMapPopulated() {
      Collection<Set<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.isEmpty()) {
        return 0;
      }
      // 1 only if we found all the files; 0 if at least one file was missing.
      return sets.iterator().next().containsAll(files) ? 1 : 0;
    }
  }

  /** A ReplicationSourceDummy whose {@code init} always throws, to exercise failed source setup. */
  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
      throws IOException {
      throw new IOException("Failing deliberately");
    }
  }

  /**
   * Minimal Server stub backed by the test's static {@link #conf} and {@link #zkw}. Every other
   * accessor returns null or false, and abort/stop do nothing.
   */
  static class DummyServer implements Server {
    final String hostname;

    DummyServer() {
      this("hostname.example.org");
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
      // Intentionally a no-op.
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
      // Intentionally a no-op.
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }
}