001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Objects; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.stream.Collectors; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.MetaTableAccessor; 040import org.apache.hadoop.hbase.MiniHBaseCluster; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.StartMiniClusterOption; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.UnknownRegionException; 045import org.apache.hadoop.hbase.client.Admin; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionReplicaUtil; 051import org.apache.hadoop.hbase.client.Result; 052import org.apache.hadoop.hbase.client.ResultScanner; 053import org.apache.hadoop.hbase.client.Scan; 054import org.apache.hadoop.hbase.client.Table; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.exceptions.MergeRegionException; 057import org.apache.hadoop.hbase.master.HMaster; 058import org.apache.hadoop.hbase.master.MasterRpcServices; 059import org.apache.hadoop.hbase.master.RegionState; 060import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 061import org.apache.hadoop.hbase.master.assignment.RegionStates; 062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 063import org.apache.hadoop.hbase.testclassification.LargeTests; 064import org.apache.hadoop.hbase.testclassification.RegionServerTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.CommonFSUtils; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import org.apache.hadoop.hbase.util.FutureUtils; 069import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 070import org.apache.hadoop.hbase.util.Pair; 071import org.apache.hadoop.hbase.util.PairOfSameType; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.util.StringUtils; 074import org.apache.zookeeper.KeeperException; 075import org.junit.AfterClass; 076import org.junit.BeforeClass; 077import org.junit.ClassRule; 078import org.junit.Rule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.junit.rules.TestName; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 086import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 087import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 088 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 092 093@Category({ RegionServerTests.class, LargeTests.class }) 094public class TestRegionMergeTransactionOnCluster { 095 096 @ClassRule 097 public static final HBaseClassTestRule CLASS_RULE = 098 HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class); 099 100 private static final Logger LOG = 101 LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class); 102 103 @Rule 104 public TestName name = new TestName(); 105 106 private static final int NB_SERVERS = 3; 107 108 private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); 109 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 110 111 private static byte[] ROW = Bytes.toBytes("testRow"); 112 private static final int INITIAL_REGION_NUM = 10; 113 private static final int ROWSIZE = 200; 114 private static byte[][] ROWS = makeN(ROW, ROWSIZE); 115 116 private static int waitTime = 60 * 1000; 117 118 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 119 120 private static HMaster MASTER; 121 private static Admin ADMIN; 122 123 @BeforeClass 124 public static void beforeAllTests() throws Exception { 125 // Start a cluster 126 StartMiniClusterOption option = StartMiniClusterOption.builder().masterClass(MyMaster.class) 127 .numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build(); 128 TEST_UTIL.startMiniCluster(option); 129 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 130 MASTER = cluster.getMaster(); 131 MASTER.balanceSwitch(false); 132 ADMIN = TEST_UTIL.getConnection().getAdmin(); 133 } 134 135 @AfterClass 136 public static void afterAllTests() throws Exception { 137 TEST_UTIL.shutdownMiniCluster(); 138 if (ADMIN != null) { 139 ADMIN.close(); 140 } 141 } 142 143 @Test 144 public void testWholesomeMerge() throws Exception { 145 LOG.info("Starting " + name.getMethodName()); 146 final TableName tableName = TableName.valueOf(name.getMethodName()); 147 148 try { 149 // Create table and load data. 150 Table table = createTableAndLoadData(MASTER, tableName); 151 // Merge 1st and 2nd region 152 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 153 154 // Merge 2nd and 3th region 155 PairOfSameType<RegionInfo> mergedRegions = 156 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2); 157 158 verifyRowCount(table, ROWSIZE); 159 160 // Randomly choose one of the two merged regions 161 RegionInfo hri = ThreadLocalRandom.current().nextBoolean() 162 ? mergedRegions.getFirst() 163 : mergedRegions.getSecond(); 164 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 165 AssignmentManager am = cluster.getMaster().getAssignmentManager(); 166 RegionStates regionStates = am.getRegionStates(); 167 168 // We should not be able to assign it again 169 am.assign(hri); 170 assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri)); 171 172 // We should not be able to unassign it either 173 am.unassign(hri); 174 assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri)); 175 176 table.close(); 177 } finally { 178 TEST_UTIL.deleteTable(tableName); 179 } 180 } 181 182 /** 183 * Not really restarting the master. Simulate it by clear of new region state since it is not 184 * persisted, will be lost after master restarts. 185 */ 186 @Test 187 public void testMergeAndRestartingMaster() throws Exception { 188 final TableName tableName = TableName.valueOf(name.getMethodName()); 189 190 try { 191 // Create table and load data. 192 Table table = createTableAndLoadData(MASTER, tableName); 193 194 try { 195 MyMasterRpcServices.enabled.set(true); 196 197 // Merge 1st and 2nd region 198 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 199 } finally { 200 MyMasterRpcServices.enabled.set(false); 201 } 202 203 table.close(); 204 } finally { 205 TEST_UTIL.deleteTable(tableName); 206 } 207 } 208 209 @Test 210 public void testCleanMergeReference() throws Exception { 211 LOG.info("Starting " + name.getMethodName()); 212 ADMIN.enableCatalogJanitor(false); 213 final TableName tableName = TableName.valueOf(name.getMethodName()); 214 try { 215 // Create table and load data. 216 Table table = createTableAndLoadData(MASTER, tableName); 217 // Merge 1st and 2nd region 218 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1); 219 verifyRowCount(table, ROWSIZE); 220 table.close(); 221 222 List<Pair<RegionInfo, ServerName>> tableRegions = 223 MetaTableAccessor.getTableRegionsAndLocations(MASTER.getConnection(), tableName); 224 RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst(); 225 TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(tableName); 226 Result mergedRegionResult = 227 MetaTableAccessor.getRegionResult(MASTER.getConnection(), mergedRegionInfo.getRegionName()); 228 229 // contains merge reference in META 230 assertTrue(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells())); 231 232 // merging regions' directory are in the file system all the same 233 List<RegionInfo> p = MetaTableAccessor.getMergeRegions(mergedRegionResult.rawCells()); 234 RegionInfo regionA = p.get(0); 235 RegionInfo regionB = p.get(1); 236 FileSystem fs = MASTER.getMasterFileSystem().getFileSystem(); 237 Path rootDir = MASTER.getMasterFileSystem().getRootDir(); 238 239 Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable()); 240 Path regionAdir = new Path(tabledir, regionA.getEncodedName()); 241 Path regionBdir = new Path(tabledir, regionB.getEncodedName()); 242 assertTrue(fs.exists(regionAdir)); 243 assertTrue(fs.exists(regionBdir)); 244 245 ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); 246 HRegionFileSystem hrfs = 247 new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); 248 int count = 0; 249 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 250 count += hrfs.getStoreFiles(colFamily.getNameAsString()).size(); 251 } 252 ADMIN.compactRegion(mergedRegionInfo.getRegionName()); 253 // clean up the merged region store files 254 // wait until merged region have reference file 255 long timeout = EnvironmentEdgeManager.currentTime() + waitTime; 256 int newcount = 0; 257 while (EnvironmentEdgeManager.currentTime() < timeout) { 258 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 259 newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size(); 260 } 261 if (newcount > count) { 262 break; 263 } 264 Thread.sleep(50); 265 } 266 assertTrue(newcount > count); 267 List<RegionServerThread> regionServerThreads = 268 TEST_UTIL.getHBaseCluster().getRegionServerThreads(); 269 for (RegionServerThread rs : regionServerThreads) { 270 CompactedHFilesDischarger cleaner = 271 new CompactedHFilesDischarger(100, null, rs.getRegionServer(), false); 272 cleaner.chore(); 273 Thread.sleep(1000); 274 } 275 while (EnvironmentEdgeManager.currentTime() < timeout) { 276 int newcount1 = 0; 277 for (ColumnFamilyDescriptor colFamily : columnFamilies) { 278 newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size(); 279 } 280 if (newcount1 <= 1) { 281 break; 282 } 283 Thread.sleep(50); 284 } 285 // run CatalogJanitor to clean merge references in hbase:meta and archive the 286 // files of merging regions 287 int cleaned = 0; 288 while (cleaned == 0) { 289 cleaned = ADMIN.runCatalogScan(); 290 LOG.debug("catalog janitor returned " + cleaned); 291 Thread.sleep(50); 292 // Cleanup is async so wait till all procedures are done running. 293 ProcedureTestingUtility.waitNoProcedureRunning( 294 TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()); 295 } 296 // We used to check for existence of region in fs but sometimes the region dir was 297 // cleaned up by the time we got here making the test sometimes flakey. 298 assertTrue(cleaned > 0); 299 300 // Wait around a bit to give stuff a chance to complete. 301 while (true) { 302 mergedRegionResult = MetaTableAccessor.getRegionResult(TEST_UTIL.getConnection(), 303 mergedRegionInfo.getRegionName()); 304 if (MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells())) { 305 LOG.info("Waiting on cleanup of merge columns {}", 306 Arrays.asList(mergedRegionResult.rawCells()).stream().map(c -> c.toString()) 307 .collect(Collectors.joining(","))); 308 Threads.sleep(50); 309 } else { 310 break; 311 } 312 } 313 assertFalse(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells())); 314 } finally { 315 ADMIN.enableCatalogJanitor(true); 316 TEST_UTIL.deleteTable(tableName); 317 } 318 } 319 320 /** 321 * This test tests 1, merging region not online; 2, merging same two regions; 3, merging unknown 322 * regions. They are in one test case so that we don't have to create many tables, and these tests 323 * are simple. 324 */ 325 @Test 326 public void testMerge() throws Exception { 327 LOG.info("Starting " + name.getMethodName()); 328 final TableName tableName = TableName.valueOf(name.getMethodName()); 329 final Admin admin = TEST_UTIL.getAdmin(); 330 final int syncWaitTimeout = 10 * 60000; // 10min 331 332 try { 333 // Create table and load data. 334 Table table = createTableAndLoadData(MASTER, tableName); 335 AssignmentManager am = MASTER.getAssignmentManager(); 336 List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName); 337 // Fake offline one region 338 RegionInfo a = regions.get(0); 339 RegionInfo b = regions.get(1); 340 am.unassign(b); 341 am.offlineRegion(b); 342 try { 343 // Merge offline region. Region a is offline here 344 admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false) 345 .get(syncWaitTimeout, TimeUnit.MILLISECONDS); 346 fail("Offline regions should not be able to merge"); 347 } catch (DoNotRetryRegionException ie) { 348 System.out.println(ie); 349 assertTrue(ie instanceof MergeRegionException); 350 } 351 352 try { 353 // Merge the same region: b and b. 354 FutureUtils 355 .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true)); 356 fail("A region should not be able to merge with itself, even forcfully"); 357 } catch (IOException ie) { 358 assertTrue("Exception should mention regions not online", 359 StringUtils.stringifyException(ie).contains("region to itself") 360 && ie instanceof MergeRegionException); 361 } 362 363 try { 364 // Merge unknown regions 365 admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true); 366 fail("Unknown region could not be merged"); 367 } catch (IOException ie) { 368 assertTrue("UnknownRegionException should be thrown", ie instanceof UnknownRegionException); 369 } 370 table.close(); 371 } finally { 372 TEST_UTIL.deleteTable(tableName); 373 } 374 } 375 376 @Test 377 public void testMergeWithReplicas() throws Exception { 378 final TableName tableName = TableName.valueOf(name.getMethodName()); 379 try { 380 // Create table and load data. 381 Table table = createTableAndLoadData(MASTER, tableName, 5, 2); 382 List<Pair<RegionInfo, ServerName>> initialRegionToServers = 383 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName); 384 // Merge 1st and 2nd region 385 PairOfSameType<RegionInfo> mergedRegions = 386 mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 2, 5 * 2 - 2); 387 List<Pair<RegionInfo, ServerName>> currentRegionToServers = 388 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName); 389 List<RegionInfo> initialRegions = new ArrayList<>(); 390 for (Pair<RegionInfo, ServerName> p : initialRegionToServers) { 391 initialRegions.add(p.getFirst()); 392 } 393 List<RegionInfo> currentRegions = new ArrayList<>(); 394 for (Pair<RegionInfo, ServerName> p : currentRegionToServers) { 395 currentRegions.add(p.getFirst()); 396 } 397 // this is the first region 398 assertTrue(initialRegions.contains(mergedRegions.getFirst())); 399 // this is the replica of the first region 400 assertTrue(initialRegions 401 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1))); 402 // this is the second region 403 assertTrue(initialRegions.contains(mergedRegions.getSecond())); 404 // this is the replica of the second region 405 assertTrue(initialRegions 406 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1))); 407 // this is the new region 408 assertTrue(!initialRegions.contains(currentRegions.get(0))); 409 // replica of the new region 410 assertTrue(!initialRegions 411 .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); 412 // replica of the new region 413 assertTrue(currentRegions 414 .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); 415 // replica of the merged region 416 assertTrue(!currentRegions 417 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1))); 418 // replica of the merged region 419 assertTrue(!currentRegions 420 .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1))); 421 table.close(); 422 } finally { 423 TEST_UTIL.deleteTable(tableName); 424 } 425 } 426 427 private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(HMaster master, 428 TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception { 429 PairOfSameType<RegionInfo> mergedRegions = 430 requestMergeRegion(master, tablename, regionAnum, regionBnum); 431 waitAndVerifyRegionNum(master, tablename, expectedRegionNum); 432 return mergedRegions; 433 } 434 435 private PairOfSameType<RegionInfo> requestMergeRegion(HMaster master, TableName tablename, 436 int regionAnum, int regionBnum) throws Exception { 437 List<Pair<RegionInfo, ServerName>> tableRegions = 438 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 439 RegionInfo regionA = tableRegions.get(regionAnum).getFirst(); 440 RegionInfo regionB = tableRegions.get(regionBnum).getFirst(); 441 ADMIN.mergeRegionsAsync(regionA.getEncodedNameAsBytes(), regionB.getEncodedNameAsBytes(), 442 false); 443 return new PairOfSameType<>(regionA, regionB); 444 } 445 446 private void waitAndVerifyRegionNum(HMaster master, TableName tablename, int expectedRegionNum) 447 throws Exception { 448 List<Pair<RegionInfo, ServerName>> tableRegionsInMeta; 449 List<RegionInfo> tableRegionsInMaster; 450 long timeout = EnvironmentEdgeManager.currentTime() + waitTime; 451 while (EnvironmentEdgeManager.currentTime() < timeout) { 452 tableRegionsInMeta = 453 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 454 tableRegionsInMaster = 455 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename); 456 LOG.info(Objects.toString(tableRegionsInMaster)); 457 LOG.info(Objects.toString(tableRegionsInMeta)); 458 int tableRegionsInMetaSize = tableRegionsInMeta.size(); 459 int tableRegionsInMasterSize = tableRegionsInMaster.size(); 460 if ( 461 tableRegionsInMetaSize == expectedRegionNum && tableRegionsInMasterSize == expectedRegionNum 462 ) { 463 break; 464 } 465 Thread.sleep(250); 466 } 467 468 tableRegionsInMeta = 469 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 470 LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta)); 471 assertEquals(expectedRegionNum, tableRegionsInMeta.size()); 472 } 473 474 private Table createTableAndLoadData(HMaster master, TableName tablename) throws Exception { 475 return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1); 476 } 477 478 private Table createTableAndLoadData(HMaster master, TableName tablename, int numRegions, 479 int replication) throws Exception { 480 assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions); 481 byte[][] splitRows = new byte[numRegions - 1][]; 482 for (int i = 0; i < splitRows.length; i++) { 483 splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions]; 484 } 485 486 Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows); 487 LOG.info("Created " + table.getName()); 488 if (replication > 1) { 489 HBaseTestingUtility.setReplicas(ADMIN, tablename, replication); 490 LOG.info("Set replication of " + replication + " on " + table.getName()); 491 } 492 loadData(table); 493 LOG.info("Loaded " + table.getName()); 494 verifyRowCount(table, ROWSIZE); 495 LOG.info("Verified " + table.getName()); 496 497 List<Pair<RegionInfo, ServerName>> tableRegions; 498 TEST_UTIL.waitUntilAllRegionsAssigned(tablename); 499 LOG.info("All regions assigned for table - " + table.getName()); 500 tableRegions = 501 MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename); 502 assertEquals("Wrong number of regions in table " + tablename, numRegions * replication, 503 tableRegions.size()); 504 LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions)); 505 assertEquals(numRegions * replication, tableRegions.size()); 506 return table; 507 } 508 509 private static byte[][] makeN(byte[] base, int n) { 510 byte[][] ret = new byte[n][]; 511 for (int i = 0; i < n; i++) { 512 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); 513 } 514 return ret; 515 } 516 517 private void loadData(Table table) throws IOException { 518 for (int i = 0; i < ROWSIZE; i++) { 519 Put put = new Put(ROWS[i]); 520 put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(i)); 521 table.put(put); 522 } 523 } 524 525 private void verifyRowCount(Table table, int expectedRegionNum) throws IOException { 526 ResultScanner scanner = table.getScanner(new Scan()); 527 int rowCount = 0; 528 while (scanner.next() != null) { 529 rowCount++; 530 } 531 assertEquals(expectedRegionNum, rowCount); 532 scanner.close(); 533 } 534 535 // Make it public so that JVMClusterUtil can access it. 536 public static class MyMaster extends HMaster { 537 public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException { 538 super(conf); 539 } 540 541 @Override 542 protected RSRpcServices createRpcServices() throws IOException { 543 return new MyMasterRpcServices(this); 544 } 545 } 546 547 static class MyMasterRpcServices extends MasterRpcServices { 548 static AtomicBoolean enabled = new AtomicBoolean(false); 549 550 private HMaster myMaster; 551 552 public MyMasterRpcServices(HMaster master) throws IOException { 553 super(master); 554 myMaster = master; 555 } 556 557 @Override 558 public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c, 559 ReportRegionStateTransitionRequest req) throws ServiceException { 560 ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req); 561 if ( 562 enabled.get() && req.getTransition(0).getTransitionCode() == TransitionCode.READY_TO_MERGE 563 && !resp.hasErrorMessage() 564 ) { 565 RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates(); 566 for (RegionState regionState : regionStates.getRegionsStateInTransition()) { 567 // Find the merging_new region and remove it 568 if (regionState.isMergingNew()) { 569 regionStates.deleteRegion(regionState.getRegion()); 570 } 571 } 572 } 573 return resp; 574 } 575 } 576}