001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotEquals; 024import static org.junit.Assert.assertThat; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030import java.util.Map; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.stream.Collectors; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.TableNotFoundException; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.master.RegionState; 042import org.apache.hadoop.hbase.master.ServerManager; 043import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 044import org.apache.hadoop.hbase.master.assignment.RegionStates; 045import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 046import org.apache.hadoop.hbase.regionserver.HRegionServer; 047import org.apache.hadoop.hbase.regionserver.Region; 048import org.apache.hadoop.hbase.testclassification.ClientTests; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.JVMClusterUtil; 053import org.apache.hadoop.hbase.util.Threads; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.runner.RunWith; 058import org.junit.runners.Parameterized; 059 060/** 061 * Class to test asynchronous region admin operations. 062 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 063 * ten minute timeout so they were split. 064 */ 065@RunWith(Parameterized.class) 066@Category({ LargeTests.class, ClientTests.class }) 067public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class); 071 072 @Test 073 public void testAssignRegionAndUnassignRegion() throws Exception { 074 createTableWithDefaultConf(tableName); 075 076 // assign region. 077 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 078 AssignmentManager am = master.getAssignmentManager(); 079 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 080 081 // assert region on server 082 RegionStates regionStates = am.getRegionStates(); 083 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 084 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 085 assertTrue(regionStates.getRegionState(hri).isOpened()); 086 087 // Region is assigned now. Let's assign it again. 088 // Master should not abort, and region should stay assigned. 089 try { 090 admin.assign(hri.getRegionName()).get(); 091 fail("Should fail when assigning an already onlined region"); 092 } catch (ExecutionException e) { 093 // Expected 094 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class)); 095 } 096 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 097 assertTrue(regionStates.getRegionState(hri).isOpened()); 098 099 // unassign region 100 admin.unassign(hri.getRegionName(), true).get(); 101 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 102 assertTrue(regionStates.getRegionState(hri).isClosed()); 103 } 104 105 RegionInfo createTableAndGetOneRegion(final TableName tableName) 106 throws IOException, InterruptedException, ExecutionException { 107 TableDescriptor desc = 108 TableDescriptorBuilder.newBuilder(tableName) 109 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 110 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 111 112 // wait till the table is assigned 113 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 114 long timeoutTime = System.currentTimeMillis() + 3000; 115 while (true) { 116 List<RegionInfo> regions = 117 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 118 if (regions.size() > 3) { 119 return regions.get(2); 120 } 121 long now = System.currentTimeMillis(); 122 if (now > timeoutTime) { 123 fail("Could not find an online region"); 124 } 125 Thread.sleep(10); 126 } 127 } 128 129 @Test 130 public void testGetRegionByStateOfTable() throws Exception { 131 RegionInfo hri = createTableAndGetOneRegion(tableName); 132 133 RegionStates regionStates = 134 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 135 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN) 136 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 137 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 138 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 139 } 140 141 @Test 142 public void testMoveRegion() throws Exception { 143 admin.balancerSwitch(false).join(); 144 145 RegionInfo hri = createTableAndGetOneRegion(tableName); 146 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 147 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 148 149 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 150 ServerManager serverManager = master.getServerManager(); 151 ServerName destServerName = null; 152 List<JVMClusterUtil.RegionServerThread> regionServers = 153 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 154 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 155 HRegionServer destServer = regionServer.getRegionServer(); 156 destServerName = destServer.getServerName(); 157 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 158 break; 159 } 160 } 161 162 assertTrue(destServerName != null && !destServerName.equals(serverName)); 163 admin.move(hri.getRegionName(), destServerName).get(); 164 165 long timeoutTime = System.currentTimeMillis() + 30000; 166 while (true) { 167 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 168 if (sn != null && sn.equals(destServerName)) { 169 break; 170 } 171 long now = System.currentTimeMillis(); 172 if (now > timeoutTime) { 173 fail("Failed to move the region in time: " + hri); 174 } 175 Thread.sleep(100); 176 } 177 admin.balancerSwitch(true).join(); 178 } 179 180 @Test 181 public void testGetOnlineRegions() throws Exception { 182 createTableAndGetOneRegion(tableName); 183 AtomicInteger regionServerCount = new AtomicInteger(0); 184 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 185 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> { 186 ServerName serverName = rs.getServerName(); 187 try { 188 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size()); 189 } catch (Exception e) { 190 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 191 } 192 regionServerCount.incrementAndGet(); 193 }); 194 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(), 195 regionServerCount.get()); 196 } 197 198 @Test 199 public void testFlushTableAndRegion() throws Exception { 200 RegionInfo hri = createTableAndGetOneRegion(tableName); 201 ServerName serverName = 202 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() 203 .getRegionServerOfRegion(hri); 204 HRegionServer regionServer = 205 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 206 .map(rsThread -> rsThread.getRegionServer()) 207 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 208 209 // write a put into the specific region 210 ASYNC_CONN.getTable(tableName) 211 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))) 212 .join(); 213 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 214 // flush region and wait flush operation finished. 215 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 216 admin.flushRegion(hri.getRegionName()).get(); 217 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 218 Threads.sleepWithoutInterrupt(500); 219 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 220 Threads.sleep(50); 221 } 222 // check the memstore. 223 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 224 225 // write another put into the specific region 226 ASYNC_CONN.getTable(tableName) 227 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))) 228 .join(); 229 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 230 admin.flush(tableName).get(); 231 Threads.sleepWithoutInterrupt(500); 232 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 233 Threads.sleep(50); 234 } 235 // check the memstore. 236 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 237 } 238 239 private void waitUntilMobCompactionFinished(TableName tableName) 240 throws ExecutionException, InterruptedException { 241 long finished = EnvironmentEdgeManager.currentTime() + 60000; 242 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 243 while (EnvironmentEdgeManager.currentTime() < finished) { 244 if (state == CompactionState.NONE) { 245 break; 246 } 247 Thread.sleep(10); 248 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 249 } 250 assertEquals(CompactionState.NONE, state); 251 } 252 253 @Test 254 public void testCompactMob() throws Exception { 255 ColumnFamilyDescriptor columnDescriptor = 256 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) 257 .setMobEnabled(true).setMobThreshold(0).build(); 258 259 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 260 .setColumnFamily(columnDescriptor).build(); 261 262 admin.createTable(tableDescriptor).get(); 263 264 byte[][] families = { Bytes.toBytes("mob") }; 265 loadData(tableName, families, 3000, 8); 266 267 admin.majorCompact(tableName, CompactType.MOB).get(); 268 269 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 270 assertNotEquals(CompactionState.NONE, state); 271 272 waitUntilMobCompactionFinished(tableName); 273 } 274 275 @Test 276 public void testCompactRegionServer() throws Exception { 277 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 278 createTableWithDefaultConf(tableName, null, families); 279 loadData(tableName, families, 3000, 8); 280 281 List<HRegionServer> rsList = 282 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 283 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 284 List<Region> regions = new ArrayList<>(); 285 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 286 assertEquals(1, regions.size()); 287 int countBefore = countStoreFilesInFamilies(regions, families); 288 assertTrue(countBefore > 0); 289 290 // Minor compaction for all region servers. 291 for (HRegionServer rs : rsList) 292 admin.compactRegionServer(rs.getServerName()).get(); 293 Thread.sleep(5000); 294 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 295 assertTrue(countAfterMinorCompaction < countBefore); 296 297 // Major compaction for all region servers. 298 for (HRegionServer rs : rsList) 299 admin.majorCompactRegionServer(rs.getServerName()).get(); 300 Thread.sleep(5000); 301 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 302 assertEquals(3, countAfterMajorCompaction); 303 } 304 305 @Test 306 public void testCompactionSwitchStates() throws Exception { 307 // Create a table with regions 308 byte[] family = Bytes.toBytes("family"); 309 byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")), 310 Bytes.add(family, Bytes.toBytes("3"))}; 311 createTableWithDefaultConf(tableName, null, families); 312 loadData(tableName, families, 3000, 8); 313 List<Region> regions = new ArrayList<>(); 314 TEST_UTIL 315 .getHBaseCluster() 316 .getLiveRegionServerThreads() 317 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 318 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture = 319 admin.compactionSwitch(true, new ArrayList<>()); 320 Map<ServerName, Boolean> pairs = listCompletableFuture.get(); 321 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) { 322 assertEquals("Default compaction state, expected=enabled actual=disabled", 323 true, p.getValue()); 324 } 325 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 = 326 admin.compactionSwitch(false, new ArrayList<>()); 327 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get(); 328 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) { 329 assertEquals("Last compaction state, expected=enabled actual=disabled", 330 true, p.getValue()); 331 } 332 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 = 333 admin.compactionSwitch(true, new ArrayList<>()); 334 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get(); 335 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) { 336 assertEquals("Last compaction state, expected=disabled actual=enabled", 337 false, p.getValue()); 338 } 339 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0) 340 .getServerName(); 341 List<String> serverNameList = new ArrayList<String>(); 342 serverNameList.add(serverName.getServerName()); 343 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 = 344 admin.compactionSwitch(false, serverNameList); 345 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get(); 346 assertEquals(pairs3.entrySet().size(), 1); 347 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) { 348 assertEquals("Last compaction state, expected=enabled actual=disabled", 349 true, p.getValue()); 350 } 351 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 = 352 admin.compactionSwitch(true, serverNameList); 353 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get(); 354 assertEquals(pairs4.entrySet().size(), 1); 355 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) { 356 assertEquals("Last compaction state, expected=disabled actual=enabled", 357 false, p.getValue()); 358 } 359 } 360 361 @Test 362 public void testCompact() throws Exception { 363 compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false); 364 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true); 365 366 // For major compaction, set up a higher hbase.hstore.compaction.min to avoid 367 // minor compactions. It is a hack to avoid random delays introduced by Admins's 368 // updateConfiguration() method. 369 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 370 Configuration conf = thread.getRegionServer().getConfiguration(); 371 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25); 372 }); 373 374 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false); 375 compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true); 376 377 // Restore to default 378 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { 379 Configuration conf = thread.getRegionServer().getConfiguration(); 380 conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY); 381 }); 382 } 383 384 private void compactionTest(final TableName tableName, final int flushes, 385 final CompactionState expectedState, boolean singleFamily) throws Exception { 386 // Create a table with regions 387 byte[] family = Bytes.toBytes("family"); 388 byte[][] families = 389 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 390 createTableWithDefaultConf(tableName, null, families); 391 392 byte[][] singleFamilyArray = { family }; 393 394 // When singleFamily is true, only load data for the family being tested. This is to avoid 395 // the case that while major compaction is going on for the family, minor compaction could 396 // happen for other families at the same time (Two compaction threads long/short), thus 397 // pollute the compaction and store file numbers for the region. 398 if (singleFamily) { 399 loadData(tableName, singleFamilyArray, 3000, flushes); 400 } else { 401 loadData(tableName, families, 3000, flushes); 402 } 403 404 List<Region> regions = new ArrayList<>(); 405 TEST_UTIL 406 .getHBaseCluster() 407 .getLiveRegionServerThreads() 408 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 409 assertEquals(1, regions.size()); 410 411 int countBefore = countStoreFilesInFamilies(regions, families); 412 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 413 assertTrue(countBefore > 0); // there should be some data files 414 if (expectedState == CompactionState.MINOR) { 415 if (singleFamily) { 416 admin.compact(tableName, family).get(); 417 } else { 418 admin.compact(tableName).get(); 419 } 420 } else { 421 if (singleFamily) { 422 admin.majorCompact(tableName, family).get(); 423 } else { 424 admin.majorCompact(tableName).get(); 425 } 426 } 427 428 long curt = System.currentTimeMillis(); 429 long waitTime = 10000; 430 long endt = curt + waitTime; 431 CompactionState state = admin.getCompactionState(tableName).get(); 432 while (state == CompactionState.NONE && curt < endt) { 433 Thread.sleep(1); 434 state = admin.getCompactionState(tableName).get(); 435 curt = System.currentTimeMillis(); 436 } 437 // Now, should have the right compaction state, 438 // otherwise, the compaction should have already been done 439 if (expectedState != state) { 440 for (Region region : regions) { 441 state = CompactionState.valueOf(region.getCompactionState().toString()); 442 assertEquals(CompactionState.NONE, state); 443 } 444 } else { 445 // Wait until the compaction is done 446 state = admin.getCompactionState(tableName).get(); 447 while (state != CompactionState.NONE && curt < endt) { 448 Thread.sleep(10); 449 state = admin.getCompactionState(tableName).get(); 450 } 451 // Now, compaction should be done. 452 assertEquals(CompactionState.NONE, state); 453 } 454 455 int countAfter = countStoreFilesInFamilies(regions, families); 456 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 457 assertTrue(countAfter < countBefore); 458 if (!singleFamily) { 459 if (expectedState == CompactionState.MAJOR) { 460 assertEquals(families.length, countAfter); 461 } else { 462 assertTrue(families.length <= countAfter); 463 } 464 } else { 465 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 466 // assert only change was to single column family 467 assertEquals(singleFamDiff, (countBefore - countAfter)); 468 if (expectedState == CompactionState.MAJOR) { 469 assertEquals(1, countAfterSingleFamily); 470 } else { 471 assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily); 472 } 473 } 474 } 475 476 @Test 477 public void testNonExistentTableCompaction() { 478 testNonExistentTableCompaction(CompactionState.MINOR); 479 testNonExistentTableCompaction(CompactionState.MAJOR); 480 } 481 482 private void testNonExistentTableCompaction(CompactionState compactionState) { 483 try { 484 if (compactionState == CompactionState.MINOR) { 485 admin.compact(TableName.valueOf("NonExistentTable")).get(); 486 } else { 487 admin.majorCompact(TableName.valueOf("NonExistentTable")).get(); 488 } 489 fail("Expected TableNotFoundException when table doesn't exist"); 490 } catch (Exception e) { 491 // expected. 492 assertTrue(e.getCause() instanceof TableNotFoundException); 493 } 494 } 495 496 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 497 return countStoreFilesInFamilies(regions, new byte[][] { family }); 498 } 499 500 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 501 int count = 0; 502 for (Region region : regions) { 503 count += region.getStoreFileList(families).size(); 504 } 505 return count; 506 } 507 508 static void loadData(final TableName tableName, final byte[][] families, final int rows) 509 throws IOException { 510 loadData(tableName, families, rows, 1); 511 } 512 513 static void loadData(final TableName tableName, final byte[][] families, final int rows, 514 final int flushes) throws IOException { 515 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 516 List<Put> puts = new ArrayList<>(rows); 517 byte[] qualifier = Bytes.toBytes("val"); 518 for (int i = 0; i < flushes; i++) { 519 for (int k = 0; k < rows; k++) { 520 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 521 Put p = new Put(row); 522 for (int j = 0; j < families.length; ++j) { 523 p.addColumn(families[j], qualifier, row); 524 } 525 puts.add(p); 526 } 527 table.putAll(puts).join(); 528 TEST_UTIL.flush(); 529 puts.clear(); 530 } 531 } 532}