001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertThat;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.master.HMaster;
041import org.apache.hadoop.hbase.master.RegionState;
042import org.apache.hadoop.hbase.master.ServerManager;
043import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
044import org.apache.hadoop.hbase.master.assignment.RegionStates;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.regionserver.Region;
048import org.apache.hadoop.hbase.testclassification.ClientTests;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.JVMClusterUtil;
053import org.apache.hadoop.hbase.util.Threads;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.runner.RunWith;
058import org.junit.runners.Parameterized;
059
060/**
061 * Class to test asynchronous region admin operations.
 * @see TestAsyncRegionAdminApi2 This test and it used to be joined, but together they took longer
 * than our ten-minute timeout, so they were split.
064 */
065@RunWith(Parameterized.class)
066@Category({ LargeTests.class, ClientTests.class })
067public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
071
072  @Test
073  public void testAssignRegionAndUnassignRegion() throws Exception {
074    createTableWithDefaultConf(tableName);
075
076    // assign region.
077    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
078    AssignmentManager am = master.getAssignmentManager();
079    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
080
081    // assert region on server
082    RegionStates regionStates = am.getRegionStates();
083    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
084    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
085    assertTrue(regionStates.getRegionState(hri).isOpened());
086
087    // Region is assigned now. Let's assign it again.
088    // Master should not abort, and region should stay assigned.
089    try {
090      admin.assign(hri.getRegionName()).get();
091      fail("Should fail when assigning an already onlined region");
092    } catch (ExecutionException e) {
093      // Expected
094      assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
095    }
096    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
097    assertTrue(regionStates.getRegionState(hri).isOpened());
098
099    // unassign region
100    admin.unassign(hri.getRegionName(), true).get();
101    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
102    assertTrue(regionStates.getRegionState(hri).isClosed());
103  }
104
105  RegionInfo createTableAndGetOneRegion(final TableName tableName)
106      throws IOException, InterruptedException, ExecutionException {
107    TableDescriptor desc =
108        TableDescriptorBuilder.newBuilder(tableName)
109            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
110    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
111
112    // wait till the table is assigned
113    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
114    long timeoutTime = System.currentTimeMillis() + 3000;
115    while (true) {
116      List<RegionInfo> regions =
117          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
118      if (regions.size() > 3) {
119        return regions.get(2);
120      }
121      long now = System.currentTimeMillis();
122      if (now > timeoutTime) {
123        fail("Could not find an online region");
124      }
125      Thread.sleep(10);
126    }
127  }
128
129  @Test
130  public void testGetRegionByStateOfTable() throws Exception {
131    RegionInfo hri = createTableAndGetOneRegion(tableName);
132
133    RegionStates regionStates =
134        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
135    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
136        .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
137    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
138        .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
139  }
140
141  @Test
142  public void testMoveRegion() throws Exception {
143    admin.balancerSwitch(false).join();
144
145    RegionInfo hri = createTableAndGetOneRegion(tableName);
146    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
147    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
148
149    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
150    ServerManager serverManager = master.getServerManager();
151    ServerName destServerName = null;
152    List<JVMClusterUtil.RegionServerThread> regionServers =
153        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
154    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
155      HRegionServer destServer = regionServer.getRegionServer();
156      destServerName = destServer.getServerName();
157      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
158        break;
159      }
160    }
161
162    assertTrue(destServerName != null && !destServerName.equals(serverName));
163    admin.move(hri.getRegionName(), destServerName).get();
164
165    long timeoutTime = System.currentTimeMillis() + 30000;
166    while (true) {
167      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
168      if (sn != null && sn.equals(destServerName)) {
169        break;
170      }
171      long now = System.currentTimeMillis();
172      if (now > timeoutTime) {
173        fail("Failed to move the region in time: " + hri);
174      }
175      Thread.sleep(100);
176    }
177    admin.balancerSwitch(true).join();
178  }
179
180  @Test
181  public void testGetOnlineRegions() throws Exception {
182    createTableAndGetOneRegion(tableName);
183    AtomicInteger regionServerCount = new AtomicInteger(0);
184    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
185      .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
186        ServerName serverName = rs.getServerName();
187        try {
188          assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
189        } catch (Exception e) {
190          fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
191        }
192        regionServerCount.incrementAndGet();
193      });
194    assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
195      regionServerCount.get());
196  }
197
198  @Test
199  public void testFlushTableAndRegion() throws Exception {
200    RegionInfo hri = createTableAndGetOneRegion(tableName);
201    ServerName serverName =
202        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
203            .getRegionServerOfRegion(hri);
204    HRegionServer regionServer =
205        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
206            .map(rsThread -> rsThread.getRegionServer())
207            .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
208
209    // write a put into the specific region
210    ASYNC_CONN.getTable(tableName)
211        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
212        .join();
213    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
214    // flush region and wait flush operation finished.
215    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
216    admin.flushRegion(hri.getRegionName()).get();
217    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
218    Threads.sleepWithoutInterrupt(500);
219    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
220      Threads.sleep(50);
221    }
222    // check the memstore.
223    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
224
225    // write another put into the specific region
226    ASYNC_CONN.getTable(tableName)
227        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
228        .join();
229    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
230    admin.flush(tableName).get();
231    Threads.sleepWithoutInterrupt(500);
232    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
233      Threads.sleep(50);
234    }
235    // check the memstore.
236    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
237  }
238
239  private void waitUntilMobCompactionFinished(TableName tableName)
240      throws ExecutionException, InterruptedException {
241    long finished = EnvironmentEdgeManager.currentTime() + 60000;
242    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
243    while (EnvironmentEdgeManager.currentTime() < finished) {
244      if (state == CompactionState.NONE) {
245        break;
246      }
247      Thread.sleep(10);
248      state = admin.getCompactionState(tableName, CompactType.MOB).get();
249    }
250    assertEquals(CompactionState.NONE, state);
251  }
252
253  @Test
254  public void testCompactMob() throws Exception {
255    ColumnFamilyDescriptor columnDescriptor =
256        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
257            .setMobEnabled(true).setMobThreshold(0).build();
258
259    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
260        .setColumnFamily(columnDescriptor).build();
261
262    admin.createTable(tableDescriptor).get();
263
264    byte[][] families = { Bytes.toBytes("mob") };
265    loadData(tableName, families, 3000, 8);
266
267    admin.majorCompact(tableName, CompactType.MOB).get();
268
269    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
270    assertNotEquals(CompactionState.NONE, state);
271
272    waitUntilMobCompactionFinished(tableName);
273  }
274
275  @Test
276  public void testCompactRegionServer() throws Exception {
277    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
278    createTableWithDefaultConf(tableName, null, families);
279    loadData(tableName, families, 3000, 8);
280
281    List<HRegionServer> rsList =
282        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
283            .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
284    List<Region> regions = new ArrayList<>();
285    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
286    assertEquals(1, regions.size());
287    int countBefore = countStoreFilesInFamilies(regions, families);
288    assertTrue(countBefore > 0);
289
290    // Minor compaction for all region servers.
291    for (HRegionServer rs : rsList)
292      admin.compactRegionServer(rs.getServerName()).get();
293    Thread.sleep(5000);
294    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
295    assertTrue(countAfterMinorCompaction < countBefore);
296
297    // Major compaction for all region servers.
298    for (HRegionServer rs : rsList)
299      admin.majorCompactRegionServer(rs.getServerName()).get();
300    Thread.sleep(5000);
301    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
302    assertEquals(3, countAfterMajorCompaction);
303  }
304
305  @Test
306  public void testCompactionSwitchStates() throws Exception {
307    // Create a table with regions
308    byte[] family = Bytes.toBytes("family");
309    byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")),
310        Bytes.add(family, Bytes.toBytes("3"))};
311    createTableWithDefaultConf(tableName, null, families);
312    loadData(tableName, families, 3000, 8);
313    List<Region> regions = new ArrayList<>();
314    TEST_UTIL
315        .getHBaseCluster()
316        .getLiveRegionServerThreads()
317        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
318    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
319        admin.compactionSwitch(true, new ArrayList<>());
320    Map<ServerName, Boolean> pairs = listCompletableFuture.get();
321    for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
322      assertEquals("Default compaction state, expected=enabled actual=disabled",
323          true, p.getValue());
324    }
325    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
326        admin.compactionSwitch(false, new ArrayList<>());
327    Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
328    for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
329      assertEquals("Last compaction state, expected=enabled actual=disabled",
330          true, p.getValue());
331    }
332    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
333        admin.compactionSwitch(true, new ArrayList<>());
334    Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
335    for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
336      assertEquals("Last compaction state, expected=disabled actual=enabled",
337          false, p.getValue());
338    }
339    ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0)
340        .getServerName();
341    List<String> serverNameList = new ArrayList<String>();
342    serverNameList.add(serverName.getServerName());
343    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
344        admin.compactionSwitch(false, serverNameList);
345    Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
346    assertEquals(pairs3.entrySet().size(), 1);
347    for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
348      assertEquals("Last compaction state, expected=enabled actual=disabled",
349          true, p.getValue());
350    }
351    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
352        admin.compactionSwitch(true, serverNameList);
353    Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
354    assertEquals(pairs4.entrySet().size(), 1);
355    for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
356      assertEquals("Last compaction state, expected=disabled actual=enabled",
357          false, p.getValue());
358    }
359  }
360
361  @Test
362  public void testCompact() throws Exception {
363    compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false);
364    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true);
365
366    // For major compaction, set up a higher hbase.hstore.compaction.min to avoid
367    // minor compactions. It is a hack to avoid random delays introduced by Admins's
368    // updateConfiguration() method.
369    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
370      Configuration conf = thread.getRegionServer().getConfiguration();
371      conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25);
372    });
373
374    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false);
375    compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true);
376
377    // Restore to default
378    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
379      Configuration conf = thread.getRegionServer().getConfiguration();
380      conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY);
381    });
382  }
383
384  private void compactionTest(final TableName tableName, final int flushes,
385      final CompactionState expectedState, boolean singleFamily) throws Exception {
386    // Create a table with regions
387    byte[] family = Bytes.toBytes("family");
388    byte[][] families =
389        { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
390    createTableWithDefaultConf(tableName, null, families);
391
392    byte[][] singleFamilyArray = { family };
393
394    // When singleFamily is true, only load data for the family being tested. This is to avoid
395    // the case that while major compaction is going on for the family, minor compaction could
396    // happen for other families at the same time (Two compaction threads long/short), thus
397    // pollute the compaction and store file numbers for the region.
398    if (singleFamily) {
399      loadData(tableName, singleFamilyArray, 3000, flushes);
400    } else {
401      loadData(tableName, families, 3000, flushes);
402    }
403
404    List<Region> regions = new ArrayList<>();
405    TEST_UTIL
406        .getHBaseCluster()
407        .getLiveRegionServerThreads()
408        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
409    assertEquals(1, regions.size());
410
411    int countBefore = countStoreFilesInFamilies(regions, families);
412    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
413    assertTrue(countBefore > 0); // there should be some data files
414    if (expectedState == CompactionState.MINOR) {
415      if (singleFamily) {
416        admin.compact(tableName, family).get();
417      } else {
418        admin.compact(tableName).get();
419      }
420    } else {
421      if (singleFamily) {
422        admin.majorCompact(tableName, family).get();
423      } else {
424        admin.majorCompact(tableName).get();
425      }
426    }
427
428    long curt = System.currentTimeMillis();
429    long waitTime = 10000;
430    long endt = curt + waitTime;
431    CompactionState state = admin.getCompactionState(tableName).get();
432    while (state == CompactionState.NONE && curt < endt) {
433      Thread.sleep(1);
434      state = admin.getCompactionState(tableName).get();
435      curt = System.currentTimeMillis();
436    }
437    // Now, should have the right compaction state,
438    // otherwise, the compaction should have already been done
439    if (expectedState != state) {
440      for (Region region : regions) {
441        state = CompactionState.valueOf(region.getCompactionState().toString());
442        assertEquals(CompactionState.NONE, state);
443      }
444    } else {
445      // Wait until the compaction is done
446      state = admin.getCompactionState(tableName).get();
447      while (state != CompactionState.NONE && curt < endt) {
448        Thread.sleep(10);
449        state = admin.getCompactionState(tableName).get();
450      }
451      // Now, compaction should be done.
452      assertEquals(CompactionState.NONE, state);
453    }
454
455    int countAfter = countStoreFilesInFamilies(regions, families);
456    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
457    assertTrue(countAfter < countBefore);
458    if (!singleFamily) {
459      if (expectedState == CompactionState.MAJOR) {
460        assertEquals(families.length, countAfter);
461      } else {
462        assertTrue(families.length <= countAfter);
463      }
464    } else {
465      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
466      // assert only change was to single column family
467      assertEquals(singleFamDiff, (countBefore - countAfter));
468      if (expectedState == CompactionState.MAJOR) {
469        assertEquals(1, countAfterSingleFamily);
470      } else {
471        assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily);
472      }
473    }
474  }
475
476  @Test
477  public void testNonExistentTableCompaction() {
478    testNonExistentTableCompaction(CompactionState.MINOR);
479    testNonExistentTableCompaction(CompactionState.MAJOR);
480  }
481
482  private void testNonExistentTableCompaction(CompactionState compactionState) {
483    try {
484      if (compactionState == CompactionState.MINOR) {
485        admin.compact(TableName.valueOf("NonExistentTable")).get();
486      } else {
487        admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
488      }
489      fail("Expected TableNotFoundException when table doesn't exist");
490    } catch (Exception e) {
491      // expected.
492      assertTrue(e.getCause() instanceof TableNotFoundException);
493    }
494  }
495
496  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
497    return countStoreFilesInFamilies(regions, new byte[][] { family });
498  }
499
500  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
501    int count = 0;
502    for (Region region : regions) {
503      count += region.getStoreFileList(families).size();
504    }
505    return count;
506  }
507
508  static void loadData(final TableName tableName, final byte[][] families, final int rows)
509      throws IOException {
510    loadData(tableName, families, rows, 1);
511  }
512
513  static void loadData(final TableName tableName, final byte[][] families, final int rows,
514      final int flushes) throws IOException {
515    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
516    List<Put> puts = new ArrayList<>(rows);
517    byte[] qualifier = Bytes.toBytes("val");
518    for (int i = 0; i < flushes; i++) {
519      for (int k = 0; k < rows; k++) {
520        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
521        Put p = new Put(row);
522        for (int j = 0; j < families.length; ++j) {
523          p.addColumn(families[j], qualifier, row);
524        }
525        puts.add(p);
526      }
527      table.putAll(puts).join();
528      TEST_UTIL.flush();
529      puts.clear();
530    }
531  }
532}