/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestMasterReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);

  private Configuration baseConfiguration;

  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  private MiniZooKeeperCluster miniZK;

  // Polling interval and retry budget shared by the wait helpers below
  // (up to NB_RETRIES * SLEEP_TIME = 2 minutes per condition).
  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 120;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  // Family/qualifier constants used by CoprocessorCounter's count-reading protocol.
  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  private TableDescriptor table;

  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
    // smaller block size and capacity to trigger more operations
    // and test them
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    baseConfiguration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      CoprocessorCounter.class.getName());
    // Two globally-scoped (replicated) families plus one family that never replicates.
    table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }
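
  // Common verification pattern used by the tests below (helpers defined near
  // the bottom of this file): write on one cluster, then poll another until the
  // edit shows up or NB_RETRIES is exhausted. A minimal sketch:
  //
  //   putAndWait(row, famName, htables[0], htables[1]);  // edit flows 0 -> 1
  //   deleteAndWait(row, htables[0], htables[1]);        // deletes propagate too
  //
  // Polling (rather than hooking replication callbacks) keeps the tests
  // independent of the source's internal batching and retry behaviour.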

  /**
   * Tests the replication scenario 0 -> 1 -> 0. It does this by adding and deleting a row in a
   * table on each cluster and checking whether it is replicated. It also tests that puts and
   * deletes are not replicated back to the originating cluster.
   */
  @Test
  public void testCyclicReplication1() throws Exception {
    LOG.info("testCyclicReplication1");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      int[] expectedCounts = new int[] { 2, 2 };

      // add rows to both clusters,
      // make sure they are both replicated
      putAndWait(row, famName, htables[0], htables[1]);
      putAndWait(row1, famName, htables[1], htables[0]);
      validateCounts(htables, put, expectedCounts);

      deleteAndWait(row, htables[0], htables[1]);
      deleteAndWait(row1, htables[1], htables[0]);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the replication scenario 0 -> 0. By default
   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
   * ReplicationSource should terminate, and no further logs should get enqueued.
   */
  @Test
  public void testLoopedReplication() throws Exception {
    LOG.info("testLoopedReplication");
    startMiniClusters(1);
    createTableOnClusters(table);
    addPeer("1", 0, 0);
    Thread.sleep(SLEEP_TIME);

    // wait for source to terminate
    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        ClusterMetrics clusterStatus = utilities[0].getAdmin()
            .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
        ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
        List<ReplicationLoadSource> replicationLoadSourceList =
            serverLoad.getReplicationLoadSourceList();
        return replicationLoadSourceList.isEmpty();
      }
    });

    Table[] htables = getHTablesOnClusters(tableName);
    putAndWait(row, famName, htables[0], htables[0]);
    rollWALAndWait(utilities[0], table.getTableName(), row);
    ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
    String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
      ZNodePaths.joinZNode("replication", "rs"));
    List<String> listChildrenNoWatch =
        ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
    assertEquals(0, listChildrenNoWatch.size());
  }
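
  // The znode assertion above assumes the ZooKeeper-backed replication queue
  // storage layout, where each active source owns a child under
  //
  //   <baseZNode>/replication/rs/<regionserver name>/
  //
  // An empty child list for the server means the looped source terminated and
  // removed its queue instead of continuing to enqueue rolled WALs.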

  /**
   * Tests the replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles into a table on
   * each cluster and checking whether it is replicated.
   */
  @Test
  public void testHFileCyclicReplication() throws Exception {
    LOG.info("testHFileCyclicReplication");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Load 200 rows for each hfile range in cluster '1' and validate whether it has been
      // replicated to cluster '0'.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 200;
      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1] };

      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
    Table[] htables;
    startMiniClusters(numClusters);
    createTableOnClusters(table);

    htables = getHTablesOnClusters(tableName);
    // Set up the cyclic replication peering 0 -> 1 -> 0
    addPeer("1", 0, 1);
    addPeer("1", 1, 0);
    return htables;
  }
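
  // Cyclic peerings such as 0 -> 1 -> 0 (and the three-cluster rings below) do
  // not replicate forever: each replicated entry carries the ids of the
  // clusters it has already passed through, and a source skips entries that
  // already list the target's cluster id. The CoprocessorCounter totals assert
  // exactly that: every mutation is applied once per cluster and never
  // re-applied on its originating cluster.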

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and deleting rows in a table
   * on each cluster and ensuring that each cluster receives the appropriate mutations. It also
   * tests the grouping scenario where a cluster needs to replicate the edits originating from
   * itself as well as the edits it received via replication from a different cluster. The
   * scenario is explained in HBASE-9158.
   */
  @Test
  public void testCyclicReplication2() throws Exception {
    LOG.info("testCyclicReplication2");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 0
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 0);

      htables = getHTablesOnClusters(tableName);

      // put "row" and wait 'til it got around
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[0]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[0]);
      deleteAndWait(row2, htables[2], htables[1]);

      int[] expectedCounts = new int[] { 3, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);

      // Test HBASE-9158
      disablePeer("1", 2);
      // we now have an edit in cluster 1 that was replicated to it,
      // originating from cluster 0
      putAndWait(row3, famName, htables[0], htables[1]);
      // now add a local edit to cluster 1
      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
      // re-enable replication from cluster 2 to cluster 0
      enablePeer("1", 2);
      // without HBASE-9158 the edit for row4 would have been marked with
      // cluster 0's id and hence not replicated to cluster 0
      wait(row4, htables[0], false);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
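
  // Note on the HBASE-9158 check above: while peer "1" on cluster 2 is
  // disabled, cluster 1 holds both an edit replicated from cluster 0 (row3)
  // and a local edit (row4). When shipping, the source must tag edits with
  // their own originating cluster id rather than one id for the whole batch;
  // otherwise row4 would look as if it came from cluster 0, be filtered out on
  // arrival there, and wait(row4, htables[0], false) would time out.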

  /**
   * Tests the multi-slave HFile replication scenario 0 -> 1, 2 by bulk loading a set of HFiles
   * into a table on the master cluster and checking whether it is replicated to its peers.
   */
  @Test
  public void testHFileMultiSlaveReplication() throws Exception {
    LOG.info("testHFileMultiSlaveReplication");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Add a slave, 0 -> 1
      addPeer("1", 0, 1);

      htables = getHTablesOnClusters(tableName);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
      int numOfRows = 100;

      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileMultiSlaveReplication_0", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Validate that the data is not replicated to cluster '2'.
      assertEquals(0, utilities[2].countRows(htables[2]));

      rollWALAndWait(utilities[0], htables[0].getName(), row);

      // Add one more slave, 0 -> 2
      addPeer("2", 0, 2);

      // Load 200 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to clusters '1' and '2'. The previous data should also be replicated to
      // cluster '2'.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
      numOfRows = 200;

      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileMultiSlaveReplication_1", 0, new int[] { 1, 2 },
        row, famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests bulk-loaded HFile replication when only explicitly specified table column families are
   * configured to replicate. It does this by bulk loading a set of HFiles into both CFs of the
   * table while only one CF is set to replicate.
   */
  @Test
  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
    LOG.info("testHFileReplicationForConfiguredTableCfs");
    int numClusters = 2;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      htables = getHTablesOnClusters(tableName);
      // Only CF 'f' is configured for table data replication, not 'f1'
      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
        hfileRanges, numOfRows, expectedCounts, true);

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 100;

      int[] newExpectedCounts =
          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };

      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
        hfileRanges, numOfRows, newExpectedCounts, false);

      // Validate data replication for CF 'f1'

      // The source cluster table should contain data for both families
      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);

      // Sleep long enough to confirm that the data is still not replicated for the CF that is
      // not configured for replication
      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
      // The peer cluster should only have data for the configured CF
      wait(1, htables[1], expectedCounts[1]);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
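
  // The peer added in the test above uses the table-CFs string form parsed by
  // ReplicationPeerConfigUtil.parseTableCFsFromConfig. Illustrative values
  // (table and family names here are hypothetical):
  //
  //   addPeer("1", 0, 1, "test:f");          // only family 'f' of table 'test'
  //   addPeer("1", 0, 1, "t1:cf1,cf2;t2");   // two families of t1, all of t2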

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
   */
  @Test
  public void testCyclicReplication3() throws Exception {
    LOG.info("testCyclicReplication3");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 1
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 1);

      htables = getHTablesOnClusters(tableName);

      // put "row" and wait 'til it got around
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[2]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[2]);
      deleteAndWait(row2, htables[2], htables[1]);

      // Nothing replicates into cluster 0, so it only ever sees its own edit;
      // clusters 1 and 2 each receive all three.
      int[] expectedCounts = new int[] { 1, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  @After
  public void tearDown() throws IOException {
    // The mini clusters are shut down by each test's finally block; just drop
    // the references here.
    configurations = null;
    utilities = null;
  }

  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      // A distinct znode parent per cluster lets all clusters share one ZK ensemble.
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      new ZKWatcher(conf, "cluster" + i, null, true);
    }
  }

  private void shutDownMiniClusters() throws Exception {
    int numClusters = utilities.length;
    for (int i = numClusters - 1; i >= 0; i--) {
      if (utilities[i] != null) {
        utilities[i].shutdownMiniCluster();
      }
    }
    miniZK.shutdown();
  }

  private void createTableOnClusters(TableDescriptor table) throws Exception {
    for (HBaseTestingUtility utility : utilities) {
      utility.getAdmin().createTable(table);
    }
  }

  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber)
      throws Exception {
    try (Admin admin =
        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
      admin.addReplicationPeer(id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
    }
  }

  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
      throws Exception {
    try (Admin admin =
        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
      admin.addReplicationPeer(id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
            .setReplicateAllUserTables(false)
            .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
    }
  }
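
  // Note: a peer is identified to the source cluster only by the sink's
  // cluster key, i.e. "<zk quorum>:<client port>:<znode parent>" (for example
  // "localhost:21818:/1-42", values illustrative), which is why
  // startMiniClusters above gives every mini cluster a distinct znode parent
  // on the shared ZooKeeper ensemble.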

  private void disablePeer(String id, int masterClusterNumber) throws Exception {
    try (Admin admin =
        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
      admin.disableReplicationPeer(id);
    }
  }

  private void enablePeer(String id, int masterClusterNumber) throws Exception {
    try (Admin admin =
        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
      admin.enableReplicationPeer(id);
    }
  }

  private void close(Closeable... closeables) {
    try {
      if (closeables != null) {
        for (Closeable closeable : closeables) {
          closeable.close();
        }
      }
    } catch (Exception e) {
      LOG.warn("Exception occurred while closing the object:", e);
    }
  }

  @SuppressWarnings("resource")
  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
    int numClusters = utilities.length;
    Table[] htables = new Table[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
      htables[i] = htable;
    }
    return htables;
  }

  private void validateCounts(Table[] htables, byte[] type, int[] expectedCounts)
      throws IOException {
    for (int i = 0; i < htables.length; i++) {
      assertEquals(Bytes.toString(type) + " were replicated back ", expectedCounts[i],
        getCount(htables[i], type));
    }
  }

  private int getCount(Table t, byte[] type) throws IOException {
    // The "count" attribute makes CoprocessorCounter.preGetOp short-circuit the
    // Get and return the in-memory put/delete counters instead of table data.
    Get test = new Get(row);
    test.setAttribute("count", new byte[] {});
    Result res = t.get(test);
    return Bytes.toInt(res.getValue(count, type));
  }

  private void deleteAndWait(byte[] row, Table source, Table target) throws Exception {
    Delete del = new Delete(row);
    source.delete(del);
    wait(row, target, true);
  }

  private void putAndWait(byte[] row, byte[] fam, Table source, Table target) throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, row);
    source.put(put);
    wait(row, target, false);
  }

  private void loadAndValidateHFileReplication(String testName, int masterNumber,
      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
    HBaseTestingUtility util = utilities[masterNumber];

    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(fam));

    // Generate one HFile per key range under <dir>/<family>/.
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(util.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    // Bulk load the generated HFiles into the master cluster's table.
    Table source = tables[masterNumber];
    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    if (toValidate) {
      for (int slaveClusterNumber : slaveNumbers) {
        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
      }
    }
  }
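
  // Bulk loads replicate because setUp enables
  // HConstants.REPLICATION_BULKLOAD_ENABLE_KEY: the load writes a marker into
  // the WAL, and the source ships the referenced HFiles to each peer, which
  // bulk-loads them in turn. A sketch of the load step performed above:
  //
  //   Path dir = ...;  // contains one <family>/ subdirectory of generated HFiles
  //   new LoadIncrementalHFiles(conf).run(new String[] { dir.toString(), tableName.toString() });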

  private void wait(int slaveNumber, Table target, int expectedCount)
      throws IOException, InterruptedException {
    int count = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for bulkloaded data replication. Current count=" + count
            + ", expected count=" + expectedCount);
      }
      count = utilities[slaveNumber].countRows(target);
      if (count != expectedCount) {
        LOG.info("Waiting more time for bulkloaded data replication.");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
      }
      Result res = target.get(get);
      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
      if (sleep) {
        LOG.info("Waiting more time for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
        Thread.sleep(SLEEP_TIME);
      } else {
        if (!isDeleted) {
          assertArrayEquals(res.value(), row);
        }
        LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        break;
      }
    }
  }

  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls
    final WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);

    // request a roll
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
      region.getRegionInfo().getRegionName()));

    // wait
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
          + "replication tests fail, it's probably because we should still be waiting.");
      Thread.currentThread().interrupt();
    }
    region.getWAL().unregisterWALActionsListener(listener);
  }
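
  // A minimal sketch of the counter-reading protocol implemented by
  // CoprocessorCounter below (wired into every region via
  // CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY in setUp); getCount above
  // is the helper that actually does this:
  //
  //   Get g = new Get(row);
  //   g.setAttribute("count", new byte[] {});  // only the attribute's presence matters
  //   Result r = table.get(g);
  //   int puts = Bytes.toInt(r.getValue(count, put));
  //   int deletes = Bytes.toInt(r.getValue(count, delete));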

  /**
   * Use a coprocessor to count puts and deletes. As KVs would be replicated back with the same
   * timestamp, there is otherwise no way to count them.
   */
  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
    private int nCount = 0;
    private int nDelete = 0;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      nCount++;
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete++;
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
        final List<Cell> result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // order is important!
        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
        c.bypass();
      }
    }
  }

}