001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotEquals; 024import static org.junit.Assert.assertThat; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.Map; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.stream.Collectors; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.TableNotFoundException; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.master.RegionState; 042import org.apache.hadoop.hbase.master.ServerManager; 043import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 044import org.apache.hadoop.hbase.master.assignment.RegionStates; 045import org.apache.hadoop.hbase.regionserver.HRegionServer; 046import org.apache.hadoop.hbase.regionserver.Region; 047import org.apache.hadoop.hbase.testclassification.ClientTests; 048import org.apache.hadoop.hbase.testclassification.LargeTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.JVMClusterUtil; 052import org.apache.hadoop.hbase.util.Threads; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.runner.RunWith; 057import org.junit.runners.Parameterized; 058 059/** 060 * Class to test asynchronous region admin operations. 061 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 062 * ten minute timeout so they were split. 063 */ 064@RunWith(Parameterized.class) 065@Category({ LargeTests.class, ClientTests.class }) 066public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class); 070 071 @Test 072 public void testAssignRegionAndUnassignRegion() throws Exception { 073 createTableWithDefaultConf(tableName); 074 075 // assign region. 076 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 077 AssignmentManager am = master.getAssignmentManager(); 078 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 079 080 // assert region on server 081 RegionStates regionStates = am.getRegionStates(); 082 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 083 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 084 assertTrue(regionStates.getRegionState(hri).isOpened()); 085 086 // Region is assigned now. Let's assign it again. 087 // Master should not abort, and region should stay assigned. 088 try { 089 admin.assign(hri.getRegionName()).get(); 090 fail("Should fail when assigning an already onlined region"); 091 } catch (ExecutionException e) { 092 // Expected 093 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class)); 094 } 095 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 096 assertTrue(regionStates.getRegionState(hri).isOpened()); 097 098 // unassign region 099 admin.unassign(hri.getRegionName(), true).get(); 100 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition()); 101 assertTrue(regionStates.getRegionState(hri).isClosed()); 102 } 103 104 RegionInfo createTableAndGetOneRegion(final TableName tableName) 105 throws IOException, InterruptedException, ExecutionException { 106 TableDescriptor desc = 107 TableDescriptorBuilder.newBuilder(tableName) 108 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 109 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 110 111 // wait till the table is assigned 112 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 113 long timeoutTime = System.currentTimeMillis() + 3000; 114 while (true) { 115 List<RegionInfo> regions = 116 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 117 if (regions.size() > 3) { 118 return regions.get(2); 119 } 120 long now = System.currentTimeMillis(); 121 if (now > timeoutTime) { 122 fail("Could not find an online region"); 123 } 124 Thread.sleep(10); 125 } 126 } 127 128 @Test 129 public void testGetRegionByStateOfTable() throws Exception { 130 RegionInfo hri = createTableAndGetOneRegion(tableName); 131 132 RegionStates regionStates = 133 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 134 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN) 135 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 136 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 137 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 138 } 139 140 @Test 141 public void testMoveRegion() throws Exception { 142 admin.balancerSwitch(false).join(); 143 144 RegionInfo hri = createTableAndGetOneRegion(tableName); 145 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 146 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 147 148 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 149 ServerManager serverManager = master.getServerManager(); 150 ServerName destServerName = null; 151 List<JVMClusterUtil.RegionServerThread> regionServers = 152 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 153 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 154 HRegionServer destServer = regionServer.getRegionServer(); 155 destServerName = destServer.getServerName(); 156 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 157 break; 158 } 159 } 160 161 assertTrue(destServerName != null && !destServerName.equals(serverName)); 162 admin.move(hri.getRegionName(), destServerName).get(); 163 164 long timeoutTime = System.currentTimeMillis() + 30000; 165 while (true) { 166 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 167 if (sn != null && sn.equals(destServerName)) { 168 break; 169 } 170 long now = System.currentTimeMillis(); 171 if (now > timeoutTime) { 172 fail("Failed to move the region in time: " + hri); 173 } 174 Thread.sleep(100); 175 } 176 admin.balancerSwitch(true).join(); 177 } 178 179 @Test 180 public void testGetOnlineRegions() throws Exception { 181 createTableAndGetOneRegion(tableName); 182 AtomicInteger regionServerCount = new AtomicInteger(0); 183 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 184 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> { 185 ServerName serverName = rs.getServerName(); 186 try { 187 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size()); 188 } catch (Exception e) { 189 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 190 } 191 regionServerCount.incrementAndGet(); 192 }); 193 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(), 194 regionServerCount.get()); 195 } 196 197 @Test 198 public void testFlushTableAndRegion() throws Exception { 199 RegionInfo hri = createTableAndGetOneRegion(tableName); 200 ServerName serverName = 201 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() 202 .getRegionServerOfRegion(hri); 203 HRegionServer regionServer = 204 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 205 .map(rsThread -> rsThread.getRegionServer()) 206 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 207 208 // write a put into the specific region 209 ASYNC_CONN.getTable(tableName) 210 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))) 211 .join(); 212 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 213 // flush region and wait flush operation finished. 214 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 215 admin.flushRegion(hri.getRegionName()).get(); 216 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 217 Threads.sleepWithoutInterrupt(500); 218 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 219 Threads.sleep(50); 220 } 221 // check the memstore. 222 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 223 224 // write another put into the specific region 225 ASYNC_CONN.getTable(tableName) 226 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))) 227 .join(); 228 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 229 admin.flush(tableName).get(); 230 Threads.sleepWithoutInterrupt(500); 231 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 232 Threads.sleep(50); 233 } 234 // check the memstore. 235 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 236 } 237 238 private void waitUntilMobCompactionFinished(TableName tableName) 239 throws ExecutionException, InterruptedException { 240 long finished = EnvironmentEdgeManager.currentTime() + 60000; 241 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 242 while (EnvironmentEdgeManager.currentTime() < finished) { 243 if (state == CompactionState.NONE) { 244 break; 245 } 246 Thread.sleep(10); 247 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 248 } 249 assertEquals(CompactionState.NONE, state); 250 } 251 252 @Test 253 public void testCompactMob() throws Exception { 254 ColumnFamilyDescriptor columnDescriptor = 255 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) 256 .setMobEnabled(true).setMobThreshold(0).build(); 257 258 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 259 .setColumnFamily(columnDescriptor).build(); 260 261 admin.createTable(tableDescriptor).get(); 262 263 byte[][] families = { Bytes.toBytes("mob") }; 264 loadData(tableName, families, 3000, 8); 265 266 admin.majorCompact(tableName, CompactType.MOB).get(); 267 268 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 269 assertNotEquals(CompactionState.NONE, state); 270 271 waitUntilMobCompactionFinished(tableName); 272 } 273 274 @Test 275 public void testCompactRegionServer() throws Exception { 276 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 277 createTableWithDefaultConf(tableName, null, families); 278 loadData(tableName, families, 3000, 8); 279 280 List<HRegionServer> rsList = 281 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 282 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 283 List<Region> regions = new ArrayList<>(); 284 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 285 assertEquals(1, regions.size()); 286 int countBefore = countStoreFilesInFamilies(regions, families); 287 assertTrue(countBefore > 0); 288 289 // Minor compaction for all region servers. 290 for (HRegionServer rs : rsList) 291 admin.compactRegionServer(rs.getServerName()).get(); 292 Thread.sleep(5000); 293 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 294 assertTrue(countAfterMinorCompaction < countBefore); 295 296 // Major compaction for all region servers. 297 for (HRegionServer rs : rsList) 298 admin.majorCompactRegionServer(rs.getServerName()).get(); 299 Thread.sleep(5000); 300 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 301 assertEquals(3, countAfterMajorCompaction); 302 } 303 304 @Test 305 public void testCompactionSwitchStates() throws Exception { 306 // Create a table with regions 307 byte[] family = Bytes.toBytes("family"); 308 byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")), 309 Bytes.add(family, Bytes.toBytes("3"))}; 310 createTableWithDefaultConf(tableName, null, families); 311 loadData(tableName, families, 3000, 8); 312 List<Region> regions = new ArrayList<>(); 313 TEST_UTIL 314 .getHBaseCluster() 315 .getLiveRegionServerThreads() 316 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 317 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture = 318 admin.compactionSwitch(true, new ArrayList<>()); 319 Map<ServerName, Boolean> pairs = listCompletableFuture.get(); 320 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) { 321 assertEquals("Default compaction state, expected=enabled actual=disabled", 322 true, p.getValue()); 323 } 324 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 = 325 admin.compactionSwitch(false, new ArrayList<>()); 326 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get(); 327 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) { 328 assertEquals("Last compaction state, expected=enabled actual=disabled", 329 true, p.getValue()); 330 } 331 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 = 332 admin.compactionSwitch(true, new ArrayList<>()); 333 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get(); 334 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) { 335 assertEquals("Last compaction state, expected=disabled actual=enabled", 336 false, p.getValue()); 337 } 338 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0) 339 .getServerName(); 340 List<String> serverNameList = new ArrayList<String>(); 341 serverNameList.add(serverName.getServerName()); 342 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 = 343 admin.compactionSwitch(false, serverNameList); 344 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get(); 345 assertEquals(pairs3.entrySet().size(), 1); 346 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) { 347 assertEquals("Last compaction state, expected=enabled actual=disabled", 348 true, p.getValue()); 349 } 350 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 = 351 admin.compactionSwitch(true, serverNameList); 352 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get(); 353 assertEquals(pairs4.entrySet().size(), 1); 354 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) { 355 assertEquals("Last compaction state, expected=disabled actual=enabled", 356 false, p.getValue()); 357 } 358 } 359 360 @Test 361 public void testCompact() throws Exception { 362 compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false); 363 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false); 364 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true); 365 compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true); 366 } 367 368 private void compactionTest(final TableName tableName, final int flushes, 369 final CompactionState expectedState, boolean singleFamily) throws Exception { 370 // Create a table with regions 371 byte[] family = Bytes.toBytes("family"); 372 byte[][] families = 373 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 374 createTableWithDefaultConf(tableName, null, families); 375 loadData(tableName, families, 3000, flushes); 376 377 List<Region> regions = new ArrayList<>(); 378 TEST_UTIL 379 .getHBaseCluster() 380 .getLiveRegionServerThreads() 381 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 382 assertEquals(1, regions.size()); 383 384 int countBefore = countStoreFilesInFamilies(regions, families); 385 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 386 assertTrue(countBefore > 0); // there should be some data files 387 if (expectedState == CompactionState.MINOR) { 388 if (singleFamily) { 389 admin.compact(tableName, family).get(); 390 } else { 391 admin.compact(tableName).get(); 392 } 393 } else { 394 if (singleFamily) { 395 admin.majorCompact(tableName, family).get(); 396 } else { 397 admin.majorCompact(tableName).get(); 398 } 399 } 400 401 long curt = System.currentTimeMillis(); 402 long waitTime = 5000; 403 long endt = curt + waitTime; 404 CompactionState state = admin.getCompactionState(tableName).get(); 405 while (state == CompactionState.NONE && curt < endt) { 406 Thread.sleep(10); 407 state = admin.getCompactionState(tableName).get(); 408 curt = System.currentTimeMillis(); 409 } 410 // Now, should have the right compaction state, 411 // otherwise, the compaction should have already been done 412 if (expectedState != state) { 413 for (Region region : regions) { 414 state = CompactionState.valueOf(region.getCompactionState().toString()); 415 assertEquals(CompactionState.NONE, state); 416 } 417 } else { 418 // Wait until the compaction is done 419 state = admin.getCompactionState(tableName).get(); 420 while (state != CompactionState.NONE && curt < endt) { 421 Thread.sleep(10); 422 state = admin.getCompactionState(tableName).get(); 423 } 424 // Now, compaction should be done. 425 assertEquals(CompactionState.NONE, state); 426 } 427 428 int countAfter = countStoreFilesInFamilies(regions, families); 429 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 430 assertTrue(countAfter < countBefore); 431 if (!singleFamily) { 432 if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter); 433 else assertTrue(families.length < countAfter); 434 } else { 435 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 436 // assert only change was to single column family 437 assertTrue(singleFamDiff == (countBefore - countAfter)); 438 if (expectedState == CompactionState.MAJOR) { 439 assertTrue(1 == countAfterSingleFamily); 440 } else { 441 assertTrue(1 < countAfterSingleFamily); 442 } 443 } 444 } 445 446 @Test 447 public void testNonExistentTableCompaction() { 448 testNonExistentTableCompaction(CompactionState.MINOR); 449 testNonExistentTableCompaction(CompactionState.MAJOR); 450 } 451 452 private void testNonExistentTableCompaction(CompactionState compactionState) { 453 try { 454 if (compactionState == CompactionState.MINOR) { 455 admin.compact(TableName.valueOf("NonExistentTable")).get(); 456 } else { 457 admin.majorCompact(TableName.valueOf("NonExistentTable")).get(); 458 } 459 fail("Expected TableNotFoundException when table doesn't exist"); 460 } catch (Exception e) { 461 // expected. 462 assertTrue(e.getCause() instanceof TableNotFoundException); 463 } 464 } 465 466 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 467 return countStoreFilesInFamilies(regions, new byte[][] { family }); 468 } 469 470 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 471 int count = 0; 472 for (Region region : regions) { 473 count += region.getStoreFileList(families).size(); 474 } 475 return count; 476 } 477 478 static void loadData(final TableName tableName, final byte[][] families, final int rows) 479 throws IOException { 480 loadData(tableName, families, rows, 1); 481 } 482 483 static void loadData(final TableName tableName, final byte[][] families, final int rows, 484 final int flushes) throws IOException { 485 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 486 List<Put> puts = new ArrayList<>(rows); 487 byte[] qualifier = Bytes.toBytes("val"); 488 for (int i = 0; i < flushes; i++) { 489 for (int k = 0; k < rows; k++) { 490 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 491 Put p = new Put(row); 492 for (int j = 0; j < families.length; ++j) { 493 p.addColumn(families[j], qualifier, row); 494 } 495 puts.add(p); 496 } 497 table.putAll(puts).join(); 498 TEST_UTIL.flush(); 499 puts.clear(); 500 } 501 } 502}