/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081@Tag(ReplicationTests.TAG) 082@Tag(LargeTests.TAG) 083public class TestMasterReplication { 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class); 086 087 private Configuration baseConfiguration; 088 089 private HBaseTestingUtil[] utilities; 090 private Configuration[] configurations; 091 private MiniZooKeeperCluster miniZK; 092 093 private static final long SLEEP_TIME = 1000; 094 private static final int NB_RETRIES = 120; 095 096 private static final TableName tableName = TableName.valueOf("test"); 097 private static final byte[] famName = Bytes.toBytes("f"); 098 private static final byte[] famName1 = Bytes.toBytes("f1"); 099 private static final byte[] row = Bytes.toBytes("row"); 100 private static final byte[] row1 = Bytes.toBytes("row1"); 101 private static final byte[] row2 = Bytes.toBytes("row2"); 102 private static final byte[] row3 = Bytes.toBytes("row3"); 103 private static final byte[] row4 = Bytes.toBytes("row4"); 104 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 105 106 private static final byte[] count = Bytes.toBytes("count"); 107 private static final byte[] put = Bytes.toBytes("put"); 108 private static final byte[] delete = Bytes.toBytes("delete"); 109 110 private TableDescriptor table; 111 112 @BeforeEach 113 public void setUp() throws Exception { 114 baseConfiguration = HBaseConfiguration.create(); 115 // smaller block size and capacity to trigger more operations 116 // and test them 117 baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 118 baseConfiguration.setInt("replication.source.size.capacity", 1024); 119 baseConfiguration.setLong("replication.source.sleepforretries", 100); 120 baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); 121 baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); 122 baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 123 
baseConfiguration.set("hbase.replication.source.fs.conf.provider", 124 TestSourceFSConfigurationProvider.class.getCanonicalName()); 125 baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); 126 baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 127 baseConfiguration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 128 CoprocessorCounter.class.getName()); 129 table = TableDescriptorBuilder.newBuilder(tableName) 130 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 131 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 132 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1) 133 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 134 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 135 } 136 137 /** 138 * It tests the replication scenario involving 0 -> 1 -> 0. It does it by adding and deleting a 139 * row to a table in each cluster, checking if it's replicated. It also tests that the puts and 140 * deletes are not replicated back to the originating cluster. 141 */ 142 @Test 143 public void testCyclicReplication1() throws Exception { 144 LOG.info("testSimplePutDelete"); 145 int numClusters = 2; 146 Table[] htables = null; 147 try { 148 htables = setUpClusterTablesAndPeers(numClusters); 149 150 int[] expectedCounts = new int[] { 2, 2 }; 151 152 // add rows to both clusters, 153 // make sure they are both replication 154 putAndWait(row, famName, htables[0], htables[1]); 155 putAndWait(row1, famName, htables[1], htables[0]); 156 validateCounts(htables, put, expectedCounts); 157 158 deleteAndWait(row, htables[0], htables[1]); 159 deleteAndWait(row1, htables[1], htables[0]); 160 validateCounts(htables, delete, expectedCounts); 161 } finally { 162 close(htables); 163 shutDownMiniClusters(); 164 } 165 } 166 167 /** 168 * Tests the replication scenario 0 -> 0. 
By default 169 * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint}, 170 * the replication peer should not be added. 171 */ 172 @Test 173 public void testLoopedReplication() throws Exception { 174 LOG.info("testLoopedReplication"); 175 startMiniClusters(1); 176 createTableOnClusters(table); 177 assertThrows(DoNotRetryIOException.class, () -> addPeer("1", 0, 0)); 178 } 179 180 /** 181 * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of 182 * HFiles to a table in each cluster, checking if it's replicated. 183 */ 184 @Test 185 public void testHFileCyclicReplication() throws Exception { 186 LOG.info("testHFileCyclicReplication"); 187 int numClusters = 2; 188 Table[] htables = null; 189 try { 190 htables = setUpClusterTablesAndPeers(numClusters); 191 192 // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated 193 // to cluster '1'. 194 byte[][][] hfileRanges = 195 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 196 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; 197 int numOfRows = 100; 198 int[] expectedCounts = 199 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 200 201 loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row, 202 famName, htables, hfileRanges, numOfRows, expectedCounts, true); 203 204 // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated 205 // to cluster '0'. 
206 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, 207 new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; 208 numOfRows = 200; 209 int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], 210 hfileRanges.length * numOfRows + expectedCounts[1] }; 211 212 loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row, 213 famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); 214 215 } finally { 216 close(htables); 217 shutDownMiniClusters(); 218 } 219 } 220 221 private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception { 222 Table[] htables; 223 startMiniClusters(numClusters); 224 createTableOnClusters(table); 225 226 htables = getHTablesOnClusters(tableName); 227 // Test the replication scenarios of 0 -> 1 -> 0 228 addPeer("1", 0, 1); 229 addPeer("1", 1, 0); 230 return htables; 231 } 232 233 /** 234 * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a 235 * table in each clusters and ensuring that the each of these clusters get the appropriate 236 * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits 237 * originating from itself and also the edits that it received using replication from a different 238 * cluster. 
The scenario is explained in HBASE-9158 239 */ 240 @Test 241 public void testCyclicReplication2() throws Exception { 242 LOG.info("testCyclicReplication2"); 243 int numClusters = 3; 244 Table[] htables = null; 245 try { 246 startMiniClusters(numClusters); 247 createTableOnClusters(table); 248 249 // Test the replication scenario of 0 -> 1 -> 2 -> 0 250 addPeer("1", 0, 1); 251 addPeer("1", 1, 2); 252 addPeer("1", 2, 0); 253 254 htables = getHTablesOnClusters(tableName); 255 256 // put "row" and wait 'til it got around 257 putAndWait(row, famName, htables[0], htables[2]); 258 putAndWait(row1, famName, htables[1], htables[0]); 259 putAndWait(row2, famName, htables[2], htables[1]); 260 261 deleteAndWait(row, htables[0], htables[2]); 262 deleteAndWait(row1, htables[1], htables[0]); 263 deleteAndWait(row2, htables[2], htables[1]); 264 265 int[] expectedCounts = new int[] { 3, 3, 3 }; 266 validateCounts(htables, put, expectedCounts); 267 validateCounts(htables, delete, expectedCounts); 268 269 // Test HBASE-9158 270 disablePeer("1", 2); 271 // we now have an edit that was replicated into cluster originating from 272 // cluster 0 273 putAndWait(row3, famName, htables[0], htables[1]); 274 // now add a local edit to cluster 1 275 htables[1].put(new Put(row4).addColumn(famName, row4, row4)); 276 // re-enable replication from cluster 2 to cluster 0 277 enablePeer("1", 2); 278 // without HBASE-9158 the edit for row4 would have been marked with 279 // cluster 0's id 280 // and hence not replicated to cluster 0 281 wait(row4, htables[0], false); 282 } finally { 283 close(htables); 284 shutDownMiniClusters(); 285 } 286 } 287 288 /** 289 * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk 290 * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers. 
291 */ 292 @Test 293 public void testHFileMultiSlaveReplication() throws Exception { 294 LOG.info("testHFileMultiSlaveReplication"); 295 int numClusters = 3; 296 Table[] htables = null; 297 try { 298 startMiniClusters(numClusters); 299 createTableOnClusters(table); 300 301 // Add a slave, 0 -> 1 302 addPeer("1", 0, 1); 303 304 htables = getHTablesOnClusters(tableName); 305 306 // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated 307 // to cluster '1'. 308 byte[][][] hfileRanges = 309 new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") }, 310 new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, }; 311 int numOfRows = 100; 312 313 int[] expectedCounts = 314 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 315 316 loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row, 317 famName, htables, hfileRanges, numOfRows, expectedCounts, true); 318 319 // Validate data is not replicated to cluster '2'. 320 assertEquals(0, HBaseTestingUtil.countRows(htables[2])); 321 322 rollWALAndWait(utilities[0], htables[0].getName(), row); 323 324 // Add one more slave, 0 -> 2 325 addPeer("2", 0, 2); 326 327 // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated 328 // to cluster '1' and '2'. Previous data should be replicated to cluster '2'. 
329 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") }, 330 new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, }; 331 numOfRows = 200; 332 333 int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], 334 hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows }; 335 336 loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row, 337 famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); 338 339 } finally { 340 close(htables); 341 shutDownMiniClusters(); 342 } 343 } 344 345 /** 346 * It tests the bulk loaded hfile replication scenario to only explicitly specified table column 347 * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set 348 * only one CF data to replicate. 349 */ 350 @Test 351 public void testHFileReplicationForConfiguredTableCfs() throws Exception { 352 LOG.info("testHFileReplicationForConfiguredTableCfs"); 353 int numClusters = 2; 354 Table[] htables = null; 355 try { 356 startMiniClusters(numClusters); 357 createTableOnClusters(table); 358 359 htables = getHTablesOnClusters(tableName); 360 // Test the replication scenarios only 'f' is configured for table data replication not 'f1' 361 addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName)); 362 363 // Load 100 rows for each hfile range in cluster '0' for table CF 'f' 364 byte[][][] hfileRanges = 365 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 366 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; 367 int numOfRows = 100; 368 int[] expectedCounts = 369 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 370 371 loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables, 372 hfileRanges, numOfRows, expectedCounts, true); 373 374 // Load 100 rows for each hfile range in cluster '0' 
for table CF 'f1' 375 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, 376 new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; 377 numOfRows = 100; 378 379 int[] newExpectedCounts = 380 new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] }; 381 382 loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables, 383 hfileRanges, numOfRows, newExpectedCounts, false); 384 385 // Validate data replication for CF 'f1' 386 387 // Source cluster table should contain data for the families 388 wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]); 389 390 // Sleep for enough time so that the data is still not replicated for the CF which is not 391 // configured for replication 392 Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME); 393 // Peer cluster should have only configured CF data 394 wait(1, htables[1], expectedCounts[1]); 395 } finally { 396 close(htables); 397 shutDownMiniClusters(); 398 } 399 } 400 401 /** 402 * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1. 
403 */ 404 @Test 405 public void testCyclicReplication3() throws Exception { 406 LOG.info("testCyclicReplication2"); 407 int numClusters = 3; 408 Table[] htables = null; 409 try { 410 startMiniClusters(numClusters); 411 createTableOnClusters(table); 412 413 // Test the replication scenario of 0 -> 1 -> 2 -> 1 414 addPeer("1", 0, 1); 415 addPeer("1", 1, 2); 416 addPeer("1", 2, 1); 417 418 htables = getHTablesOnClusters(tableName); 419 420 // put "row" and wait 'til it got around 421 putAndWait(row, famName, htables[0], htables[2]); 422 putAndWait(row1, famName, htables[1], htables[2]); 423 putAndWait(row2, famName, htables[2], htables[1]); 424 425 deleteAndWait(row, htables[0], htables[2]); 426 deleteAndWait(row1, htables[1], htables[2]); 427 deleteAndWait(row2, htables[2], htables[1]); 428 429 int[] expectedCounts = new int[] { 1, 3, 3 }; 430 validateCounts(htables, put, expectedCounts); 431 validateCounts(htables, delete, expectedCounts); 432 } finally { 433 close(htables); 434 shutDownMiniClusters(); 435 } 436 } 437 438 /** 439 * Tests that base replication peer configs are applied on peer creation and the configs are 440 * overriden if updated as part of updateReplicationPeerConfig() 441 */ 442 @Test 443 public void testBasePeerConfigsForReplicationPeer() throws Exception { 444 LOG.info("testBasePeerConfigsForPeerMutations"); 445 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 446 String firstCustomPeerConfigValue = "test"; 447 String firstCustomPeerConfigUpdatedValue = "test_updated"; 448 449 String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config"; 450 String secondCustomPeerConfigValue = "testSecond"; 451 String secondCustomPeerConfigUpdatedValue = "testSecondUpdated"; 452 try { 453 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 454 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); 455 startMiniClusters(2); 456 addPeer("1", 0, 1); 457 addPeer("2", 0, 1); 458 Admin admin = 
utilities[0].getAdmin(); 459 460 // Validates base configs 1 is present for both peer. 461 assertEquals(firstCustomPeerConfigValue, 462 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 463 assertEquals(firstCustomPeerConfigValue, 464 admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey)); 465 466 // override value of configuration 1 for peer "1". 467 ReplicationPeerConfig updatedReplicationConfigForPeer1 = 468 ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("1")) 469 .putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build(); 470 471 // add configuration 2 for peer "2". 472 ReplicationPeerConfig updatedReplicationConfigForPeer2 = 473 ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("2")) 474 .putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build(); 475 476 admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1); 477 admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2); 478 479 // validates configuration is overridden by updateReplicationPeerConfig 480 assertEquals(firstCustomPeerConfigUpdatedValue, 481 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 482 assertEquals(secondCustomPeerConfigUpdatedValue, 483 admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey)); 484 485 // Add second config to base config and perform restart. 
486 utilities[0].getConfiguration().set( 487 ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 488 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue).concat(";") 489 .concat(secondCustomPeerConfigKey).concat("=").concat(secondCustomPeerConfigValue)); 490 491 utilities[0].shutdownMiniHBaseCluster(); 492 utilities[0].restartHBaseCluster(1); 493 admin = utilities[0].getAdmin(); 494 495 // Configurations should be updated after restart again 496 assertEquals(firstCustomPeerConfigValue, 497 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 498 assertEquals(firstCustomPeerConfigValue, 499 admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey)); 500 501 assertEquals(secondCustomPeerConfigValue, 502 admin.getReplicationPeerConfig("1").getConfiguration().get(secondCustomPeerConfigKey)); 503 assertEquals(secondCustomPeerConfigValue, 504 admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey)); 505 } finally { 506 shutDownMiniClusters(); 507 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 508 } 509 } 510 511 @Test 512 public void testBasePeerConfigsRemovalForReplicationPeer() throws Exception { 513 LOG.info("testBasePeerConfigsForPeerMutations"); 514 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 515 String firstCustomPeerConfigValue = "test"; 516 517 try { 518 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 519 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); 520 startMiniClusters(2); 521 addPeer("1", 0, 1); 522 Admin admin = utilities[0].getAdmin(); 523 524 // Validates base configs 1 is present for both peer. 
525 assertEquals(firstCustomPeerConfigValue, 526 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 527 528 utilities[0].getConfiguration() 529 .unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 530 utilities[0].getConfiguration().set( 531 ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 532 firstCustomPeerConfigKey.concat("=").concat("")); 533 534 utilities[0].shutdownMiniHBaseCluster(); 535 utilities[0].restartHBaseCluster(1); 536 admin = utilities[0].getAdmin(); 537 538 // Configurations should be removed after restart again 539 assertNull( 540 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 541 } finally { 542 shutDownMiniClusters(); 543 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 544 } 545 } 546 547 @Test 548 public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer() throws Exception { 549 LOG.info("testBasePeerConfigsForPeerMutations"); 550 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 551 552 try { 553 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 554 firstCustomPeerConfigKey.concat("=").concat("")); 555 startMiniClusters(2); 556 addPeer("1", 0, 1); 557 Admin admin = utilities[0].getAdmin(); 558 559 assertNull( 560 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey), 561 "Config should not be there"); 562 } finally { 563 shutDownMiniClusters(); 564 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 565 } 566 } 567 568 @AfterEach 569 public void tearDown() throws IOException { 570 configurations = null; 571 utilities = null; 572 } 573 574 @SuppressWarnings("resource") 575 private void startMiniClusters(int numClusters) throws Exception { 576 utilities = new HBaseTestingUtil[numClusters]; 577 configurations = new Configuration[numClusters]; 578 for (int i = 0; i < 
numClusters; i++) { 579 Configuration conf = new Configuration(baseConfiguration); 580 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + ThreadLocalRandom.current().nextInt()); 581 HBaseTestingUtil utility = new HBaseTestingUtil(conf); 582 if (i == 0) { 583 utility.startMiniZKCluster(); 584 miniZK = utility.getZkCluster(); 585 } else { 586 utility.setZkCluster(miniZK); 587 } 588 utility.startMiniCluster(); 589 utilities[i] = utility; 590 configurations[i] = conf; 591 new ZKWatcher(conf, "cluster" + i, null, true); 592 } 593 } 594 595 private void shutDownMiniClusters() throws Exception { 596 int numClusters = utilities.length; 597 for (int i = numClusters - 1; i >= 0; i--) { 598 if (utilities[i] != null) { 599 utilities[i].shutdownMiniCluster(); 600 } 601 } 602 miniZK.shutdown(); 603 } 604 605 private void createTableOnClusters(TableDescriptor table) throws Exception { 606 for (HBaseTestingUtil utility : utilities) { 607 utility.getAdmin().createTable(table); 608 } 609 } 610 611 private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) 612 throws Exception { 613 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 614 Admin admin = conn.getAdmin()) { 615 admin.addReplicationPeer(id, ReplicationPeerConfig.newBuilder() 616 .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()).build()); 617 } 618 } 619 620 private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) 621 throws Exception { 622 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 623 Admin admin = conn.getAdmin()) { 624 admin.addReplicationPeer(id, 625 ReplicationPeerConfig.newBuilder() 626 .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()) 627 .setReplicateAllUserTables(false) 628 .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)).build()); 629 } 630 } 631 632 private void disablePeer(String id, 
int masterClusterNumber) throws Exception { 633 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 634 Admin admin = conn.getAdmin()) { 635 admin.disableReplicationPeer(id); 636 } 637 } 638 639 private void enablePeer(String id, int masterClusterNumber) throws Exception { 640 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 641 Admin admin = conn.getAdmin()) { 642 admin.enableReplicationPeer(id); 643 } 644 } 645 646 private void close(Closeable... closeables) { 647 try { 648 if (closeables != null) { 649 for (Closeable closeable : closeables) { 650 closeable.close(); 651 } 652 } 653 } catch (Exception e) { 654 LOG.warn("Exception occurred while closing the object:", e); 655 } 656 } 657 658 @SuppressWarnings("resource") 659 private Table[] getHTablesOnClusters(TableName tableName) throws Exception { 660 int numClusters = utilities.length; 661 Table[] htables = new Table[numClusters]; 662 for (int i = 0; i < numClusters; i++) { 663 Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName); 664 htables[i] = htable; 665 } 666 return htables; 667 } 668 669 private void validateCounts(Table[] htables, byte[] type, int[] expectedCounts) 670 throws IOException { 671 for (int i = 0; i < htables.length; i++) { 672 assertEquals(expectedCounts[i], getCount(htables[i], type), 673 Bytes.toString(type) + " were replicated back"); 674 } 675 } 676 677 private int getCount(Table t, byte[] type) throws IOException { 678 Get test = new Get(row); 679 test.setAttribute("count", new byte[] {}); 680 Result res = t.get(test); 681 return Bytes.toInt(res.getValue(count, type)); 682 } 683 684 private void deleteAndWait(byte[] row, Table source, Table target) throws Exception { 685 Delete del = new Delete(row); 686 source.delete(del); 687 wait(row, target, true); 688 } 689 690 private void putAndWait(byte[] row, byte[] fam, Table source, Table target) throws Exception { 
691 Put put = new Put(row); 692 put.addColumn(fam, row, row); 693 source.put(put); 694 wait(row, target, false); 695 } 696 697 private void loadAndValidateHFileReplication(String testName, int masterNumber, 698 int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges, 699 int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception { 700 HBaseTestingUtil util = utilities[masterNumber]; 701 702 Path dir = util.getDataTestDirOnTestFS(testName); 703 FileSystem fs = util.getTestFileSystem(); 704 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 705 Path familyDir = new Path(dir, Bytes.toString(fam)); 706 707 int hfileIdx = 0; 708 for (byte[][] range : hfileRanges) { 709 byte[] from = range[0]; 710 byte[] to = range[1]; 711 HFileTestUtil.createHFile(util.getConfiguration(), fs, 712 new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows); 713 } 714 715 Table source = tables[masterNumber]; 716 final TableName tableName = source.getName(); 717 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir); 718 719 if (toValidate) { 720 for (int slaveClusterNumber : slaveNumbers) { 721 wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]); 722 } 723 } 724 } 725 726 private void wait(int slaveNumber, Table target, int expectedCount) 727 throws IOException, InterruptedException { 728 int count = 0; 729 for (int i = 0; i < NB_RETRIES; i++) { 730 if (i == NB_RETRIES - 1) { 731 fail("Waited too much time for bulkloaded data replication. 
Current count=" + count 732 + ", expected count=" + expectedCount); 733 } 734 count = HBaseTestingUtil.countRows(target); 735 if (count != expectedCount) { 736 LOG.info("Waiting more time for bulkloaded data replication."); 737 Thread.sleep(SLEEP_TIME); 738 } else { 739 break; 740 } 741 } 742 } 743 744 private void wait(byte[] row, Table target, boolean isDeleted) throws Exception { 745 Get get = new Get(row); 746 for (int i = 0; i < NB_RETRIES; i++) { 747 if (i == NB_RETRIES - 1) { 748 fail("Waited too much time for replication. Row:" + Bytes.toString(row) 749 + ". IsDeleteReplication:" + isDeleted); 750 } 751 Result res = target.get(get); 752 boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty(); 753 if (sleep) { 754 LOG.info("Waiting for more time for replication. Row:" + Bytes.toString(row) 755 + ". IsDeleteReplication:" + isDeleted); 756 Thread.sleep(SLEEP_TIME); 757 } else { 758 if (!isDeleted) { 759 assertArrayEquals(res.value(), row); 760 } 761 LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted); 762 break; 763 } 764 } 765 } 766 767 private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table, 768 final byte[] row) throws IOException { 769 final Admin admin = utility.getAdmin(); 770 final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster(); 771 772 // find the region that corresponds to the given row. 
773 HRegion region = null; 774 for (HRegion candidate : cluster.getRegions(table)) { 775 if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { 776 region = candidate; 777 break; 778 } 779 } 780 assertNotNull(region, "Couldn't find the region for row '" + Arrays.toString(row) + "'"); 781 782 final CountDownLatch latch = new CountDownLatch(1); 783 784 // listen for successful log rolls 785 final WALActionsListener listener = new WALActionsListener() { 786 @Override 787 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { 788 latch.countDown(); 789 } 790 }; 791 region.getWAL().registerWALActionsListener(listener); 792 793 // request a roll 794 admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(), 795 region.getRegionInfo().getRegionName())); 796 797 // wait 798 try { 799 latch.await(); 800 } catch (InterruptedException exception) { 801 LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " 802 + "replication tests fail, it's probably because we should still be waiting."); 803 Thread.currentThread().interrupt(); 804 } 805 region.getWAL().unregisterWALActionsListener(listener); 806 } 807 808 /** 809 * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same 810 * timestamp there is otherwise no way to count them. 811 */ 812 public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver { 813 private int nCount = 0; 814 private int nDelete = 0; 815 816 @Override 817 public Optional<RegionObserver> getRegionObserver() { 818 return Optional.of(this); 819 } 820 821 @Override 822 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 823 final Put put, final WALEdit edit, final Durability durability) throws IOException { 824 nCount++; 825 } 826 827 @Override 828 public void postDelete(final ObserverContext<? 
extends RegionCoprocessorEnvironment> c, 829 final Delete delete, final WALEdit edit, final Durability durability) throws IOException { 830 nDelete++; 831 } 832 833 @Override 834 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 835 final Get get, final List<Cell> result) throws IOException { 836 if (get.getAttribute("count") != null) { 837 result.clear(); 838 // order is important! 839 result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete))); 840 result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount))); 841 c.bypass(); 842 } 843 } 844 } 845 846}