001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.net.URLEncoder; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableMap; 035import java.util.NavigableSet; 036import java.util.Set; 037import java.util.SortedSet; 038import java.util.TreeMap; 039import java.util.TreeSet; 040import java.util.UUID; 041import java.util.concurrent.CountDownLatch; 042import java.util.stream.Collectors; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.ClusterId; 048import org.apache.hadoop.hbase.CoordinatedStateManager; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HBaseTestingUtility; 052import org.apache.hadoop.hbase.HColumnDescriptor; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.HRegionInfo; 055import org.apache.hadoop.hbase.HTableDescriptor; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.Server; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.Waiter; 061import org.apache.hadoop.hbase.client.ClusterConnection; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.RegionInfo; 064import org.apache.hadoop.hbase.client.RegionInfoBuilder; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 067import org.apache.hadoop.hbase.replication.ReplicationFactory; 068import org.apache.hadoop.hbase.replication.ReplicationPeer; 069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 070import org.apache.hadoop.hbase.replication.ReplicationPeers; 071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; 073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 074import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; 075import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; 076import org.apache.hadoop.hbase.testclassification.MediumTests; 077import org.apache.hadoop.hbase.testclassification.ReplicationTests; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.CommonFSUtils; 080import org.apache.hadoop.hbase.util.JVMClusterUtil; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WAL; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALFactory; 085import org.apache.hadoop.hbase.wal.WALKeyImpl; 086import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 087import org.apache.hadoop.hbase.zookeeper.ZKUtil; 088import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 089import org.junit.After; 090import org.junit.AfterClass; 091import org.junit.Before; 092import org.junit.ClassRule; 093import org.junit.Rule; 094import org.junit.Test; 095import org.junit.experimental.categories.Category; 096import org.junit.rules.TestName; 097import org.slf4j.Logger; 098import org.slf4j.LoggerFactory; 099 100import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 101import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 102 103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 106 107/** 108 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should 109 * set up the proper config for this class and initialize the proper cluster using 110 * HBaseTestingUtility. 111 */ 112@Category({ReplicationTests.class, MediumTests.class}) 113public abstract class TestReplicationSourceManager { 114 115 @ClassRule 116 public static final HBaseClassTestRule CLASS_RULE = 117 HBaseClassTestRule.forClass(TestReplicationSourceManager.class); 118 119 protected static final Logger LOG = 120 LoggerFactory.getLogger(TestReplicationSourceManager.class); 121 122 protected static Configuration conf; 123 124 protected static HBaseTestingUtility utility; 125 126 protected static Replication replication; 127 128 protected static ReplicationSourceManager manager; 129 130 protected static ReplicationSourceManager managerOfCluster; 131 132 protected static ZKWatcher zkw; 133 134 protected static HTableDescriptor htd; 135 136 protected static HRegionInfo hri; 137 138 protected static final byte[] r1 = Bytes.toBytes("r1"); 139 140 protected static final byte[] r2 = Bytes.toBytes("r2"); 141 142 protected static final byte[] f1 = Bytes.toBytes("f1"); 143 144 protected static final byte[] f2 = Bytes.toBytes("f2"); 145 146 protected static final TableName test = 147 TableName.valueOf("test"); 148 149 protected static final String slaveId = "1"; 150 151 protected static FileSystem fs; 152 153 protected static Path oldLogDir; 154 155 protected static Path logDir; 156 157 protected static CountDownLatch latch; 158 159 protected static List<String> files = new ArrayList<>(); 160 protected static NavigableMap<byte[], Integer> scopes; 161 162 protected static void setupZkAndReplication() throws Exception { 163 // The implementing class should set up the conf 164 assertNotNull(conf); 165 zkw = new ZKWatcher(conf, "test", null); 166 ZKUtil.createWithParents(zkw, "/hbase/replication"); 167 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); 168 ZKUtil.setData(zkw, "/hbase/replication/peers/1", 169 Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 170 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); 171 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); 172 ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", 173 ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); 174 ZKUtil.createWithParents(zkw, "/hbase/replication/state"); 175 ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); 176 177 ZKClusterId.setClusterId(zkw, new ClusterId()); 178 CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); 179 fs = FileSystem.get(conf); 180 oldLogDir = new Path(utility.getDataTestDir(), 181 HConstants.HREGION_OLDLOGDIR_NAME); 182 logDir = new Path(utility.getDataTestDir(), 183 HConstants.HREGION_LOGDIR_NAME); 184 replication = new Replication(); 185 replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); 186 managerOfCluster = getManagerFromCluster(); 187 if (managerOfCluster != null) { 188 // After replication procedure, we need to add peer by hand (other than by receiving 189 // notification from zk) 190 managerOfCluster.addPeer(slaveId); 191 } 192 193 manager = replication.getReplicationManager(); 194 manager.addSource(slaveId); 195 if (managerOfCluster != null) { 196 waitPeer(slaveId, managerOfCluster, true); 197 } 198 waitPeer(slaveId, manager, true); 199 200 htd = new HTableDescriptor(test); 201 HColumnDescriptor col = new HColumnDescriptor(f1); 202 col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 203 htd.addFamily(col); 204 col = new HColumnDescriptor(f2); 205 col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); 206 htd.addFamily(col); 207 208 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 209 for(byte[] fam : htd.getFamiliesKeys()) { 210 scopes.put(fam, 0); 211 } 212 hri = new HRegionInfo(htd.getTableName(), r1, r2); 213 } 214 215 private static ReplicationSourceManager getManagerFromCluster() { 216 // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster. 217 if (utility.getMiniHBaseCluster() == null) { 218 return null; 219 } 220 return utility.getMiniHBaseCluster().getRegionServerThreads() 221 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 222 .findAny() 223 .map(HRegionServer::getReplicationSourceService) 224 .map(r -> (Replication)r) 225 .map(Replication::getReplicationManager) 226 .get(); 227 } 228 229 @AfterClass 230 public static void tearDownAfterClass() throws Exception { 231 if (manager != null) { 232 manager.join(); 233 } 234 utility.shutdownMiniCluster(); 235 } 236 237 @Rule 238 public TestName testName = new TestName(); 239 240 private void cleanLogDir() throws IOException { 241 fs.delete(logDir, true); 242 fs.delete(oldLogDir, true); 243 } 244 245 @Before 246 public void setUp() throws Exception { 247 LOG.info("Start " + testName.getMethodName()); 248 cleanLogDir(); 249 } 250 251 @After 252 public void tearDown() throws Exception { 253 LOG.info("End " + testName.getMethodName()); 254 cleanLogDir(); 255 List<String> ids = manager.getSources().stream() 256 .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList()); 257 for (String id : ids) { 258 if (slaveId.equals(id)) { 259 continue; 260 } 261 removePeerAndWait(id); 262 } 263 } 264 265 @Test 266 public void testLogRoll() throws Exception { 267 long baseline = 1000; 268 long time = baseline; 269 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 270 KeyValue kv = new KeyValue(r1, f1, r1); 271 WALEdit edit = new WALEdit(); 272 edit.add(kv); 273 274 WALFactory wals = 275 new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); 276 ReplicationSourceManager replicationManager = replication.getReplicationManager(); 277 wals.getWALProvider() 278 .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); 279 final WAL wal = wals.getWAL(hri); 280 manager.init(); 281 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); 282 htd.addFamily(new HColumnDescriptor(f1)); 283 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 284 for(byte[] fam : htd.getFamiliesKeys()) { 285 scopes.put(fam, 0); 286 } 287 // Testing normal log rolling every 20 288 for(long i = 1; i < 101; i++) { 289 if(i > 1 && i % 20 == 0) { 290 wal.rollWriter(); 291 } 292 LOG.info(Long.toString(i)); 293 final long txid = wal.appendData(hri, 294 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 295 edit); 296 wal.sync(txid); 297 } 298 299 // Simulate a rapid insert that's followed 300 // by a report that's still not totally complete (missing last one) 301 LOG.info(baseline + " and " + time); 302 baseline += 101; 303 time = baseline; 304 LOG.info(baseline + " and " + time); 305 306 for (int i = 0; i < 3; i++) { 307 wal.appendData(hri, 308 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 309 edit); 310 } 311 wal.sync(); 312 313 int logNumber = 0; 314 for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId) 315 .entrySet()) { 316 logNumber += entry.getValue().size(); 317 } 318 assertEquals(6, logNumber); 319 320 wal.rollWriter(); 321 322 manager.logPositionAndCleanOldLogs("1", false, 323 new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); 324 325 wal.appendData(hri, 326 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 327 edit); 328 wal.sync(); 329 330 assertEquals(1, manager.getWALs().size()); 331 332 333 // TODO Need a case with only 2 WALs and we only want to delete the first one 334 } 335 336 @Test 337 public void testClaimQueues() throws Exception { 338 Server server = new DummyServer("hostname0.example.org"); 339 ReplicationQueueStorage rq = ReplicationStorageFactory 340 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 341 // populate some znodes in the peer znode 342 files.add("log1"); 343 files.add("log2"); 344 for (String file : files) { 345 rq.addWAL(server.getServerName(), "1", file); 346 } 347 // create 3 DummyServers 348 Server s1 = new DummyServer("dummyserver1.example.org"); 349 Server s2 = new DummyServer("dummyserver2.example.org"); 350 Server s3 = new DummyServer("dummyserver3.example.org"); 351 352 // create 3 DummyNodeFailoverWorkers 353 DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1); 354 DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2); 355 DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3); 356 357 latch = new CountDownLatch(3); 358 // start the threads 359 w1.start(); 360 w2.start(); 361 w3.start(); 362 // make sure only one is successful 363 int populatedMap = 0; 364 // wait for result now... till all the workers are done. 365 latch.await(); 366 populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() 367 + w3.isLogZnodesMapPopulated(); 368 assertEquals(1, populatedMap); 369 server.abort("", null); 370 } 371 372 @Test 373 public void testCleanupFailoverQueues() throws Exception { 374 Server server = new DummyServer("hostname1.example.org"); 375 ReplicationQueueStorage rq = ReplicationStorageFactory 376 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 377 // populate some znodes in the peer znode 378 SortedSet<String> files = new TreeSet<>(); 379 String group = "testgroup"; 380 String file1 = group + ".log1"; 381 String file2 = group + ".log2"; 382 files.add(file1); 383 files.add(file2); 384 for (String file : files) { 385 rq.addWAL(server.getServerName(), "1", file); 386 } 387 Server s1 = new DummyServer("dummyserver1.example.org"); 388 ReplicationPeers rp1 = 389 ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); 390 rp1.init(); 391 NodeFailoverWorker w1 = 392 manager.new NodeFailoverWorker(server.getServerName()); 393 w1.run(); 394 assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); 395 String id = "1-" + server.getServerName().getServerName(); 396 assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); 397 manager.cleanOldLogs(file2, false, id, true); 398 // log1 should be deleted 399 assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); 400 } 401 402 @Test 403 public void testCleanupUnknownPeerZNode() throws Exception { 404 Server server = new DummyServer("hostname2.example.org"); 405 ReplicationQueueStorage rq = ReplicationStorageFactory 406 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 407 // populate some znodes in the peer znode 408 // add log to an unknown peer 409 String group = "testgroup"; 410 rq.addWAL(server.getServerName(), "2", group + ".log1"); 411 rq.addWAL(server.getServerName(), "2", group + ".log2"); 412 413 NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); 414 w1.run(); 415 416 // The log of the unknown peer should be removed from zk 417 for (String peer : manager.getAllQueues()) { 418 assertTrue(peer.startsWith("1")); 419 } 420 } 421 422 /** 423 * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the 424 * compaction WALEdit. 425 */ 426 @Test 427 public void testCompactionWALEdits() throws Exception { 428 TableName tableName = TableName.valueOf("testCompactionWALEdits"); 429 WALProtos.CompactionDescriptor compactionDescriptor = 430 WALProtos.CompactionDescriptor.getDefaultInstance(); 431 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) 432 .setEndKey(HConstants.EMPTY_END_ROW).build(); 433 WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); 434 ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); 435 } 436 437 @Test 438 public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { 439 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 440 // 1. Get the bulk load wal edit event 441 WALEdit logEdit = getBulkLoadWALEdit(scope); 442 // 2. Create wal key 443 WALKeyImpl logKey = new WALKeyImpl(scope); 444 445 // 3. Get the scopes for the key 446 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); 447 448 // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled 449 assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", 450 logKey.getReplicationScopes()); 451 } 452 453 @Test 454 public void testBulkLoadWALEdits() throws Exception { 455 // 1. Get the bulk load wal edit event 456 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 457 WALEdit logEdit = getBulkLoadWALEdit(scope); 458 // 2. Create wal key 459 WALKeyImpl logKey = new WALKeyImpl(scope); 460 // 3. Enable bulk load hfile replication 461 Configuration bulkLoadConf = HBaseConfiguration.create(conf); 462 bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 463 464 // 4. Get the scopes for the key 465 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); 466 467 NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); 468 // Assert family with replication scope global is present in the key scopes 469 assertTrue("This family scope is set to global, should be part of replication key scopes.", 470 scopes.containsKey(f1)); 471 // Assert family with replication scope local is not present in the key scopes 472 assertFalse("This family scope is set to local, should not be part of replication key scopes", 473 scopes.containsKey(f2)); 474 } 475 476 /** 477 * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the 478 * corresponding ReplicationSourceInterface correctly cleans up the corresponding 479 * replication queue and ReplicationPeer. 480 * See HBASE-16096. 481 * @throws Exception 482 */ 483 @Test 484 public void testPeerRemovalCleanup() throws Exception{ 485 String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); 486 final String peerId = "FakePeer"; 487 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 488 .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase"); 489 try { 490 DummyServer server = new DummyServer(); 491 ReplicationQueueStorage rq = ReplicationStorageFactory 492 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 493 // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface 494 // initialization to throw an exception. 495 conf.set("replication.replicationsource.implementation", 496 FailInitializeDummyReplicationSource.class.getName()); 497 final ReplicationPeers rp = manager.getReplicationPeers(); 498 // Set up the znode and ReplicationPeer for the fake peer 499 // Don't wait for replication source to initialize, we know it won't. 500 addPeerAndWait(peerId, peerConfig, false); 501 502 // Sanity check 503 assertNull(manager.getSource(peerId)); 504 505 // Create a replication queue for the fake peer 506 rq.addWAL(server.getServerName(), peerId, "FakeFile"); 507 // Unregister peer, this should remove the peer and clear all queues associated with it 508 // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. 509 removePeerAndWait(peerId); 510 assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId)); 511 } finally { 512 conf.set("replication.replicationsource.implementation", replicationSourceImplName); 513 removePeerAndWait(peerId); 514 } 515 } 516 517 private static MetricsReplicationSourceSource getGlobalSource() throws Exception { 518 ReplicationSourceInterface source = manager.getSource(slaveId); 519 // Retrieve the global replication metrics source 520 Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); 521 f.setAccessible(true); 522 return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); 523 } 524 525 private static long getSizeOfLatestPath() { 526 // If no mini cluster is running, there are extra replication manager influencing the metrics. 527 if (utility.getMiniHBaseCluster() == null) { 528 return 0; 529 } 530 return utility.getMiniHBaseCluster().getRegionServerThreads() 531 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 532 .map(HRegionServer::getReplicationSourceService) 533 .map(r -> (Replication)r) 534 .map(Replication::getReplicationManager) 535 .mapToLong(ReplicationSourceManager::getSizeOfLatestPath) 536 .sum(); 537 } 538 539 @Test 540 public void testRemovePeerMetricsCleanup() throws Exception { 541 final String peerId = "DummyPeer"; 542 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 543 .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase"); 544 try { 545 MetricsReplicationSourceSource globalSource = getGlobalSource(); 546 final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 547 final long sizeOfLatestPath = getSizeOfLatestPath(); 548 addPeerAndWait(peerId, peerConfig, true); 549 assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 550 ReplicationSourceInterface source = manager.getSource(peerId); 551 // Sanity check 552 assertNotNull(source); 553 final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 554 // Enqueue log and check if metrics updated 555 source.enqueueLog(new Path("abc")); 556 assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 557 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 558 globalSource.getSizeOfLogQueue()); 559 560 // Removing the peer should reset the global metrics 561 removePeerAndWait(peerId); 562 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 563 564 // Adding the same peer back again should reset the single source metrics 565 addPeerAndWait(peerId, peerConfig, true); 566 source = manager.getSource(peerId); 567 assertNotNull(source); 568 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 569 globalSource.getSizeOfLogQueue()); 570 } finally { 571 removePeerAndWait(peerId); 572 } 573 } 574 575 /** 576 * Add a peer and wait for it to initialize 577 * @param peerId 578 * @param peerConfig 579 * @param waitForSource Whether to wait for replication source to initialize 580 * @throws Exception 581 */ 582 private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, 583 final boolean waitForSource) throws Exception { 584 final ReplicationPeers rp = manager.getReplicationPeers(); 585 rp.getPeerStorage().addPeer(peerId, peerConfig, true); 586 try { 587 manager.addPeer(peerId); 588 } catch (Exception e) { 589 // ignore the failed exception, because we'll test both success & failed case. 590 } 591 waitPeer(peerId, manager, waitForSource); 592 if (managerOfCluster != null) { 593 managerOfCluster.addPeer(peerId); 594 waitPeer(peerId, managerOfCluster, waitForSource); 595 } 596 } 597 598 private static void waitPeer(final String peerId, 599 ReplicationSourceManager manager, final boolean waitForSource) { 600 ReplicationPeers rp = manager.getReplicationPeers(); 601 Waiter.waitFor(conf, 20000, () -> { 602 if (waitForSource) { 603 ReplicationSourceInterface rs = manager.getSource(peerId); 604 if (rs == null) { 605 return false; 606 } 607 if (rs instanceof ReplicationSourceDummy) { 608 return ((ReplicationSourceDummy)rs).isStartup(); 609 } 610 return true; 611 } else { 612 return (rp.getPeer(peerId) != null); 613 } 614 }); 615 } 616 617 /** 618 * Remove a peer and wait for it to get cleaned up 619 * @param peerId 620 * @throws Exception 621 */ 622 private void removePeerAndWait(final String peerId) throws Exception { 623 final ReplicationPeers rp = manager.getReplicationPeers(); 624 if (rp.getPeerStorage().listPeerIds().contains(peerId)) { 625 rp.getPeerStorage().removePeer(peerId); 626 try { 627 manager.removePeer(peerId); 628 } catch (Exception e) { 629 // ignore the failed exception and continue. 630 } 631 } 632 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 633 @Override 634 public boolean evaluate() throws Exception { 635 Collection<String> peers = rp.getPeerStorage().listPeerIds(); 636 return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null) 637 && (!peers.contains(peerId)) && manager.getSource(peerId) == null; 638 } 639 }); 640 } 641 642 private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { 643 // 1. Create store files for the families 644 Map<byte[], List<Path>> storeFiles = new HashMap<>(1); 645 Map<String, Long> storeFilesSize = new HashMap<>(1); 646 List<Path> p = new ArrayList<>(1); 647 Path hfilePath1 = new Path(Bytes.toString(f1)); 648 p.add(hfilePath1); 649 try { 650 storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen()); 651 } catch (IOException e) { 652 LOG.debug("Failed to calculate the size of hfile " + hfilePath1); 653 storeFilesSize.put(hfilePath1.getName(), 0L); 654 } 655 storeFiles.put(f1, p); 656 scope.put(f1, 1); 657 p = new ArrayList<>(1); 658 Path hfilePath2 = new Path(Bytes.toString(f2)); 659 p.add(hfilePath2); 660 try { 661 storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen()); 662 } catch (IOException e) { 663 LOG.debug("Failed to calculate the size of hfile " + hfilePath2); 664 storeFilesSize.put(hfilePath2.getName(), 0L); 665 } 666 storeFiles.put(f2, p); 667 // 2. Create bulk load descriptor 668 BulkLoadDescriptor desc = 669 ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), 670 UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); 671 672 // 3. create bulk load wal edit event 673 WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc); 674 return logEdit; 675 } 676 677 static class DummyNodeFailoverWorker extends Thread { 678 private Map<String, Set<String>> logZnodesMap; 679 Server server; 680 private ServerName deadRS; 681 ReplicationQueueStorage rq; 682 683 public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception { 684 this.deadRS = deadRS; 685 this.server = s; 686 this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), 687 server.getConfiguration()); 688 } 689 690 @Override 691 public void run() { 692 try { 693 logZnodesMap = new HashMap<>(); 694 List<String> queues = rq.getAllQueues(deadRS); 695 for (String queue : queues) { 696 Pair<String, SortedSet<String>> pair = 697 rq.claimQueue(deadRS, queue, server.getServerName()); 698 if (pair != null) { 699 logZnodesMap.put(pair.getFirst(), pair.getSecond()); 700 } 701 } 702 server.abort("Done with testing", null); 703 } catch (Exception e) { 704 LOG.error("Got exception while running NodeFailoverWorker", e); 705 } finally { 706 latch.countDown(); 707 } 708 } 709 710 /** 711 * @return 1 when the map is not empty. 712 */ 713 private int isLogZnodesMapPopulated() { 714 Collection<Set<String>> sets = logZnodesMap.values(); 715 if (sets.size() > 1) { 716 throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size()); 717 } 718 if (sets.size() == 1) { 719 Set<String> s = sets.iterator().next(); 720 for (String file : files) { 721 // at least one file was missing 722 if (!s.contains(file)) { 723 return 0; 724 } 725 } 726 return 1; // we found all the files 727 } 728 return 0; 729 } 730 } 731 732 static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { 733 734 @Override 735 public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 736 ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, 737 UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 738 throws IOException { 739 throw new IOException("Failing deliberately"); 740 } 741 } 742 743 static class DummyServer implements Server { 744 String hostname; 745 746 DummyServer() { 747 hostname = "hostname.example.org"; 748 } 749 750 DummyServer(String hostname) { 751 this.hostname = hostname; 752 } 753 754 @Override 755 public Configuration getConfiguration() { 756 return conf; 757 } 758 759 @Override 760 public ZKWatcher getZooKeeper() { 761 return zkw; 762 } 763 764 @Override 765 public CoordinatedStateManager getCoordinatedStateManager() { 766 return null; 767 } 768 @Override 769 public ClusterConnection getConnection() { 770 return null; 771 } 772 773 @Override 774 public ServerName getServerName() { 775 return ServerName.valueOf(hostname, 1234, 1L); 776 } 777 778 @Override 779 public void abort(String why, Throwable e) { 780 // To change body of implemented methods use File | Settings | File Templates. 781 } 782 783 @Override 784 public boolean isAborted() { 785 return false; 786 } 787 788 @Override 789 public void stop(String why) { 790 // To change body of implemented methods use File | Settings | File Templates. 791 } 792 793 @Override 794 public boolean isStopped() { 795 return false; // To change body of implemented methods use File | Settings | File Templates. 796 } 797 798 @Override 799 public ChoreService getChoreService() { 800 return null; 801 } 802 803 @Override 804 public ClusterConnection getClusterConnection() { 805 // TODO Auto-generated method stub 806 return null; 807 } 808 809 @Override 810 public FileSystem getFileSystem() { 811 return null; 812 } 813 814 @Override 815 public boolean isStopping() { 816 return false; 817 } 818 819 @Override 820 public Connection createConnection(Configuration conf) throws IOException { 821 return null; 822 } 823 } 824}