/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests replication topologies (master-master cycles and master to multiple slaves) between
 * several mini HBase clusters that share a single mini ZooKeeper cluster. Each test starts and
 * shuts down its own set of clusters.
 */
@Category({ReplicationTests.class, LargeTests.class})
public class TestMasterReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);

  private Configuration baseConfiguration;

  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  private MiniZooKeeperCluster miniZK;

  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 120;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  private TableDescriptor table;

  /**
   * Builds the base configuration shared by every cluster, and the table descriptor. The table
   * has two globally-replicated families ("f", "f1") and one family ("norep") with the default
   * scope.
   */
  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
    // smaller block size and capacity to trigger more operations
    // and test them
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    // Every region gets the counting coprocessor so puts/deletes can be counted per cluster.
    baseConfiguration.setStrings(
      CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      CoprocessorCounter.class.getName());
    table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }

  /**
   * Tests the replication scenario 0 -> 1 -> 0. It adds and deletes a row in each cluster and
   * checks that the change is replicated. It also checks that the puts and deletes are not
   * replicated back to the originating cluster.
   */
  @Test
  public void testCyclicReplication1() throws Exception {
    LOG.info("testCyclicReplication1");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      int[] expectedCounts = new int[] { 2, 2 };

      // add rows to both clusters,
      // make sure they are both replication
      putAndWait(row, famName, htables[0], htables[1]);
      putAndWait(row1, famName, htables[1], htables[0]);
      validateCounts(htables, put, expectedCounts);

      deleteAndWait(row, htables[0], htables[1]);
      deleteAndWait(row1, htables[1], htables[0]);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the replication scenario 0 -> 0. With the default
   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
   * adding a peer that points back at the same cluster must be rejected.
   */
  @Test(expected = DoNotRetryIOException.class)
  public void testLoopedReplication()
      throws Exception {
    LOG.info("testLoopedReplication");
    try {
      startMiniClusters(1);
      createTableOnClusters(table);
      addPeer("1", 0, 0);
    } finally {
      // The expected exception escapes this method, so shut the clusters down here; otherwise
      // the mini HBase and ZooKeeper clusters would be leaked.
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the replication scenario 0 -> 1 -> 0. It bulk loads a set of HFiles into the table in
   * each cluster and checks that they are replicated.
   */
  @Test
  public void testHFileCyclicReplication() throws Exception {
    LOG.info("testHFileCyclicReplication");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Load 200 rows for each hfile range in cluster '1' and validate whether it has been
      // replicated to cluster '0'.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 200;
      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1] };

      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Starts {@code numClusters} clusters, creates the test table on each, and wires up the
   * 0 -> 1 -> 0 replication cycle (both peers use id "1").
   *
   * @return one table handle per cluster, indexed by cluster number
   */
  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
    Table[] htables;
    startMiniClusters(numClusters);
    createTableOnClusters(table);

    htables = getHTablesOnClusters(tableName);
    // Test the replication scenarios of 0 -> 1 -> 0
    addPeer("1", 0, 1);
    addPeer("1", 1, 0);
    return htables;
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0. It adds and deletes rows in the table
   * in each cluster and checks that every cluster receives the expected mutations. It also tests
   * the grouping scenario where a cluster must replicate both the edits that originate from itself
   * and the edits it received by replication from a different cluster. The scenario is explained
   * in HBASE-9158.
   */
  @Test
  public void testCyclicReplication2() throws Exception {
    LOG.info("testCyclicReplication2");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 0
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 0);

      htables = getHTablesOnClusters(tableName);

      // put "row" and wait 'til it got around
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[0]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[0]);
      deleteAndWait(row2, htables[2], htables[1]);

      int[] expectedCounts = new int[] { 3, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);

      // Test HBASE-9158
      disablePeer("1", 2);
      // we now have an edit that was replicated into cluster originating from
      // cluster 0
      putAndWait(row3, famName, htables[0], htables[1]);
      // now add a local edit to cluster 1
      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
      // re-enable replication from cluster 2 to cluster 0
      enablePeer("1", 2);
      // without HBASE-9158 the edit for row4 would have been marked with
      // cluster 0's id
      // and hence not replicated to cluster 0
      wait(row4, htables[0], false);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the multi-slave HFile replication scenario 0 -> 1, 2. It bulk loads a set of HFiles into
   * the table in the master cluster and checks that they are replicated to its peers.
   */
  @Test
  public void testHFileMultiSlaveReplication() throws Exception {
    LOG.info("testHFileMultiSlaveReplication");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Add a slave, 0 -> 1
      addPeer("1", 0, 1);

      htables = getHTablesOnClusters(tableName);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
      int numOfRows = 100;

      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Validate data is not replicated to cluster '2'.
      assertEquals(0, utilities[2].countRows(htables[2]));

      rollWALAndWait(utilities[0], htables[0].getName(), row);

      // Add one more slave, 0 -> 2
      addPeer("2", 0, 2);

      // Load 200 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
      numOfRows = 200;

      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests that bulk-loaded HFiles are replicated only for the explicitly configured table column
   * families. It bulk loads HFiles for both column families of the table but configures only one
   * of them for replication.
   */
  @Test
  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
    LOG.info("testHFileReplicationForConfiguredTableCfs");
    int numClusters = 2;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      htables = getHTablesOnClusters(tableName);
      // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
        hfileRanges, numOfRows, expectedCounts, true);

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 100;

      int[] newExpectedCounts =
          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };

      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
        hfileRanges, numOfRows, newExpectedCounts, false);

      // Validate data replication for CF 'f1'

      // Source cluster table should contain data for the families
      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);

      // Sleep for enough time so that the data is still not replicated for the CF which is not
      // configured for replication
      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
      // Peer cluster should have only configured CF data
      wait(1, htables[1], expectedCounts[1]);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
   */
  @Test
  public void testCyclicReplication3() throws Exception {
    LOG.info("testCyclicReplication3");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 1
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 1);

      htables = getHTablesOnClusters(tableName);

      // put "row" and wait 'til it got around
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[2]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[2]);
      deleteAndWait(row2, htables[2], htables[1]);

      int[] expectedCounts = new int[] { 1, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests that base replication peer configs are applied when a peer is created, and that they
   * are overridden by values set through updateReplicationPeerConfig(). After a restart, new base
   * configs are added to existing peers, but values already present on a peer are kept.
   */
  @Test
  public void testBasePeerConfigsForPeerMutations()
      throws Exception {
    LOG.info("testBasePeerConfigsForPeerMutations");
    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
    String firstCustomPeerConfigValue = "test";
    String firstCustomPeerConfigUpdatedValue = "test_updated";

    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
    String secondCustomPeerConfigValue = "testSecond";
    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
    try {
      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
      startMiniClusters(2);
      addPeer("1", 0, 1);
      addPeer("2", 0, 1);
      Admin admin = utilities[0].getAdmin();

      // Validates base configs 1 is present for both peer.
      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
          getConfiguration().get(firstCustomPeerConfigKey));
      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
          getConfiguration().get(firstCustomPeerConfigKey));

      // override value of configuration 1 for peer "1".
      ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
          newBuilder(admin.getReplicationPeerConfig("1")).
          putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();

      // add configuration 2 for peer "2".
      ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
          newBuilder(admin.getReplicationPeerConfig("2")).
          putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();

      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);

      // validates configuration is overridden by updateReplicationPeerConfig
      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
          getConfiguration().get(firstCustomPeerConfigKey));
      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
          getConfiguration().get(secondCustomPeerConfigKey));

      // Add second config to base config and perform restart.
      utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
          HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
          concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
          .concat("=").concat(secondCustomPeerConfigValue));

      utilities[0].shutdownMiniHBaseCluster();
      utilities[0].restartHBaseCluster(1);
      admin = utilities[0].getAdmin();

      // Both retains the value of base configuration 1 value as before restart.
      // Peer 1 (Update value), Peer 2 (Base Value)
      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
          getConfiguration().get(firstCustomPeerConfigKey));
      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
          getConfiguration().get(firstCustomPeerConfigKey));

      // Peer 1 gets new base config as part of restart.
      Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
          getConfiguration().get(secondCustomPeerConfigKey));
      // Peer 2 retains the updated value as before restart.
      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
          getConfiguration().get(secondCustomPeerConfigKey));
    } finally {
      shutDownMiniClusters();
      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
    }
  }

  /** Drops references to the per-test cluster state; the tests shut the clusters down themselves. */
  @After
  public void tearDown() throws IOException {
    configurations = null;
    utilities = null;
  }

  /**
   * Starts {@code numClusters} mini HBase clusters. The first cluster starts the mini ZooKeeper
   * cluster and the others reuse it. Each cluster gets its own randomized znode parent so the
   * clusters do not collide in the shared ZooKeeper.
   */
  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      // The watcher is only created for its constructor side effect (the last argument asks it to
      // create the base znode). Close it right away so its ZooKeeper session is not leaked.
      new ZKWatcher(conf, "cluster" + i, null, true).close();
    }
  }

  /**
   * Shuts down every started cluster in reverse start order, then the shared mini ZooKeeper
   * cluster. Safe to call when startup failed part-way: null entries, a null {@code utilities}
   * array, or a ZooKeeper cluster that was never started are skipped.
   */
  private void shutDownMiniClusters() throws Exception {
    if (utilities != null) {
      for (int i = utilities.length - 1; i >= 0; i--) {
        if (utilities[i] != null) {
          utilities[i].shutdownMiniCluster();
        }
      }
    }
    if (miniZK != null) {
      miniZK.shutdown();
      miniZK = null;
    }
  }

  /** Creates {@code table} on every started cluster. */
  private void createTableOnClusters(TableDescriptor table) throws Exception {
    for (HBaseTestingUtility utility : utilities) {
      utility.getAdmin().createTable(table);
    }
  }

  /**
   * Adds peer {@code id} on cluster {@code masterClusterNumber}, pointing at cluster
   * {@code slaveClusterNumber}.
   */
  private void addPeer(String id, int masterClusterNumber,
      int slaveClusterNumber) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
        Admin admin = conn.getAdmin()) {
      admin.addReplicationPeer(id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
    }
  }

  /**
   * Adds peer {@code id} on cluster {@code masterClusterNumber}, pointing at cluster
   * {@code slaveClusterNumber}. Only the table/column families in {@code tableCfs} are
   * replicated; the string is parsed by
   * {@link ReplicationPeerConfigUtil#parseTableCFsFromConfig(String)}.
   */
  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
      throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
        Admin admin = conn.getAdmin()) {
      admin.addReplicationPeer(
        id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
            .setReplicateAllUserTables(false)
            .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
    }
  }

  /** Disables replication peer {@code id} on cluster {@code masterClusterNumber}. */
  private void disablePeer(String id, int masterClusterNumber) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
        Admin admin = conn.getAdmin()) {
      admin.disableReplicationPeer(id);
    }
  }

  /** Enables replication peer {@code id} on cluster {@code masterClusterNumber}. */
  private void enablePeer(String id, int masterClusterNumber) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
        Admin admin = conn.getAdmin()) {
      admin.enableReplicationPeer(id);
    }
  }

  /**
   * Closes each non-null element, logging (not rethrowing) failures. Each element is closed
   * independently, so one failure does not prevent the rest from being closed. A null array is a
   * no-op, which happens when a test fails before its tables are opened.
   */
  private void close(Closeable... closeables) {
    if (closeables == null) {
      return;
    }
    for (Closeable closeable : closeables) {
      if (closeable == null) {
        continue;
      }
      try {
        closeable.close();
      } catch (Exception e) {
        LOG.warn("Exception occurred while closing the object:", e);
      }
    }
  }

  /**
   * Opens a handle to {@code tableName} on every cluster.
   * NOTE(review): each table gets its own Connection, and nothing here closes it. Closing the
   * returned Table does not close the Connection, so these connections are leaked until the
   * clusters shut down.
   */
  @SuppressWarnings("resource")
  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
    int numClusters = utilities.length;
    Table[] htables = new Table[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
      htables[i] = htable;
    }
    return htables;
  }

  /**
   * Asserts that the coprocessor counter {@code type} ("put" or "delete") on cluster i equals
   * {@code expectedCounts[i]}.
   */
  private void validateCounts(Table[] htables, byte[] type,
      int[] expectedCounts) throws IOException {
    for (int i = 0; i < htables.length; i++) {
      assertEquals(Bytes.toString(type) + " were replicated back ",
        expectedCounts[i], getCount(htables[i], type));
    }
  }

  /**
   * Reads a counter from {@link CoprocessorCounter}. The "count" attribute makes the coprocessor
   * bypass the real read and return its counters as cells in family/qualifier "count".
   */
  private int getCount(Table t, byte[] type) throws IOException {
    Get test = new Get(row);
    test.setAttribute("count", new byte[] {});
    Result res = t.get(test);
    return Bytes.toInt(res.getValue(count, type));
  }

  /** Deletes {@code row} on {@code source} and waits until it is gone from {@code target}. */
  private void deleteAndWait(byte[] row, Table source, Table target)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);
    wait(row, target, true);
  }

  /**
   * Puts {@code row} (qualifier and value both equal to the row key) into {@code fam} on
   * {@code source}, then waits until it is visible on {@code target}.
   */
  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
      throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, row);
    source.put(put);
    wait(row, target, false);
  }

  /**
   * Writes one HFile per key range under a test directory, bulk loads them into the master
   * cluster's table and, if {@code toValidate} is set, waits until each slave's row count equals
   * {@code expectedCounts[slaveClusterNumber]}.
   */
  private void loadAndValidateHFileReplication(String testName, int masterNumber,
      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
    HBaseTestingUtility util = utilities[masterNumber];

    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    // The bulk loader expects <dir>/<family>/<hfile>.
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(util.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    Table source = tables[masterNumber];
    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    if (toValidate) {
      for (int slaveClusterNumber : slaveNumbers) {
        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
      }
    }
  }

  /**
   * Polls the row count of {@code target} on cluster {@code slaveNumber} up to NB_RETRIES times,
   * sleeping SLEEP_TIME between attempts. Fails if the count never reaches
   * {@code expectedCount}.
   */
  private void wait(int slaveNumber, Table target, int expectedCount)
      throws IOException, InterruptedException {
    int count = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      count = utilities[slaveNumber].countRows(target);
      if (count == expectedCount) {
        return;
      }
      LOG.info("Waiting more time for bulkloaded data replication.");
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for bulkloaded data replication. Current count=" + count
        + ", expected count=" + expectedCount);
  }

  /**
   * Polls {@code target} for {@code row} up to NB_RETRIES times, sleeping SLEEP_TIME between
   * attempts. If {@code isDeleted} is set, waits until the row is absent. Otherwise waits until
   * it is present, then asserts that its value equals the row key (as written by putAndWait).
   */
  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = target.get(get);
      boolean reachedExpectedState = isDeleted ? res.isEmpty() : !res.isEmpty();
      if (reachedExpectedState) {
        if (!isDeleted) {
          assertArrayEquals(row, res.value());
        }
        LOG.info("Obtained row:"
            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        return;
      }
      LOG.info("Waiting for more time for replication. Row:"
          + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
      Thread.sleep(SLEEP_TIME);
    }
    fail("Waited too much time for replication. Row:" + Bytes.toString(row)
        + ". IsDeleteReplication:" + isDeleted);
  }

  /**
   * Requests a WAL roll on the region server hosting the region that contains {@code row}, and
   * waits (bounded) for a postLogRoll notification from that region's WAL.
   */
  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls
    final WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);
    try {
      // request a roll
      admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
        region.getRegionInfo().getRegionName()));

      // wait, but bounded so that a lost notification fails the test instead of hanging it
      if (!latch.await(NB_RETRIES * SLEEP_TIME, TimeUnit.MILLISECONDS)) {
        fail("Timed out waiting for the wal of '" + region + "' to roll.");
      }
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      Thread.currentThread().interrupt();
    } finally {
      // always detach the listener, including on failure or interruption
      region.getWAL().unregisterWALActionsListener(listener);
    }
  }

  /**
   * Uses a coprocessor to count puts and deletes. KVs would be replicated back with the same
   * timestamp, so there is otherwise no way to count them. The counters are atomic because region
   * hooks may be invoked from several RPC handler threads at once.
   */
  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
    private final AtomicInteger nCount = new AtomicInteger();
    private final AtomicInteger nDelete = new AtomicInteger();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      nCount.incrementAndGet();
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete.incrementAndGet();
    }

    /**
     * For a Get carrying the "count" attribute, replaces the result with the current delete and
     * put counters and bypasses the normal read.
     */
    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Get get, final List<Cell> result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // order is important!
        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete.get())));
        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount.get())));
        c.bypass();
      }
    }
  }

}