001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.stream.Collectors; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.master.HMaster; 036import org.apache.hadoop.hbase.master.RegionState; 037import org.apache.hadoop.hbase.master.ServerManager; 038import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 039import org.apache.hadoop.hbase.master.assignment.RegionStates; 040import org.apache.hadoop.hbase.regionserver.HRegionServer; 041import org.apache.hadoop.hbase.regionserver.Region; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.LargeTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 046import org.apache.hadoop.hbase.util.JVMClusterUtil; 047import org.apache.hadoop.hbase.util.Threads; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053 054/** 055 * Class to test asynchronous region admin operations. 056 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our 057 * ten minute timeout so they were split. 058 */ 059@RunWith(Parameterized.class) 060@Category({ LargeTests.class, ClientTests.class }) 061public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class); 065 066 @Test 067 public void testAssignRegionAndUnassignRegion() throws Exception { 068 createTableWithDefaultConf(tableName); 069 070 // assign region. 071 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 072 AssignmentManager am = master.getAssignmentManager(); 073 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0); 074 075 // assert region on server 076 RegionStates regionStates = am.getRegionStates(); 077 ServerName serverName = regionStates.getRegionServerOfRegion(hri); 078 TEST_UTIL.assertRegionOnServer(hri, serverName, 200); 079 assertTrue(regionStates.getRegionState(hri).isOpened()); 080 081 // Region is assigned now. Let's assign it again. 082 // Master should not abort, and region should stay assigned. 083 admin.assign(hri.getRegionName()).get(); 084 assertTrue(regionStates.getRegionState(hri).isOpened()); 085 086 // unassign region 087 admin.unassign(hri.getRegionName(), true).get(); 088 assertTrue(regionStates.getRegionState(hri).isClosed()); 089 } 090 091 RegionInfo createTableAndGetOneRegion(final TableName tableName) 092 throws IOException, InterruptedException, ExecutionException { 093 TableDescriptor desc = 094 TableDescriptorBuilder.newBuilder(tableName) 095 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 096 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); 097 098 // wait till the table is assigned 099 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 100 long timeoutTime = System.currentTimeMillis() + 3000; 101 while (true) { 102 List<RegionInfo> regions = 103 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); 104 if (regions.size() > 3) { 105 return regions.get(2); 106 } 107 long now = System.currentTimeMillis(); 108 if (now > timeoutTime) { 109 fail("Could not find an online region"); 110 } 111 Thread.sleep(10); 112 } 113 } 114 115 @Test 116 public void testGetRegionByStateOfTable() throws Exception { 117 RegionInfo hri = createTableAndGetOneRegion(tableName); 118 119 RegionStates regionStates = 120 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 121 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN) 122 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 123 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) 124 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)); 125 } 126 127 @Test 128 public void testMoveRegion() throws Exception { 129 admin.balancerSwitch(false).join(); 130 131 RegionInfo hri = createTableAndGetOneRegion(tableName); 132 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin(); 133 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 134 135 HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); 136 ServerManager serverManager = master.getServerManager(); 137 ServerName destServerName = null; 138 List<JVMClusterUtil.RegionServerThread> regionServers = 139 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads(); 140 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) { 141 HRegionServer destServer = regionServer.getRegionServer(); 142 destServerName = destServer.getServerName(); 143 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) { 144 break; 145 } 146 } 147 148 assertTrue(destServerName != null && !destServerName.equals(serverName)); 149 admin.move(hri.getRegionName(), destServerName).get(); 150 151 long timeoutTime = System.currentTimeMillis() + 30000; 152 while (true) { 153 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName(); 154 if (sn != null && sn.equals(destServerName)) { 155 break; 156 } 157 long now = System.currentTimeMillis(); 158 if (now > timeoutTime) { 159 fail("Failed to move the region in time: " + hri); 160 } 161 Thread.sleep(100); 162 } 163 admin.balancerSwitch(true).join(); 164 } 165 166 @Test 167 public void testGetOnlineRegions() throws Exception { 168 createTableAndGetOneRegion(tableName); 169 AtomicInteger regionServerCount = new AtomicInteger(0); 170 TEST_UTIL 171 .getHBaseCluster() 172 .getLiveRegionServerThreads() 173 .stream() 174 .map(rsThread -> rsThread.getRegionServer()) 175 .forEach( 176 rs -> { 177 ServerName serverName = rs.getServerName(); 178 try { 179 assertEquals(admin.getRegions(serverName).get().size(), rs 180 .getRegions().size()); 181 } catch (Exception e) { 182 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); 183 } 184 regionServerCount.incrementAndGet(); 185 }); 186 assertEquals(2, regionServerCount.get()); 187 } 188 189 @Test 190 public void testFlushTableAndRegion() throws Exception { 191 RegionInfo hri = createTableAndGetOneRegion(tableName); 192 ServerName serverName = 193 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() 194 .getRegionServerOfRegion(hri); 195 HRegionServer regionServer = 196 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 197 .map(rsThread -> rsThread.getRegionServer()) 198 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); 199 200 // write a put into the specific region 201 ASYNC_CONN.getTable(tableName) 202 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))) 203 .join(); 204 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 205 // flush region and wait flush operation finished. 206 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); 207 admin.flushRegion(hri.getRegionName()).get(); 208 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); 209 Threads.sleepWithoutInterrupt(500); 210 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 211 Threads.sleep(50); 212 } 213 // check the memstore. 214 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 215 216 // write another put into the specific region 217 ASYNC_CONN.getTable(tableName) 218 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))) 219 .join(); 220 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0); 221 admin.flush(tableName).get(); 222 Threads.sleepWithoutInterrupt(500); 223 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) { 224 Threads.sleep(50); 225 } 226 // check the memstore. 227 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0); 228 } 229 230 private void waitUntilMobCompactionFinished(TableName tableName) 231 throws ExecutionException, InterruptedException { 232 long finished = EnvironmentEdgeManager.currentTime() + 60000; 233 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 234 while (EnvironmentEdgeManager.currentTime() < finished) { 235 if (state == CompactionState.NONE) { 236 break; 237 } 238 Thread.sleep(10); 239 state = admin.getCompactionState(tableName, CompactType.MOB).get(); 240 } 241 assertEquals(CompactionState.NONE, state); 242 } 243 244 @Test 245 public void testCompactMob() throws Exception { 246 ColumnFamilyDescriptor columnDescriptor = 247 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) 248 .setMobEnabled(true).setMobThreshold(0).build(); 249 250 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 251 .setColumnFamily(columnDescriptor).build(); 252 253 admin.createTable(tableDescriptor).get(); 254 255 byte[][] families = { Bytes.toBytes("mob") }; 256 loadData(tableName, families, 3000, 8); 257 258 admin.majorCompact(tableName, CompactType.MOB).get(); 259 260 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); 261 assertNotEquals(CompactionState.NONE, state); 262 263 waitUntilMobCompactionFinished(tableName); 264 } 265 266 @Test 267 public void testCompactRegionServer() throws Exception { 268 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; 269 createTableWithDefaultConf(tableName, null, families); 270 loadData(tableName, families, 3000, 8); 271 272 List<HRegionServer> rsList = 273 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 274 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); 275 List<Region> regions = new ArrayList<>(); 276 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName))); 277 assertEquals(1, regions.size()); 278 int countBefore = countStoreFilesInFamilies(regions, families); 279 assertTrue(countBefore > 0); 280 281 // Minor compaction for all region servers. 282 for (HRegionServer rs : rsList) 283 admin.compactRegionServer(rs.getServerName()).get(); 284 Thread.sleep(5000); 285 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); 286 assertTrue(countAfterMinorCompaction < countBefore); 287 288 // Major compaction for all region servers. 289 for (HRegionServer rs : rsList) 290 admin.majorCompactRegionServer(rs.getServerName()).get(); 291 Thread.sleep(5000); 292 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); 293 assertEquals(3, countAfterMajorCompaction); 294 } 295 296 @Test 297 public void testCompact() throws Exception { 298 compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false); 299 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false); 300 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true); 301 compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true); 302 } 303 304 private void compactionTest(final TableName tableName, final int flushes, 305 final CompactionState expectedState, boolean singleFamily) throws Exception { 306 // Create a table with regions 307 byte[] family = Bytes.toBytes("family"); 308 byte[][] families = 309 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; 310 createTableWithDefaultConf(tableName, null, families); 311 loadData(tableName, families, 3000, flushes); 312 313 List<Region> regions = new ArrayList<>(); 314 TEST_UTIL 315 .getHBaseCluster() 316 .getLiveRegionServerThreads() 317 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); 318 assertEquals(1, regions.size()); 319 320 int countBefore = countStoreFilesInFamilies(regions, families); 321 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); 322 assertTrue(countBefore > 0); // there should be some data files 323 if (expectedState == CompactionState.MINOR) { 324 if (singleFamily) { 325 admin.compact(tableName, family).get(); 326 } else { 327 admin.compact(tableName).get(); 328 } 329 } else { 330 if (singleFamily) { 331 admin.majorCompact(tableName, family).get(); 332 } else { 333 admin.majorCompact(tableName).get(); 334 } 335 } 336 337 long curt = System.currentTimeMillis(); 338 long waitTime = 5000; 339 long endt = curt + waitTime; 340 CompactionState state = admin.getCompactionState(tableName).get(); 341 while (state == CompactionState.NONE && curt < endt) { 342 Thread.sleep(10); 343 state = admin.getCompactionState(tableName).get(); 344 curt = System.currentTimeMillis(); 345 } 346 // Now, should have the right compaction state, 347 // otherwise, the compaction should have already been done 348 if (expectedState != state) { 349 for (Region region : regions) { 350 state = CompactionState.valueOf(region.getCompactionState().toString()); 351 assertEquals(CompactionState.NONE, state); 352 } 353 } else { 354 // Wait until the compaction is done 355 state = admin.getCompactionState(tableName).get(); 356 while (state != CompactionState.NONE && curt < endt) { 357 Thread.sleep(10); 358 state = admin.getCompactionState(tableName).get(); 359 } 360 // Now, compaction should be done. 361 assertEquals(CompactionState.NONE, state); 362 } 363 364 int countAfter = countStoreFilesInFamilies(regions, families); 365 int countAfterSingleFamily = countStoreFilesInFamily(regions, family); 366 assertTrue(countAfter < countBefore); 367 if (!singleFamily) { 368 if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter); 369 else assertTrue(families.length < countAfter); 370 } else { 371 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; 372 // assert only change was to single column family 373 assertTrue(singleFamDiff == (countBefore - countAfter)); 374 if (expectedState == CompactionState.MAJOR) { 375 assertTrue(1 == countAfterSingleFamily); 376 } else { 377 assertTrue(1 < countAfterSingleFamily); 378 } 379 } 380 } 381 382 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) { 383 return countStoreFilesInFamilies(regions, new byte[][] { family }); 384 } 385 386 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) { 387 int count = 0; 388 for (Region region : regions) { 389 count += region.getStoreFileList(families).size(); 390 } 391 return count; 392 } 393 394 static void loadData(final TableName tableName, final byte[][] families, final int rows) 395 throws IOException { 396 loadData(tableName, families, rows, 1); 397 } 398 399 static void loadData(final TableName tableName, final byte[][] families, final int rows, 400 final int flushes) throws IOException { 401 AsyncTable<?> table = ASYNC_CONN.getTable(tableName); 402 List<Put> puts = new ArrayList<>(rows); 403 byte[] qualifier = Bytes.toBytes("val"); 404 for (int i = 0; i < flushes; i++) { 405 for (int k = 0; k < rows; k++) { 406 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i)); 407 Put p = new Put(row); 408 for (int j = 0; j < families.length; ++j) { 409 p.addColumn(families[j], qualifier, row); 410 } 411 puts.add(p); 412 } 413 table.putAll(puts).join(); 414 TEST_UTIL.flush(); 415 puts.clear(); 416 } 417 } 418}