001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.IOException; 028import java.lang.reflect.Field; 029import java.net.URLEncoder; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.SortedSet; 038import java.util.TreeMap; 039import java.util.TreeSet; 040import java.util.UUID; 041import java.util.concurrent.CountDownLatch; 042import java.util.stream.Collectors; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.ClusterId; 048import org.apache.hadoop.hbase.CoordinatedStateManager; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HBaseTestingUtility; 052import org.apache.hadoop.hbase.HColumnDescriptor; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.HRegionInfo; 055import org.apache.hadoop.hbase.HTableDescriptor; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.Server; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.Waiter; 061import org.apache.hadoop.hbase.client.ClusterConnection; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.RegionInfo; 064import org.apache.hadoop.hbase.client.RegionInfoBuilder; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 067import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 068import org.apache.hadoop.hbase.replication.ReplicationFactory; 069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 070import org.apache.hadoop.hbase.replication.ReplicationPeers; 071import org.apache.hadoop.hbase.replication.ReplicationQueues; 072import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; 073import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; 074import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; 075import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; 076import org.apache.hadoop.hbase.testclassification.MediumTests; 077import org.apache.hadoop.hbase.testclassification.ReplicationTests; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.FSUtils; 080import org.apache.hadoop.hbase.util.JVMClusterUtil; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WAL; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALFactory; 085import org.apache.hadoop.hbase.wal.WALKeyImpl; 086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 087import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 088import org.apache.hadoop.hbase.zookeeper.ZKUtil; 089import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 090import org.junit.After; 091import org.junit.AfterClass; 092import org.junit.Before; 093import org.junit.ClassRule; 094import org.junit.Rule; 095import org.junit.Test; 096import org.junit.experimental.categories.Category; 097import org.junit.rules.TestName; 098import org.slf4j.Logger; 099import org.slf4j.LoggerFactory; 100 101import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 102import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 103 104import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 107 108/** 109 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should 110 * set up the proper config for this class and initialize the proper cluster using 111 * HBaseTestingUtility. 112 */ 113@Category({ReplicationTests.class, MediumTests.class}) 114public abstract class TestReplicationSourceManager { 115 116 @ClassRule 117 public static final HBaseClassTestRule CLASS_RULE = 118 HBaseClassTestRule.forClass(TestReplicationSourceManager.class); 119 120 protected static final Logger LOG = 121 LoggerFactory.getLogger(TestReplicationSourceManager.class); 122 123 protected static Configuration conf; 124 125 protected static HBaseTestingUtility utility; 126 127 protected static Replication replication; 128 129 protected static ReplicationSourceManager manager; 130 131 protected static ReplicationSourceManager managerOfCluster; 132 133 protected static ZKWatcher zkw; 134 135 protected static HTableDescriptor htd; 136 137 protected static HRegionInfo hri; 138 139 protected static final byte[] r1 = Bytes.toBytes("r1"); 140 141 protected static final byte[] r2 = Bytes.toBytes("r2"); 142 143 protected static final byte[] f1 = Bytes.toBytes("f1"); 144 145 protected static final byte[] f2 = Bytes.toBytes("f2"); 146 147 protected static final TableName test = 148 TableName.valueOf("test"); 149 150 protected static final String slaveId = "1"; 151 152 protected static FileSystem fs; 153 154 protected static Path oldLogDir; 155 156 protected static Path logDir; 157 158 protected static CountDownLatch latch; 159 160 protected static List<String> files = new ArrayList<>(); 161 protected static NavigableMap<byte[], Integer> scopes; 162 163 protected static void setupZkAndReplication() throws Exception { 164 // The implementing class should set up the conf 165 assertNotNull(conf); 166 zkw = new ZKWatcher(conf, "test", null); 167 ZKUtil.createWithParents(zkw, "/hbase/replication"); 168 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); 169 ZKUtil.setData(zkw, "/hbase/replication/peers/1", 170 Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 171 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); 172 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); 173 ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", 174 ReplicationStateZKBase.ENABLED_ZNODE_BYTES); 175 ZKUtil.createWithParents(zkw, "/hbase/replication/state"); 176 ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); 177 178 ZKClusterId.setClusterId(zkw, new ClusterId()); 179 FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); 180 fs = FileSystem.get(conf); 181 oldLogDir = new Path(utility.getDataTestDir(), 182 HConstants.HREGION_OLDLOGDIR_NAME); 183 logDir = new Path(utility.getDataTestDir(), 184 HConstants.HREGION_LOGDIR_NAME); 185 replication = new Replication(); 186 replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); 187 managerOfCluster = getManagerFromCluster(); 188 manager = replication.getReplicationManager(); 189 manager.addSource(slaveId); 190 if (managerOfCluster != null) { 191 waitPeer(slaveId, managerOfCluster, true); 192 } 193 waitPeer(slaveId, manager, true); 194 195 htd = new HTableDescriptor(test); 196 HColumnDescriptor col = new HColumnDescriptor(f1); 197 col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 198 htd.addFamily(col); 199 col = new HColumnDescriptor(f2); 200 col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); 201 htd.addFamily(col); 202 203 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 204 for(byte[] fam : htd.getFamiliesKeys()) { 205 scopes.put(fam, 0); 206 } 207 hri = new HRegionInfo(htd.getTableName(), r1, r2); 208 } 209 210 private static ReplicationSourceManager getManagerFromCluster() { 211 // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster. 212 if (utility.getMiniHBaseCluster() == null) { 213 return null; 214 } 215 return utility.getMiniHBaseCluster().getRegionServerThreads() 216 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 217 .findAny() 218 .map(HRegionServer::getReplicationSourceService) 219 .map(r -> (Replication)r) 220 .map(Replication::getReplicationManager) 221 .get(); 222 } 223 224 @AfterClass 225 public static void tearDownAfterClass() throws Exception { 226 if (manager != null) { 227 manager.join(); 228 } 229 utility.shutdownMiniCluster(); 230 } 231 232 @Rule 233 public TestName testName = new TestName(); 234 235 private void cleanLogDir() throws IOException { 236 fs.delete(logDir, true); 237 fs.delete(oldLogDir, true); 238 } 239 240 @Before 241 public void setUp() throws Exception { 242 LOG.info("Start " + testName.getMethodName()); 243 cleanLogDir(); 244 } 245 246 @After 247 public void tearDown() throws Exception { 248 LOG.info("End " + testName.getMethodName()); 249 cleanLogDir(); 250 List<String> ids = manager.getSources().stream() 251 .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList()); 252 for (String id : ids) { 253 if (slaveId.equals(id)) { 254 continue; 255 } 256 removePeerAndWait(id); 257 } 258 } 259 260 @Test 261 public void testLogRoll() throws Exception { 262 long baseline = 1000; 263 long time = baseline; 264 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 265 KeyValue kv = new KeyValue(r1, f1, r1); 266 WALEdit edit = new WALEdit(); 267 edit.add(kv); 268 269 WALFactory wals = 270 new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); 271 ReplicationSourceManager replicationManager = replication.getReplicationManager(); 272 wals.getWALProvider() 273 .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); 274 final WAL wal = wals.getWAL(hri); 275 manager.init(); 276 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); 277 htd.addFamily(new HColumnDescriptor(f1)); 278 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 279 for(byte[] fam : htd.getFamiliesKeys()) { 280 scopes.put(fam, 0); 281 } 282 // Testing normal log rolling every 20 283 for(long i = 1; i < 101; i++) { 284 if(i > 1 && i % 20 == 0) { 285 wal.rollWriter(); 286 } 287 LOG.info(Long.toString(i)); 288 final long txid = wal.append( 289 hri, 290 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 291 edit, 292 true); 293 wal.sync(txid); 294 } 295 296 // Simulate a rapid insert that's followed 297 // by a report that's still not totally complete (missing last one) 298 LOG.info(baseline + " and " + time); 299 baseline += 101; 300 time = baseline; 301 LOG.info(baseline + " and " + time); 302 303 for (int i = 0; i < 3; i++) { 304 wal.append(hri, 305 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 306 edit, true); 307 } 308 wal.sync(); 309 310 int logNumber = 0; 311 for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) { 312 logNumber += entry.getValue().size(); 313 } 314 assertEquals(6, logNumber); 315 316 wal.rollWriter(); 317 318 manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), 319 "1", 0, false, false); 320 321 wal.append(hri, 322 new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), 323 edit, 324 true); 325 wal.sync(); 326 327 assertEquals(1, manager.getWALs().size()); 328 329 330 // TODO Need a case with only 2 WALs and we only want to delete the first one 331 } 332 333 @Test 334 public void testClaimQueues() throws Exception { 335 final Server server = new DummyServer("hostname0.example.org"); 336 337 338 ReplicationQueues rq = 339 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, 340 server.getZooKeeper())); 341 rq.init(server.getServerName().toString()); 342 // populate some znodes in the peer znode 343 files.add("log1"); 344 files.add("log2"); 345 for (String file : files) { 346 rq.addLog("1", file); 347 } 348 // create 3 DummyServers 349 Server s1 = new DummyServer("dummyserver1.example.org"); 350 Server s2 = new DummyServer("dummyserver2.example.org"); 351 Server s3 = new DummyServer("dummyserver3.example.org"); 352 353 // create 3 DummyNodeFailoverWorkers 354 DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( 355 server.getServerName().getServerName(), s1); 356 DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( 357 server.getServerName().getServerName(), s2); 358 DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( 359 server.getServerName().getServerName(), s3); 360 361 latch = new CountDownLatch(3); 362 // start the threads 363 w1.start(); 364 w2.start(); 365 w3.start(); 366 // make sure only one is successful 367 int populatedMap = 0; 368 // wait for result now... till all the workers are done. 369 latch.await(); 370 populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() 371 + w3.isLogZnodesMapPopulated(); 372 assertEquals(1, populatedMap); 373 server.abort("", null); 374 } 375 376 @Test 377 public void testCleanupFailoverQueues() throws Exception { 378 final Server server = new DummyServer("hostname1.example.org"); 379 ReplicationQueues rq = 380 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, 381 server.getZooKeeper())); 382 rq.init(server.getServerName().toString()); 383 // populate some znodes in the peer znode 384 SortedSet<String> files = new TreeSet<>(); 385 String group = "testgroup"; 386 String file1 = group + ".log1"; 387 String file2 = group + ".log2"; 388 files.add(file1); 389 files.add(file2); 390 for (String file : files) { 391 rq.addLog("1", file); 392 } 393 Server s1 = new DummyServer("dummyserver1.example.org"); 394 ReplicationQueues rq1 = 395 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, 396 s1.getZooKeeper())); 397 rq1.init(s1.getServerName().toString()); 398 ReplicationPeers rp1 = 399 ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); 400 rp1.init(); 401 NodeFailoverWorker w1 = 402 manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( 403 new Long(1), new Long(2))); 404 w1.run(); 405 assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); 406 String id = "1-" + server.getServerName().getServerName(); 407 assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); 408 manager.cleanOldLogs(file2, id, true); 409 // log1 should be deleted 410 assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); 411 } 412 413 @Test 414 public void testCleanupUnknownPeerZNode() throws Exception { 415 final Server server = new DummyServer("hostname2.example.org"); 416 ReplicationQueues rq = ReplicationFactory.getReplicationQueues( 417 new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); 418 rq.init(server.getServerName().toString()); 419 // populate some znodes in the peer znode 420 // add log to an unknown peer 421 String group = "testgroup"; 422 rq.addLog("2", group + ".log1"); 423 rq.addLog("2", group + ".log2"); 424 425 NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); 426 w1.run(); 427 428 // The log of the unknown peer should be removed from zk 429 for (String peer : manager.getAllQueues()) { 430 assertTrue(peer.startsWith("1")); 431 } 432 } 433 434 /** 435 * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the 436 * compaction WALEdit. 437 */ 438 @Test 439 public void testCompactionWALEdits() throws Exception { 440 TableName tableName = TableName.valueOf("testCompactionWALEdits"); 441 WALProtos.CompactionDescriptor compactionDescriptor = 442 WALProtos.CompactionDescriptor.getDefaultInstance(); 443 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) 444 .setEndKey(HConstants.EMPTY_END_ROW).build(); 445 WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); 446 ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); 447 } 448 449 @Test 450 public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { 451 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 452 // 1. Get the bulk load wal edit event 453 WALEdit logEdit = getBulkLoadWALEdit(scope); 454 // 2. Create wal key 455 WALKeyImpl logKey = new WALKeyImpl(scope); 456 457 // 3. Get the scopes for the key 458 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); 459 460 // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled 461 assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", 462 logKey.getReplicationScopes()); 463 } 464 465 @Test 466 public void testBulkLoadWALEdits() throws Exception { 467 // 1. Get the bulk load wal edit event 468 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 469 WALEdit logEdit = getBulkLoadWALEdit(scope); 470 // 2. Create wal key 471 WALKeyImpl logKey = new WALKeyImpl(scope); 472 // 3. Enable bulk load hfile replication 473 Configuration bulkLoadConf = HBaseConfiguration.create(conf); 474 bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 475 476 // 4. Get the scopes for the key 477 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); 478 479 NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); 480 // Assert family with replication scope global is present in the key scopes 481 assertTrue("This family scope is set to global, should be part of replication key scopes.", 482 scopes.containsKey(f1)); 483 // Assert family with replication scope local is not present in the key scopes 484 assertFalse("This family scope is set to local, should not be part of replication key scopes", 485 scopes.containsKey(f2)); 486 } 487 488 /** 489 * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the 490 * corresponding ReplicationSourceInterface correctly cleans up the corresponding 491 * replication queue and ReplicationPeer. 492 * See HBASE-16096. 493 * @throws Exception 494 */ 495 @Test 496 public void testPeerRemovalCleanup() throws Exception{ 497 String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); 498 final String peerId = "FakePeer"; 499 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 500 .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); 501 try { 502 DummyServer server = new DummyServer(); 503 final ReplicationQueues rq = 504 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( 505 server.getConfiguration(), server, server.getZooKeeper())); 506 rq.init(server.getServerName().toString()); 507 // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface 508 // initialization to throw an exception. 509 conf.set("replication.replicationsource.implementation", 510 FailInitializeDummyReplicationSource.class.getName()); 511 final ReplicationPeers rp = manager.getReplicationPeers(); 512 // Set up the znode and ReplicationPeer for the fake peer 513 // Don't wait for replication source to initialize, we know it won't. 514 addPeerAndWait(peerId, peerConfig, false); 515 516 // Sanity check 517 assertNull(manager.getSource(peerId)); 518 519 // Create a replication queue for the fake peer 520 rq.addLog(peerId, "FakeFile"); 521 // Unregister peer, this should remove the peer and clear all queues associated with it 522 // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. 523 removePeerAndWait(peerId); 524 assertFalse(rq.getAllQueues().contains(peerId)); 525 } finally { 526 conf.set("replication.replicationsource.implementation", replicationSourceImplName); 527 removePeerAndWait(peerId); 528 } 529 } 530 531 private static MetricsReplicationSourceSource getGlobalSource() throws Exception { 532 ReplicationSourceInterface source = manager.getSource(slaveId); 533 // Retrieve the global replication metrics source 534 Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); 535 f.setAccessible(true); 536 return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); 537 } 538 539 private static long getSizeOfLatestPath() { 540 // If no mini cluster is running, there are extra replication manager influencing the metrics. 541 if (utility.getMiniHBaseCluster() == null) { 542 return 0; 543 } 544 return utility.getMiniHBaseCluster().getRegionServerThreads() 545 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 546 .map(HRegionServer::getReplicationSourceService) 547 .map(r -> (Replication)r) 548 .map(Replication::getReplicationManager) 549 .mapToLong(ReplicationSourceManager::getSizeOfLatestPath) 550 .sum(); 551 } 552 553 @Test 554 public void testRemovePeerMetricsCleanup() throws Exception { 555 final String peerId = "DummyPeer"; 556 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 557 .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); 558 try { 559 MetricsReplicationSourceSource globalSource = getGlobalSource(); 560 final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 561 final long sizeOfLatestPath = getSizeOfLatestPath(); 562 addPeerAndWait(peerId, peerConfig, true); 563 assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, 564 globalSource.getSizeOfLogQueue()); 565 ReplicationSourceInterface source = manager.getSource(peerId); 566 // Sanity check 567 assertNotNull(source); 568 final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 569 // Enqueue log and check if metrics updated 570 source.enqueueLog(new Path("abc")); 571 assertEquals(1 + sizeOfSingleLogQueue, 572 source.getSourceMetrics().getSizeOfLogQueue()); 573 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() 574 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 575 576 // Removing the peer should reset the global metrics 577 removePeerAndWait(peerId); 578 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 579 580 // Adding the same peer back again should reset the single source metrics 581 addPeerAndWait(peerId, peerConfig, true); 582 source = manager.getSource(peerId); 583 assertNotNull(source); 584 assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue()); 585 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() 586 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 587 } finally { 588 removePeerAndWait(peerId); 589 } 590 } 591 592 /** 593 * Add a peer and wait for it to initialize 594 * @param peerId 595 * @param peerConfig 596 * @param waitForSource Whether to wait for replication source to initialize 597 * @throws Exception 598 */ 599 private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, 600 final boolean waitForSource) throws Exception { 601 final ReplicationPeers rp = manager.getReplicationPeers(); 602 rp.registerPeer(peerId, peerConfig); 603 waitPeer(peerId, manager, waitForSource); 604 if (managerOfCluster != null) { 605 waitPeer(peerId, managerOfCluster, waitForSource); 606 } 607 } 608 609 private static void waitPeer(final String peerId, 610 ReplicationSourceManager manager, final boolean waitForSource) { 611 ReplicationPeers rp = manager.getReplicationPeers(); 612 Waiter.waitFor(conf, 20000, () -> { 613 if (waitForSource) { 614 ReplicationSourceInterface rs = manager.getSource(peerId); 615 if (rs == null) { 616 return false; 617 } 618 if (rs instanceof ReplicationSourceDummy) { 619 return ((ReplicationSourceDummy)rs).isStartup(); 620 } 621 return true; 622 } else { 623 return (rp.getConnectedPeer(peerId) != null); 624 } 625 }); 626 } 627 628 /** 629 * Remove a peer and wait for it to get cleaned up 630 * @param peerId 631 * @throws Exception 632 */ 633 private void removePeerAndWait(final String peerId) throws Exception { 634 final ReplicationPeers rp = manager.getReplicationPeers(); 635 if (rp.getAllPeerIds().contains(peerId)) { 636 rp.unregisterPeer(peerId); 637 } 638 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 639 @Override public boolean evaluate() throws Exception { 640 List<String> peers = rp.getAllPeerIds(); 641 return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) 642 && (!peers.contains(peerId)) 643 && manager.getSource(peerId) == null; 644 } 645 }); 646 } 647 648 private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { 649 // 1. Create store files for the families 650 Map<byte[], List<Path>> storeFiles = new HashMap<>(1); 651 Map<String, Long> storeFilesSize = new HashMap<>(1); 652 List<Path> p = new ArrayList<>(1); 653 Path hfilePath1 = new Path(Bytes.toString(f1)); 654 p.add(hfilePath1); 655 try { 656 storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen()); 657 } catch (IOException e) { 658 LOG.debug("Failed to calculate the size of hfile " + hfilePath1); 659 storeFilesSize.put(hfilePath1.getName(), 0L); 660 } 661 storeFiles.put(f1, p); 662 scope.put(f1, 1); 663 p = new ArrayList<>(1); 664 Path hfilePath2 = new Path(Bytes.toString(f2)); 665 p.add(hfilePath2); 666 try { 667 storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen()); 668 } catch (IOException e) { 669 LOG.debug("Failed to calculate the size of hfile " + hfilePath2); 670 storeFilesSize.put(hfilePath2.getName(), 0L); 671 } 672 storeFiles.put(f2, p); 673 // 2. Create bulk load descriptor 674 BulkLoadDescriptor desc = 675 ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), 676 UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); 677 678 // 3. create bulk load wal edit event 679 WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc); 680 return logEdit; 681 } 682 683 static class DummyNodeFailoverWorker extends Thread { 684 private Map<String, Set<String>> logZnodesMap; 685 Server server; 686 private String deadRsZnode; 687 ReplicationQueues rq; 688 689 public DummyNodeFailoverWorker(String znode, Server s) throws Exception { 690 this.deadRsZnode = znode; 691 this.server = s; 692 this.rq = 693 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, 694 server.getZooKeeper())); 695 this.rq.init(this.server.getServerName().toString()); 696 } 697 698 @Override 699 public void run() { 700 try { 701 logZnodesMap = new HashMap<>(); 702 List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode); 703 for(String queue:queues){ 704 Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue); 705 if (pair != null) { 706 logZnodesMap.put(pair.getFirst(), pair.getSecond()); 707 } 708 } 709 server.abort("Done with testing", null); 710 } catch (Exception e) { 711 LOG.error("Got exception while running NodeFailoverWorker", e); 712 } finally { 713 latch.countDown(); 714 } 715 } 716 717 /** 718 * @return 1 when the map is not empty. 719 */ 720 private int isLogZnodesMapPopulated() { 721 Collection<Set<String>> sets = logZnodesMap.values(); 722 if (sets.size() > 1) { 723 throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size()); 724 } 725 if (sets.size() == 1) { 726 Set<String> s = sets.iterator().next(); 727 for (String file : files) { 728 // at least one file was missing 729 if (!s.contains(file)) { 730 return 0; 731 } 732 } 733 return 1; // we found all the files 734 } 735 return 0; 736 } 737 } 738 739 static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { 740 741 @Override 742 public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 743 ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, 744 UUID clusterId, ReplicationEndpoint replicationEndpoint, 745 WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { 746 throw new IOException("Failing deliberately"); 747 } 748 } 749 750 static class DummyServer implements Server { 751 String hostname; 752 753 DummyServer() { 754 hostname = "hostname.example.org"; 755 } 756 757 DummyServer(String hostname) { 758 this.hostname = hostname; 759 } 760 761 @Override 762 public Configuration getConfiguration() { 763 return conf; 764 } 765 766 @Override 767 public ZKWatcher getZooKeeper() { 768 return zkw; 769 } 770 771 @Override 772 public CoordinatedStateManager getCoordinatedStateManager() { 773 return null; 774 } 775 @Override 776 public ClusterConnection getConnection() { 777 return null; 778 } 779 780 @Override 781 public MetaTableLocator getMetaTableLocator() { 782 return null; 783 } 784 785 @Override 786 public ServerName getServerName() { 787 return ServerName.valueOf(hostname, 1234, 1L); 788 } 789 790 @Override 791 public void abort(String why, Throwable e) { 792 // To change body of implemented methods use File | Settings | File Templates. 793 } 794 795 @Override 796 public boolean isAborted() { 797 return false; 798 } 799 800 @Override 801 public void stop(String why) { 802 // To change body of implemented methods use File | Settings | File Templates. 803 } 804 805 @Override 806 public boolean isStopped() { 807 return false; // To change body of implemented methods use File | Settings | File Templates. 808 } 809 810 @Override 811 public ChoreService getChoreService() { 812 return null; 813 } 814 815 @Override 816 public ClusterConnection getClusterConnection() { 817 // TODO Auto-generated method stub 818 return null; 819 } 820 821 @Override 822 public FileSystem getFileSystem() { 823 return null; 824 } 825 826 @Override 827 public boolean isStopping() { 828 return false; 829 } 830 831 @Override 832 public Connection createConnection(Configuration conf) throws IOException { 833 return null; 834 } 835 } 836}