001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotEquals; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.stream.Collectors; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.TableNotFoundException; 041import org.apache.hadoop.hbase.master.HMaster; 042import org.apache.hadoop.hbase.master.RegionState; 043import org.apache.hadoop.hbase.master.ServerManager; 044import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 045import org.apache.hadoop.hbase.master.assignment.RegionStates; 046import org.apache.hadoop.hbase.regionserver.HRegionServer; 047import org.apache.hadoop.hbase.regionserver.Region; 048import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 049import org.apache.hadoop.hbase.testclassification.ClientTests; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.JVMClusterUtil; 054import org.apache.hadoop.hbase.util.Threads; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.runner.RunWith; 059import org.junit.runners.Parameterized; 060 061/** 062 * Class to test asynchronous region admin operations. 063 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 064 * ten minute timeout so they were split. 065 */ 066@RunWith(Parameterized.class) 067@Category({ LargeTests.class, ClientTests.class }) 068public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class); 072 073 @Test 074 public void testAssignRegionAndUnassignRegion() throws Exception { 075 createTableWithDefaultConf(tableName); 076 077 // assign region. 078 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 079 AssignmentManager am = master.getAssignmentManager(); 080 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 081 082 // assert region on server 083 RegionStates regionStates = am.getRegionStates(); 084 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 085 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 086 assertTrue(regionStates.getRegionState(hri).isOpened()); 087 088 // Region is assigned now. Let's assign it again. 089 // Master should not abort, and region should stay assigned. 090 try { 091 admin.assign(hri.getRegionName()).get(); 092 fail("Should fail when assigning an already onlined region"); 093 } catch (ExecutionException e) { 094 // Expected 095 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class)); 096 } 097 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 098 assertTrue(regionStates.getRegionState(hri).isOpened()); 099 100 // unassign region 101 admin.unassign(hri.getRegionName(), true).get(); 102 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 103 assertTrue(regionStates.getRegionState(hri).isClosed()); 104 } 105 106 RegionInfo createTableAndGetOneRegion(final TableName tableName) 107 throws IOException, InterruptedException, ExecutionException { 108 TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) 109 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 110 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 111 112 // wait till the table is assigned 113 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 114 long timeoutTime = EnvironmentEdgeManager.currentTime() + 3000; 115 while (true) { 116 List<RegionInfo> regions = 117 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 118 if (regions.size() > 3) { 119 return regions.get(2); 120 } 121 long now = EnvironmentEdgeManager.currentTime(); 122 if (now > timeoutTime) { 123 fail("Could not find an online region"); 124 } 125 Thread.sleep(10); 126 } 127 } 128 129 @Test 130 public void testGetRegionByStateOfTable() throws Exception { 131 RegionInfo hri = createTableAndGetOneRegion(tableName); 132 133 RegionStates regionStates = 134 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 135 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN).stream() 136 .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 137 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 138 .get(RegionState.State.OPEN).stream() 139 .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 140 } 141 142 @Test 143 public void testMoveRegion() throws Exception { 144 admin.balancerSwitch(false).join(); 145 146 RegionInfo hri = createTableAndGetOneRegion(tableName); 147 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 148 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 149 150 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 151 ServerManager serverManager = master.getServerManager(); 152 ServerName destServerName = null; 153 List<JVMClusterUtil.RegionServerThread> regionServers = 154 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 155 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 156 HRegionServer destServer = regionServer.getRegionServer(); 157 destServerName = destServer.getServerName(); 158 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 159 break; 160 } 161 } 162 163 assertTrue(destServerName != null && !destServerName.equals(serverName)); 164 admin.move(hri.getRegionName(), destServerName).get(); 165 166 long timeoutTime = EnvironmentEdgeManager.currentTime() + 30000; 167 while (true) { 168 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 169 if (sn != null && sn.equals(destServerName)) { 170 break; 171 } 172 long now = EnvironmentEdgeManager.currentTime(); 173 if (now > timeoutTime) { 174 fail("Failed to move the region in time: " + hri); 175 } 176 Thread.sleep(100); 177 } 178 admin.balancerSwitch(true).join(); 179 } 180 181 @Test 182 public void testGetOnlineRegions() throws Exception { 183 createTableAndGetOneRegion(tableName); 184 AtomicInteger regionServerCount = new AtomicInteger(0); 185 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 186 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> { 187 ServerName serverName = rs.getServerName(); 188 try { 189 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size()); 190 } catch (Exception e) { 191 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 192 } 193 regionServerCount.incrementAndGet(); 194 }); 195 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(), 196 regionServerCount.get()); 197 } 198 199 @Test 200 public void testFlushTableAndRegion() throws Exception { 201 RegionInfo hri = createTableAndGetOneRegion(tableName); 202 ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() 203 .getRegionStates().getRegionServerOfRegion(hri); 204 HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 205 .map(rsThread -> rsThread.getRegionServer()) 206 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 207 208 // write a put into the specific region 209 ASYNC_CONN.getTable(tableName) 210 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))).join(); 211 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 212 // flush region and wait flush operation finished. 213 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 214 admin.flushRegion(hri.getRegionName()).get(); 215 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 216 Threads.sleepWithoutInterrupt(500); 217 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 218 Threads.sleep(50); 219 } 220 // check the memstore. 221 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 222 223 // write another put into the specific region 224 ASYNC_CONN.getTable(tableName) 225 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))).join(); 226 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 227 admin.flush(tableName).get(); 228 Threads.sleepWithoutInterrupt(500); 229 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 230 Threads.sleep(50); 231 } 232 // check the memstore. 233 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 234 } 235 236 private void waitUntilMobCompactionFinished(TableName tableName) 237 throws ExecutionException, InterruptedException { 238 long finished = EnvironmentEdgeManager.currentTime() + 60000; 239 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 240 while (EnvironmentEdgeManager.currentTime() < finished) { 241 if (state == CompactionState.NONE) { 242 break; 243 } 244 Thread.sleep(10); 245 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 246 } 247 assertEquals(CompactionState.NONE, state); 248 } 249 250 @Test 251 public void testCompactMob() throws Exception { 252 ColumnFamilyDescriptor columnDescriptor = ColumnFamilyDescriptorBuilder 253 .newBuilder(Bytes.toBytes("mob")).setMobEnabled(true).setMobThreshold(0).build(); 254 255 TableDescriptor tableDescriptor = 256 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnDescriptor).build(); 257 258 admin.createTable(tableDescriptor).get(); 259 260 byte[][] families = { Bytes.toBytes("mob") }; 261 loadData(tableName, families, 3000, 8); 262 263 admin.majorCompact(tableName).get(); 264 265 CompactionState state = admin.getCompactionState(tableName).get(); 266 assertNotEquals(CompactionState.NONE, state); 267 268 waitUntilMobCompactionFinished(tableName); 269 } 270 271 @Test 272 public void testCompactRegionServer() throws Exception { 273 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 274 createTableWithDefaultConf(tableName, null, families); 275 loadData(tableName, families, 3000, 8); 276 277 List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 278 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 279 List<Region> regions = new ArrayList<>(); 280 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 281 assertEquals(1, regions.size()); 282 int countBefore = countStoreFilesInFamilies(regions, families); 283 assertTrue(countBefore > 0); 284 285 // Minor compaction for all region servers. 286 for (HRegionServer rs : rsList) 287 admin.compactRegionServer(rs.getServerName()).get(); 288 Thread.sleep(5000); 289 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 290 assertTrue(countAfterMinorCompaction < countBefore); 291 292 // Major compaction for all region servers. 293 for (HRegionServer rs : rsList) 294 admin.majorCompactRegionServer(rs.getServerName()).get(); 295 Thread.sleep(5000); 296 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 297 assertEquals(3, countAfterMajorCompaction); 298 } 299 300 @Test 301 public void testCompactionSwitchStates() throws Exception { 302 // Create a table with regions 303 byte[] family = Bytes.toBytes("family"); 304 byte[][] families = 305 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 306 createTableWithDefaultConf(tableName, null, families); 307 loadData(tableName, families, 3000, 8); 308 List<Region> regions = new ArrayList<>(); 309 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() 310 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 311 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture = 312 admin.compactionSwitch(true, new ArrayList<>()); 313 Map<ServerName, Boolean> pairs = listCompletableFuture.get(); 314 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) { 315 assertEquals("Default compaction state, expected=enabled actual=disabled", true, 316 p.getValue()); 317 } 318 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 = 319 admin.compactionSwitch(false, new ArrayList<>()); 320 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get(); 321 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) { 322 assertEquals("Last compaction state, expected=enabled actual=disabled", true, p.getValue()); 323 } 324 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 = 325 admin.compactionSwitch(true, new ArrayList<>()); 326 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get(); 327 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) { 328 assertEquals("Last compaction state, expected=disabled actual=enabled", false, p.getValue()); 329 } 330 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); 331 List<String> serverNameList = new ArrayList<String>(); 332 serverNameList.add(serverName.getServerName()); 333 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 = 334 admin.compactionSwitch(false, serverNameList); 335 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get(); 336 assertEquals(pairs3.entrySet().size(), 1); 337 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) { 338 assertEquals("Last compaction state, expected=enabled actual=disabled", true, p.getValue()); 339 } 340 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 = 341 admin.compactionSwitch(true, serverNameList); 342 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get(); 343 assertEquals(pairs4.entrySet().size(), 1); 344 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) { 345 assertEquals("Last compaction state, expected=disabled actual=enabled", false, p.getValue()); 346 } 347 } 348 349 @Test 350 public void testCompact() throws Exception { 351 compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false); 352 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true); 353 354 // For major compaction, set up a higher hbase.hstore.compaction.min to avoid 355 // minor compactions. It is a hack to avoid random delays introduced by Admins's 356 // updateConfiguration() method. 357 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 358 Configuration conf = thread.getRegionServer().getConfiguration(); 359 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25); 360 }); 361 362 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false); 363 compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true); 364 365 // Restore to default 366 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 367 Configuration conf = thread.getRegionServer().getConfiguration(); 368 conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY); 369 }); 370 } 371 372 private void compactionTest(final TableName tableName, final int flushes, 373 final CompactionState expectedState, boolean singleFamily) throws Exception { 374 // Create a table with regions 375 byte[] family = Bytes.toBytes("family"); 376 byte[][] families = 377 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 378 createTableWithDefaultConf(tableName, null, families); 379 380 byte[][] singleFamilyArray = { family }; 381 382 // When singleFamily is true, only load data for the family being tested. This is to avoid 383 // the case that while major compaction is going on for the family, minor compaction could 384 // happen for other families at the same time (Two compaction threads long/short), thus 385 // pollute the compaction and store file numbers for the region. 386 if (singleFamily) { 387 loadData(tableName, singleFamilyArray, 3000, flushes); 388 } else { 389 loadData(tableName, families, 3000, flushes); 390 } 391 392 List<Region> regions = new ArrayList<>(); 393 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() 394 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 395 assertEquals(1, regions.size()); 396 397 int countBefore = countStoreFilesInFamilies(regions, families); 398 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 399 assertTrue(countBefore > 0); // there should be some data files 400 if (expectedState == CompactionState.MINOR) { 401 if (singleFamily) { 402 admin.compact(tableName, family).get(); 403 } else { 404 admin.compact(tableName).get(); 405 } 406 } else { 407 if (singleFamily) { 408 admin.majorCompact(tableName, family).get(); 409 } else { 410 admin.majorCompact(tableName).get(); 411 } 412 } 413 414 long curt = EnvironmentEdgeManager.currentTime(); 415 long waitTime = 10000; 416 long endt = curt + waitTime; 417 CompactionState state = admin.getCompactionState(tableName).get(); 418 while (state == CompactionState.NONE && curt < endt) { 419 Thread.sleep(1); 420 state = admin.getCompactionState(tableName).get(); 421 curt = EnvironmentEdgeManager.currentTime(); 422 } 423 // Now, should have the right compaction state, 424 // otherwise, the compaction should have already been done 425 if (expectedState != state) { 426 for (Region region : regions) { 427 state = CompactionState.valueOf(region.getCompactionState().toString()); 428 assertEquals(CompactionState.NONE, state); 429 } 430 } else { 431 // Wait until the compaction is done 432 state = admin.getCompactionState(tableName).get(); 433 while (state != CompactionState.NONE && curt < endt) { 434 Thread.sleep(10); 435 state = admin.getCompactionState(tableName).get(); 436 } 437 // Now, compaction should be done. 438 assertEquals(CompactionState.NONE, state); 439 } 440 441 int countAfter = countStoreFilesInFamilies(regions, families); 442 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 443 assertTrue(countAfter < countBefore); 444 if (!singleFamily) { 445 if (expectedState == CompactionState.MAJOR) { 446 assertEquals(families.length, countAfter); 447 } else { 448 assertTrue(families.length <= countAfter); 449 } 450 } else { 451 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 452 // assert only change was to single column family 453 assertEquals(singleFamDiff, (countBefore - countAfter)); 454 if (expectedState == CompactionState.MAJOR) { 455 assertEquals(1, countAfterSingleFamily); 456 } else { 457 assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily); 458 } 459 } 460 } 461 462 @Test 463 public void testNonExistentTableCompaction() { 464 testNonExistentTableCompaction(CompactionState.MINOR); 465 testNonExistentTableCompaction(CompactionState.MAJOR); 466 } 467 468 private void testNonExistentTableCompaction(CompactionState compactionState) { 469 try { 470 if (compactionState == CompactionState.MINOR) { 471 admin.compact(TableName.valueOf("NonExistentTable")).get(); 472 } else { 473 admin.majorCompact(TableName.valueOf("NonExistentTable")).get(); 474 } 475 fail("Expected TableNotFoundException when table doesn't exist"); 476 } catch (Exception e) { 477 // expected. 478 assertTrue(e.getCause() instanceof TableNotFoundException); 479 } 480 } 481 482 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 483 return countStoreFilesInFamilies(regions, new byte[][] { family }); 484 } 485 486 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 487 int count = 0; 488 for (Region region : regions) { 489 count += region.getStoreFileList(families).size(); 490 } 491 return count; 492 } 493 494 static void loadData(final TableName tableName, final byte[][] families, final int rows) 495 throws IOException { 496 loadData(tableName, families, rows, 1); 497 } 498 499 static void loadData(final TableName tableName, final byte[][] families, final int rows, 500 final int flushes) throws IOException { 501 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 502 List<Put> puts = new ArrayList<>(rows); 503 byte[] qualifier = Bytes.toBytes("val"); 504 for (int i = 0; i < flushes; i++) { 505 for (int k = 0; k < rows; k++) { 506 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 507 Put p = new Put(row); 508 for (int j = 0; j < families.length; ++j) { 509 p.addColumn(families[j], qualifier, row); 510 } 511 puts.add(p); 512 } 513 table.putAll(puts).join(); 514 TEST_UTIL.flush(); 515 puts.clear(); 516 } 517 } 518}