/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
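// A minimal sketch of the setup a concrete subclass is expected to perform. The class name
// and the exact @BeforeClass body below are illustrative assumptions, not an actual subclass
// in this code base; see TestReplicationSourceManagerZkImpl for a real implementation.
//
//   public class TestReplicationSourceManagerCustom extends TestReplicationSourceManager {
//     @BeforeClass
//     public static void setUpBeforeClass() throws Exception {
//       conf = HBaseConfiguration.create();
//       conf.set("replication.replicationsource.implementation",
//         ReplicationSourceDummy.class.getCanonicalName());
//       utility = new HBaseTestingUtility(conf);
//       utility.startMiniZKCluster();
//       setupZkAndReplication();
//     }
//   }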
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

/**
 * An abstract class for testing ReplicationSourceManager. Subclasses should set up the proper
 * configuration for this class and initialize the appropriate cluster using
 * HBaseTestingUtility.
 */
@Category({ReplicationTests.class, MediumTests.class})
public abstract class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationSourceManager.class);

  protected static Configuration conf;

  protected static HBaseTestingUtility utility;

  protected static Replication replication;

  protected static ReplicationSourceManager manager;

  protected static ReplicationSourceManager managerOfCluster;

  protected static ZKWatcher zkw;

  protected static HTableDescriptor htd;

  protected static HRegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test = TableName.valueOf("test");

  protected static final String slaveId = "1";

  protected static FileSystem fs;

  protected static Path oldLogDir;

  protected static Path logDir;

  protected static CountDownLatch latch;

  protected static List<String> files = new ArrayList<>();
  protected static NavigableMap<byte[], Integer> scopes;

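  // The znode layout created below mirrors what the replication machinery normally maintains
  // under the default zookeeper.znode.parent:
  //   /hbase/replication/peers/1            -> cluster key of peer "1"
  //   /hbase/replication/peers/1/peer-state -> ENABLED
  //   /hbase/replication/state              -> ENABLED (the replication on/off switch)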
  protected static void setupZkAndReplication() throws Exception {
    // The implementing class should set up the conf
    assertNotNull(conf);
    zkw = new ZKWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication();
    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
      new WALFactory(conf, "test", null));
    managerOfCluster = getManagerFromCluster();
    if (managerOfCluster != null) {
      // After the replication procedure, we need to add the peer by hand (rather than by
      // receiving a notification from zk)
      managerOfCluster.addPeer(slaveId);
    }

    manager = replication.getReplicationManager();
    manager.addSource(slaveId);
    if (managerOfCluster != null) {
      waitPeer(slaveId, managerOfCluster, true);
    }
    waitPeer(slaveId, manager, true);

    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor(f1);
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor(f2);
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  private static ReplicationSourceManager getManagerFromCluster() {
    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
    if (utility.getMiniHBaseCluster() == null) {
      return null;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads()
        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
        .findAny()
        .map(HRegionServer::getReplicationSourceService)
        .map(r -> (Replication) r)
        .map(Replication::getReplicationManager)
        .get();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (manager != null) {
      manager.join();
    }
    utility.shutdownMiniCluster();
  }

  @Rule
  public TestName testName = new TestName();

  private void cleanLogDir() throws IOException {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @Before
  public void setUp() throws Exception {
    LOG.info("Start " + testName.getMethodName());
    cleanLogDir();
  }

  @After
  public void tearDown() throws Exception {
    LOG.info("End " + testName.getMethodName());
    cleanLogDir();
    List<String> ids = manager.getSources().stream()
        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
    for (String id : ids) {
      if (slaveId.equals(id)) {
        continue;
      }
      removePeerAndWait(id);
    }
  }

  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    WALFactory wals =
      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
    ReplicationSourceManager replicationManager = replication.getReplicationManager();
    wals.getWALProvider()
      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    final WAL wal = wals.getWAL(hri);
    manager.init();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tablename"));
    htd.addFamily(new HColumnDescriptor(f1));
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    // Test normal log rolling: roll the writer every 20 edits
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(Long.toString(i));
      final long txid = wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
        edit);
      wal.sync(txid);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    for (int i = 0; i < 3; i++) {
      wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
        edit);
    }
    wal.sync();

    int logNumber = 0;
    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
        .entrySet()) {
      logNumber += entry.getValue().size();
    }
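    // 100 appends with a roll every 20 edits: the initial WAL plus 5 rolled ones leaves
    // 6 files tracked in the queue for the peer.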
    assertEquals(6, logNumber);

    wal.rollWriter();

    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));

    wal.appendData(hri,
      new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
      edit);
    wal.sync();

    assertEquals(1, manager.getWALs().size());

    // TODO Need a case with only 2 WALs and we only want to delete the first one
  }

  @Test
  public void testClaimQueues() throws Exception {
    Server server = new DummyServer("hostname0.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);

    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // make sure only one worker ends up with the claimed queue
    int populatedMap = 0;
    // wait until all the workers are done
    latch.await();
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

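  /**
   * Tests that cleaning old logs on a recovered queue removes the WALs older than the one
   * passed in while keeping that WAL itself in the queue.
   */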
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    Server server = new DummyServer("hostname1.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    SortedSet<String> files = new TreeSet<>();
    String group = "testgroup";
    String file1 = group + ".log1";
    String file2 = group + ".log2";
    files.add(file1);
    files.add(file2);
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
    rp1.init();
    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
    w1.run();
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    manager.cleanOldLogs(file2, false, id, true);
    // log1 should be deleted
    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

  @Test
  public void testCleanupUnknownPeerZNode() throws Exception {
    Server server = new DummyServer("hostname2.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // add log to an unknown peer
    String group = "testgroup";
    rq.addWAL(server.getServerName(), "2", group + ".log1");
    rq.addWAL(server.getServerName(), "2", group + ".log2");

    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
    w1.run();

    // The logs of the unknown peer should be removed from zk
    for (String peer : manager.getAllQueues()) {
      assertTrue(peer.startsWith("1"));
    }
  }

  /**
   * Test for HBASE-9038: Replication.scopeWALEdits would NPE if it did not filter out the
   * compaction WALEdit.
   */
  @Test
  public void testCompactionWALEdits() throws Exception {
    TableName tableName = TableName.valueOf("testCompactionWALEdits");
    WALProtos.CompactionDescriptor compactionDescriptor =
      WALProtos.CompactionDescriptor.getDefaultInstance();
    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
        .setEndKey(HConstants.EMPTY_END_ROW).build();
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
  }

  @Test
  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // 1. Get the bulk load wal edit event
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);

    // 3. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);

    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
      logKey.getReplicationScopes());
  }

  @Test
  public void testBulkLoadWALEdits() throws Exception {
    // 1. Get the bulk load wal edit event
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);
    // 3. Enable bulk load hfile replication
    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);

    // 4. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);

    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
    // Assert that a family with global replication scope is present in the key scopes
    assertTrue("This family scope is set to global, should be part of replication key scopes.",
      scopes.containsKey(f1));
    // Assert that a family with local replication scope is not present in the key scopes
    assertFalse("This family scope is set to local, should not be part of replication key scopes",
      scopes.containsKey(f2));
  }

  /**
   * Tests whether calling removePeer() on a ReplicationSourceManager that failed to initialize
   * the corresponding ReplicationSourceInterface correctly cleans up the corresponding
   * replication queue and ReplicationPeer. See HBASE-16096.
   */
  @Test
  public void testPeerRemovalCleanup() throws Exception {
    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
    final String peerId = "FakePeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
    try {
      DummyServer server = new DummyServer();
      ReplicationQueueStorage rq = ReplicationStorageFactory
          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
      // initialization to throw an exception.
      conf.set("replication.replicationsource.implementation",
        FailInitializeDummyReplicationSource.class.getName());
      final ReplicationPeers rp = manager.getReplicationPeers();
      // Set up the znode and ReplicationPeer for the fake peer
      // Don't wait for the replication source to initialize; we know it won't.
      addPeerAndWait(peerId, peerConfig, false);

      // Sanity check
      assertNull(manager.getSource(peerId));

      // Create a replication queue for the fake peer
      rq.addWAL(server.getServerName(), peerId, "FakeFile");
      // Unregister peer, this should remove the peer and clear all queues associated with it
      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
      removePeerAndWait(peerId);
      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
    } finally {
      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
      removePeerAndWait(peerId);
    }
  }

  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
    ReplicationSourceInterface source = manager.getSource(slaveId);
    // Retrieve the global replication metrics source
    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
    f.setAccessible(true);
    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
  }

  private static long getSizeOfLatestPath() {
    // When no mini cluster is running there are no region server replication managers
    // contributing to this metric, so there is nothing to sum up.
    if (utility.getMiniHBaseCluster() == null) {
      return 0;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads()
        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
        .map(HRegionServer::getReplicationSourceService)
        .map(r -> (Replication) r)
        .map(Replication::getReplicationManager)
        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
        .sum();
  }

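  /**
   * Tests that the global log queue size metric tracks the peer lifecycle: it grows when a
   * peer's source enqueues a WAL and drops back to its initial value once the peer is removed.
   */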
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue a log and check that the metrics are updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, peerConfig, true);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Add a peer and wait for it to initialize.
   * @param peerId the id of the peer to add
   * @param peerConfig the configuration of the peer to add
   * @param waitForSource whether to wait for the replication source to initialize
   */
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
      final boolean waitForSource) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
    try {
      manager.addPeer(peerId);
    } catch (Exception e) {
      // Ignore the exception here; we exercise both the success and the failure case.
    }
    waitPeer(peerId, manager, waitForSource);
    if (managerOfCluster != null) {
      managerOfCluster.addPeer(peerId);
      waitPeer(peerId, managerOfCluster, waitForSource);
    }
  }

  private static void waitPeer(final String peerId,
      ReplicationSourceManager manager, final boolean waitForSource) {
    ReplicationPeers rp = manager.getReplicationPeers();
    Waiter.waitFor(conf, 20000, () -> {
      if (waitForSource) {
        ReplicationSourceInterface rs = manager.getSource(peerId);
        if (rs == null) {
          return false;
        }
        if (rs instanceof ReplicationSourceDummy) {
          return ((ReplicationSourceDummy) rs).isStartup();
        }
        return true;
      } else {
        return (rp.getPeer(peerId) != null);
      }
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up.
   * @param peerId the id of the peer to remove
   */
  private void removePeerAndWait(final String peerId) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
      rp.getPeerStorage().removePeer(peerId);
      try {
        manager.removePeer(peerId);
      } catch (Exception e) {
        // Ignore the exception and continue.
      }
    }
    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        Collection<String> peers = rp.getPeerStorage().listPeerIds();
        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
      }
    });
  }

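  /**
   * Builds a WALEdit carrying a bulk load descriptor for families f1 and f2, recording a
   * global replication scope for f1 only in the passed-in scope map, so tests can verify
   * how scopeWALEdits() treats each family.
   */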
  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
    // 1. Create store files for the families
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    Map<String, Long> storeFilesSize = new HashMap<>(1);
    List<Path> p = new ArrayList<>(1);
    Path hfilePath1 = new Path(Bytes.toString(f1));
    p.add(hfilePath1);
    try {
      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
      storeFilesSize.put(hfilePath1.getName(), 0L);
    }
    storeFiles.put(f1, p);
    scope.put(f1, 1);
    p = new ArrayList<>(1);
    Path hfilePath2 = new Path(Bytes.toString(f2));
    p.add(hfilePath2);
    try {
      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
      storeFilesSize.put(hfilePath2.getName(), 0L);
    }
    storeFiles.put(f2, p);
    // 2. Create the bulk load descriptor
    BulkLoadDescriptor desc =
        ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
        UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);

    // 3. Create the bulk load wal edit event
    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
    return logEdit;
  }

  static class DummyNodeFailoverWorker extends Thread {
    private Map<String, Set<String>> logZnodesMap;
    Server server;
    private ServerName deadRS;
    ReplicationQueueStorage rq;

    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
      this.deadRS = deadRS;
      this.server = s;
      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
        server.getConfiguration());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = new HashMap<>();
        List<String> queues = rq.getAllQueues(deadRS);
        for (String queue : queues) {
          Pair<String, SortedSet<String>> pair =
              rq.claimQueue(deadRS, queue, server.getServerName());
          if (pair != null) {
            logZnodesMap.put(pair.getFirst(), pair.getSecond());
          }
        }
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * @return 1 when this worker claimed the queue and found all the expected WALs in it,
     *         0 otherwise.
     */
    private int isLogZnodesMapPopulated() {
      Collection<Set<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        Set<String> s = sets.iterator().next();
        for (String file : files) {
          // at least one file was missing
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1; // we found all the files
      }
      return 0;
    }
  }

  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
        throws IOException {
      throw new IOException("Failing deliberately");
    }
  }

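  /**
   * Minimal Server stub: these tests only use the configuration, the ZooKeeper watcher and
   * the server name; every other method is a no-op or returns null.
   */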
  static class DummyServer implements Server {
    String hostname;

    DummyServer() {
      hostname = "hostname.example.org";
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
      // no-op for these tests
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
      // no-op for these tests
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }
}