001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.stream.Collectors; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.TableNotFoundException; 036import org.apache.hadoop.hbase.master.HMaster; 037import org.apache.hadoop.hbase.master.RegionState; 038import org.apache.hadoop.hbase.master.ServerManager; 039import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 040import org.apache.hadoop.hbase.master.assignment.RegionStates; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.regionserver.Region; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.JVMClusterUtil; 048import org.apache.hadoop.hbase.util.Threads; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.runner.RunWith; 053import org.junit.runners.Parameterized; 054 055/** 056 * Class to test asynchronous region admin operations. 057 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 058 * ten minute timeout so they were split. 059 */ 060@RunWith(Parameterized.class) 061@Category({ LargeTests.class, ClientTests.class }) 062public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class); 066 067 @Test 068 public void testAssignRegionAndUnassignRegion() throws Exception { 069 createTableWithDefaultConf(tableName); 070 071 // assign region. 072 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 073 AssignmentManager am = master.getAssignmentManager(); 074 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 075 076 // assert region on server 077 RegionStates regionStates = am.getRegionStates(); 078 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 079 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 080 assertTrue(regionStates.getRegionState(hri).isOpened()); 081 082 // Region is assigned now. Let's assign it again. 083 // Master should not abort, and region should stay assigned. 084 admin.assign(hri.getRegionName()).get(); 085 assertTrue(regionStates.getRegionState(hri).isOpened()); 086 087 // unassign region 088 admin.unassign(hri.getRegionName(), true).get(); 089 assertTrue(regionStates.getRegionState(hri).isClosed()); 090 } 091 092 RegionInfo createTableAndGetOneRegion(final TableName tableName) 093 throws IOException, InterruptedException, ExecutionException { 094 TableDescriptor desc = 095 TableDescriptorBuilder.newBuilder(tableName) 096 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 097 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 098 099 // wait till the table is assigned 100 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 101 long timeoutTime = System.currentTimeMillis() + 3000; 102 while (true) { 103 List<RegionInfo> regions = 104 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 105 if (regions.size() > 3) { 106 return regions.get(2); 107 } 108 long now = System.currentTimeMillis(); 109 if (now > timeoutTime) { 110 fail("Could not find an online region"); 111 } 112 Thread.sleep(10); 113 } 114 } 115 116 @Test 117 public void testGetRegionByStateOfTable() throws Exception { 118 RegionInfo hri = createTableAndGetOneRegion(tableName); 119 120 RegionStates regionStates = 121 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 122 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN) 123 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 124 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 125 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 126 } 127 128 @Test 129 public void testMoveRegion() throws Exception { 130 admin.balancerSwitch(false).join(); 131 132 RegionInfo hri = createTableAndGetOneRegion(tableName); 133 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 134 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 135 136 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 137 ServerManager serverManager = master.getServerManager(); 138 ServerName destServerName = null; 139 List<JVMClusterUtil.RegionServerThread> regionServers = 140 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 141 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 142 HRegionServer destServer = regionServer.getRegionServer(); 143 destServerName = destServer.getServerName(); 144 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 145 break; 146 } 147 } 148 149 assertTrue(destServerName != null && !destServerName.equals(serverName)); 150 admin.move(hri.getRegionName(), destServerName).get(); 151 152 long timeoutTime = System.currentTimeMillis() + 30000; 153 while (true) { 154 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 155 if (sn != null && sn.equals(destServerName)) { 156 break; 157 } 158 long now = System.currentTimeMillis(); 159 if (now > timeoutTime) { 160 fail("Failed to move the region in time: " + hri); 161 } 162 Thread.sleep(100); 163 } 164 admin.balancerSwitch(true).join(); 165 } 166 167 @Test 168 public void testGetOnlineRegions() throws Exception { 169 createTableAndGetOneRegion(tableName); 170 AtomicInteger regionServerCount = new AtomicInteger(0); 171 TEST_UTIL 172 .getHBaseCluster() 173 .getLiveRegionServerThreads() 174 .stream() 175 .map(rsThread -> rsThread.getRegionServer()) 176 .forEach( 177 rs -> { 178 ServerName serverName = rs.getServerName(); 179 try { 180 assertEquals(admin.getRegions(serverName).get().size(), rs 181 .getRegions().size()); 182 } catch (Exception e) { 183 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 184 } 185 regionServerCount.incrementAndGet(); 186 }); 187 assertEquals(2, regionServerCount.get()); 188 } 189 190 @Test 191 public void testFlushTableAndRegion() throws Exception { 192 RegionInfo hri = createTableAndGetOneRegion(tableName); 193 ServerName serverName = 194 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() 195 .getRegionServerOfRegion(hri); 196 HRegionServer regionServer = 197 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 198 .map(rsThread -> rsThread.getRegionServer()) 199 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 200 201 // write a put into the specific region 202 ASYNC_CONN.getTable(tableName) 203 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))) 204 .join(); 205 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 206 // flush region and wait flush operation finished. 207 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 208 admin.flushRegion(hri.getRegionName()).get(); 209 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 210 Threads.sleepWithoutInterrupt(500); 211 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 212 Threads.sleep(50); 213 } 214 // check the memstore. 215 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 216 217 // write another put into the specific region 218 ASYNC_CONN.getTable(tableName) 219 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))) 220 .join(); 221 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 222 admin.flush(tableName).get(); 223 Threads.sleepWithoutInterrupt(500); 224 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 225 Threads.sleep(50); 226 } 227 // check the memstore. 228 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 229 } 230 231 private void waitUntilMobCompactionFinished(TableName tableName) 232 throws ExecutionException, InterruptedException { 233 long finished = EnvironmentEdgeManager.currentTime() + 60000; 234 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 235 while (EnvironmentEdgeManager.currentTime() < finished) { 236 if (state == CompactionState.NONE) { 237 break; 238 } 239 Thread.sleep(10); 240 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 241 } 242 assertEquals(CompactionState.NONE, state); 243 } 244 245 @Test 246 public void testCompactMob() throws Exception { 247 ColumnFamilyDescriptor columnDescriptor = 248 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) 249 .setMobEnabled(true).setMobThreshold(0).build(); 250 251 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 252 .setColumnFamily(columnDescriptor).build(); 253 254 admin.createTable(tableDescriptor).get(); 255 256 byte[][] families = { Bytes.toBytes("mob") }; 257 loadData(tableName, families, 3000, 8); 258 259 admin.majorCompact(tableName, CompactType.MOB).get(); 260 261 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 262 assertNotEquals(CompactionState.NONE, state); 263 264 waitUntilMobCompactionFinished(tableName); 265 } 266 267 @Test 268 public void testCompactRegionServer() throws Exception { 269 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 270 createTableWithDefaultConf(tableName, null, families); 271 loadData(tableName, families, 3000, 8); 272 273 List<HRegionServer> rsList = 274 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 275 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 276 List<Region> regions = new ArrayList<>(); 277 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 278 assertEquals(1, regions.size()); 279 int countBefore = countStoreFilesInFamilies(regions, families); 280 assertTrue(countBefore > 0); 281 282 // Minor compaction for all region servers. 283 for (HRegionServer rs : rsList) 284 admin.compactRegionServer(rs.getServerName()).get(); 285 Thread.sleep(5000); 286 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 287 assertTrue(countAfterMinorCompaction < countBefore); 288 289 // Major compaction for all region servers. 290 for (HRegionServer rs : rsList) 291 admin.majorCompactRegionServer(rs.getServerName()).get(); 292 Thread.sleep(5000); 293 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 294 assertEquals(3, countAfterMajorCompaction); 295 } 296 297 @Test 298 public void testCompact() throws Exception { 299 compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false); 300 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false); 301 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true); 302 compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true); 303 } 304 305 private void compactionTest(final TableName tableName, final int flushes, 306 final CompactionState expectedState, boolean singleFamily) throws Exception { 307 // Create a table with regions 308 byte[] family = Bytes.toBytes("family"); 309 byte[][] families = 310 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 311 createTableWithDefaultConf(tableName, null, families); 312 loadData(tableName, families, 3000, flushes); 313 314 List<Region> regions = new ArrayList<>(); 315 TEST_UTIL 316 .getHBaseCluster() 317 .getLiveRegionServerThreads() 318 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 319 assertEquals(1, regions.size()); 320 321 int countBefore = countStoreFilesInFamilies(regions, families); 322 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 323 assertTrue(countBefore > 0); // there should be some data files 324 if (expectedState == CompactionState.MINOR) { 325 if (singleFamily) { 326 admin.compact(tableName, family).get(); 327 } else { 328 admin.compact(tableName).get(); 329 } 330 } else { 331 if (singleFamily) { 332 admin.majorCompact(tableName, family).get(); 333 } else { 334 admin.majorCompact(tableName).get(); 335 } 336 } 337 338 long curt = System.currentTimeMillis(); 339 long waitTime = 5000; 340 long endt = curt + waitTime; 341 CompactionState state = admin.getCompactionState(tableName).get(); 342 while (state == CompactionState.NONE && curt < endt) { 343 Thread.sleep(10); 344 state = admin.getCompactionState(tableName).get(); 345 curt = System.currentTimeMillis(); 346 } 347 // Now, should have the right compaction state, 348 // otherwise, the compaction should have already been done 349 if (expectedState != state) { 350 for (Region region : regions) { 351 state = CompactionState.valueOf(region.getCompactionState().toString()); 352 assertEquals(CompactionState.NONE, state); 353 } 354 } else { 355 // Wait until the compaction is done 356 state = admin.getCompactionState(tableName).get(); 357 while (state != CompactionState.NONE && curt < endt) { 358 Thread.sleep(10); 359 state = admin.getCompactionState(tableName).get(); 360 } 361 // Now, compaction should be done. 362 assertEquals(CompactionState.NONE, state); 363 } 364 365 int countAfter = countStoreFilesInFamilies(regions, families); 366 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 367 assertTrue(countAfter < countBefore); 368 if (!singleFamily) { 369 if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter); 370 else assertTrue(families.length < countAfter); 371 } else { 372 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 373 // assert only change was to single column family 374 assertTrue(singleFamDiff == (countBefore - countAfter)); 375 if (expectedState == CompactionState.MAJOR) { 376 assertTrue(1 == countAfterSingleFamily); 377 } else { 378 assertTrue(1 < countAfterSingleFamily); 379 } 380 } 381 } 382 383 @Test 384 public void testNonExistentTableCompaction() { 385 testNonExistentTableCompaction(CompactionState.MINOR); 386 testNonExistentTableCompaction(CompactionState.MAJOR); 387 } 388 389 private void testNonExistentTableCompaction(CompactionState compactionState) { 390 try { 391 if (compactionState == CompactionState.MINOR) { 392 admin.compact(TableName.valueOf("NonExistentTable")).get(); 393 } else { 394 admin.majorCompact(TableName.valueOf("NonExistentTable")).get(); 395 } 396 fail("Expected TableNotFoundException when table doesn't exist"); 397 } catch (Exception e) { 398 // expected. 399 assertTrue(e.getCause() instanceof TableNotFoundException); 400 } 401 } 402 403 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 404 return countStoreFilesInFamilies(regions, new byte[][] { family }); 405 } 406 407 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 408 int count = 0; 409 for (Region region : regions) { 410 count += region.getStoreFileList(families).size(); 411 } 412 return count; 413 } 414 415 static void loadData(final TableName tableName, final byte[][] families, final int rows) 416 throws IOException { 417 loadData(tableName, families, rows, 1); 418 } 419 420 static void loadData(final TableName tableName, final byte[][] families, final int rows, 421 final int flushes) throws IOException { 422 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 423 List<Put> puts = new ArrayList<>(rows); 424 byte[] qualifier = Bytes.toBytes("val"); 425 for (int i = 0; i < flushes; i++) { 426 for (int k = 0; k < rows; k++) { 427 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 428 Put p = new Put(row); 429 for (int j = 0; j < families.length; ++j) { 430 p.addColumn(families[j], qualifier, row); 431 } 432 puts.add(p); 433 } 434 table.putAll(puts).join(); 435 TEST_UTIL.flush(); 436 puts.clear(); 437 } 438 } 439}