001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotEquals;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.stream.Collectors;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.TableNotFoundException;
041import org.apache.hadoop.hbase.master.HMaster;
042import org.apache.hadoop.hbase.master.RegionState;
043import org.apache.hadoop.hbase.master.ServerManager;
044import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
045import org.apache.hadoop.hbase.master.assignment.RegionStates;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.regionserver.Region;
048import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.JVMClusterUtil;
054import org.apache.hadoop.hbase.util.Threads;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.runner.RunWith;
059import org.junit.runners.Parameterized;
060
061/**
062 * Class to test asynchronous region admin operations.
 * @see TestAsyncRegionAdminApi2 That test and this one used to be a single class; they were
 *      split because the combined test took longer than our ten-minute timeout.
065 */
066@RunWith(Parameterized.class)
067@Category({ LargeTests.class, ClientTests.class })
068public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
072
073  @Test
074  public void testAssignRegionAndUnassignRegion() throws Exception {
075    createTableWithDefaultConf(tableName);
076
077    // assign region.
078    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
079    AssignmentManager am = master.getAssignmentManager();
080    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
081
082    // assert region on server
083    RegionStates regionStates = am.getRegionStates();
084    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
085    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
086    assertTrue(regionStates.getRegionState(hri).isOpened());
087
088    // Region is assigned now. Let's assign it again.
089    // Master should not abort, and region should stay assigned.
090    try {
091      admin.assign(hri.getRegionName()).get();
092      fail("Should fail when assigning an already onlined region");
093    } catch (ExecutionException e) {
094      // Expected
095      assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
096    }
097    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
098    assertTrue(regionStates.getRegionState(hri).isOpened());
099
100    // unassign region
101    admin.unassign(hri.getRegionName(), true).get();
102    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
103    assertTrue(regionStates.getRegionState(hri).isClosed());
104  }
105
106  RegionInfo createTableAndGetOneRegion(final TableName tableName)
107    throws IOException, InterruptedException, ExecutionException {
108    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
109      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
110    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
111
112    // wait till the table is assigned
113    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
114    long timeoutTime = EnvironmentEdgeManager.currentTime() + 3000;
115    while (true) {
116      List<RegionInfo> regions =
117        master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
118      if (regions.size() > 3) {
119        return regions.get(2);
120      }
121      long now = EnvironmentEdgeManager.currentTime();
122      if (now > timeoutTime) {
123        fail("Could not find an online region");
124      }
125      Thread.sleep(10);
126    }
127  }
128
129  @Test
130  public void testGetRegionByStateOfTable() throws Exception {
131    RegionInfo hri = createTableAndGetOneRegion(tableName);
132
133    RegionStates regionStates =
134      TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
135    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN).stream()
136      .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
137    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
138      .get(RegionState.State.OPEN).stream()
139      .anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
140  }
141
142  @Test
143  public void testMoveRegion() throws Exception {
144    admin.balancerSwitch(false).join();
145
146    RegionInfo hri = createTableAndGetOneRegion(tableName);
147    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
148    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
149
150    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
151    ServerManager serverManager = master.getServerManager();
152    ServerName destServerName = null;
153    List<JVMClusterUtil.RegionServerThread> regionServers =
154      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
155    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
156      HRegionServer destServer = regionServer.getRegionServer();
157      destServerName = destServer.getServerName();
158      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
159        break;
160      }
161    }
162
163    assertTrue(destServerName != null && !destServerName.equals(serverName));
164    admin.move(hri.getRegionName(), destServerName).get();
165
166    long timeoutTime = EnvironmentEdgeManager.currentTime() + 30000;
167    while (true) {
168      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
169      if (sn != null && sn.equals(destServerName)) {
170        break;
171      }
172      long now = EnvironmentEdgeManager.currentTime();
173      if (now > timeoutTime) {
174        fail("Failed to move the region in time: " + hri);
175      }
176      Thread.sleep(100);
177    }
178    admin.balancerSwitch(true).join();
179  }
180
181  @Test
182  public void testGetOnlineRegions() throws Exception {
183    createTableAndGetOneRegion(tableName);
184    AtomicInteger regionServerCount = new AtomicInteger(0);
185    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
186      .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
187        ServerName serverName = rs.getServerName();
188        try {
189          assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
190        } catch (Exception e) {
191          fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
192        }
193        regionServerCount.incrementAndGet();
194      });
195    assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
196      regionServerCount.get());
197  }
198
199  @Test
200  public void testFlushTableAndRegion() throws Exception {
201    RegionInfo hri = createTableAndGetOneRegion(tableName);
202    ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
203      .getRegionStates().getRegionServerOfRegion(hri);
204    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
205      .map(rsThread -> rsThread.getRegionServer())
206      .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
207
208    // write a put into the specific region
209    ASYNC_CONN.getTable(tableName)
210      .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))).join();
211    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
212    // flush region and wait flush operation finished.
213    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
214    admin.flushRegion(hri.getRegionName()).get();
215    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
216    Threads.sleepWithoutInterrupt(500);
217    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
218      Threads.sleep(50);
219    }
220    // check the memstore.
221    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
222
223    // write another put into the specific region
224    ASYNC_CONN.getTable(tableName)
225      .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))).join();
226    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
227    admin.flush(tableName).get();
228    Threads.sleepWithoutInterrupt(500);
229    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
230      Threads.sleep(50);
231    }
232    // check the memstore.
233    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
234  }
235
236  private void waitUntilMobCompactionFinished(TableName tableName)
237    throws ExecutionException, InterruptedException {
238    long finished = EnvironmentEdgeManager.currentTime() + 60000;
239    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
240    while (EnvironmentEdgeManager.currentTime() < finished) {
241      if (state == CompactionState.NONE) {
242        break;
243      }
244      Thread.sleep(10);
245      state = admin.getCompactionState(tableName, CompactType.MOB).get();
246    }
247    assertEquals(CompactionState.NONE, state);
248  }
249
250  @Test
251  public void testCompactMob() throws Exception {
252    ColumnFamilyDescriptor columnDescriptor = ColumnFamilyDescriptorBuilder
253      .newBuilder(Bytes.toBytes("mob")).setMobEnabled(true).setMobThreshold(0).build();
254
255    TableDescriptor tableDescriptor =
256      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnDescriptor).build();
257
258    admin.createTable(tableDescriptor).get();
259
260    byte[][] families = { Bytes.toBytes("mob") };
261    loadData(tableName, families, 3000, 8);
262
263    admin.majorCompact(tableName).get();
264
265    CompactionState state = admin.getCompactionState(tableName).get();
266    assertNotEquals(CompactionState.NONE, state);
267
268    waitUntilMobCompactionFinished(tableName);
269  }
270
271  @Test
272  public void testCompactRegionServer() throws Exception {
273    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
274    createTableWithDefaultConf(tableName, null, families);
275    loadData(tableName, families, 3000, 8);
276
277    List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
278      .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
279    List<Region> regions = new ArrayList<>();
280    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
281    assertEquals(1, regions.size());
282    int countBefore = countStoreFilesInFamilies(regions, families);
283    assertTrue(countBefore > 0);
284
285    // Minor compaction for all region servers.
286    for (HRegionServer rs : rsList)
287      admin.compactRegionServer(rs.getServerName()).get();
288    Thread.sleep(5000);
289    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
290    assertTrue(countAfterMinorCompaction < countBefore);
291
292    // Major compaction for all region servers.
293    for (HRegionServer rs : rsList)
294      admin.majorCompactRegionServer(rs.getServerName()).get();
295    Thread.sleep(5000);
296    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
297    assertEquals(3, countAfterMajorCompaction);
298  }
299
300  @Test
301  public void testCompactionSwitchStates() throws Exception {
302    // Create a table with regions
303    byte[] family = Bytes.toBytes("family");
304    byte[][] families =
305      { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
306    createTableWithDefaultConf(tableName, null, families);
307    loadData(tableName, families, 3000, 8);
308    List<Region> regions = new ArrayList<>();
309    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
310      .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
311    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
312      admin.compactionSwitch(true, new ArrayList<>());
313    Map<ServerName, Boolean> pairs = listCompletableFuture.get();
314    for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
315      assertEquals("Default compaction state, expected=enabled actual=disabled", true,
316        p.getValue());
317    }
318    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
319      admin.compactionSwitch(false, new ArrayList<>());
320    Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
321    for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
322      assertEquals("Last compaction state, expected=enabled actual=disabled", true, p.getValue());
323    }
324    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
325      admin.compactionSwitch(true, new ArrayList<>());
326    Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
327    for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
328      assertEquals("Last compaction state, expected=disabled actual=enabled", false, p.getValue());
329    }
330    ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
331    List<String> serverNameList = new ArrayList<String>();
332    serverNameList.add(serverName.getServerName());
333    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
334      admin.compactionSwitch(false, serverNameList);
335    Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
336    assertEquals(pairs3.entrySet().size(), 1);
337    for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
338      assertEquals("Last compaction state, expected=enabled actual=disabled", true, p.getValue());
339    }
340    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
341      admin.compactionSwitch(true, serverNameList);
342    Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
343    assertEquals(pairs4.entrySet().size(), 1);
344    for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
345      assertEquals("Last compaction state, expected=disabled actual=enabled", false, p.getValue());
346    }
347  }
348
349  @Test
350  public void testCompact() throws Exception {
351    compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false);
352    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true);
353
354    // For major compaction, set up a higher hbase.hstore.compaction.min to avoid
355    // minor compactions. It is a hack to avoid random delays introduced by Admins's
356    // updateConfiguration() method.
357    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
358      Configuration conf = thread.getRegionServer().getConfiguration();
359      conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25);
360    });
361
362    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false);
363    compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true);
364
365    // Restore to default
366    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
367      Configuration conf = thread.getRegionServer().getConfiguration();
368      conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY);
369    });
370  }
371
372  private void compactionTest(final TableName tableName, final int flushes,
373    final CompactionState expectedState, boolean singleFamily) throws Exception {
374    // Create a table with regions
375    byte[] family = Bytes.toBytes("family");
376    byte[][] families =
377      { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
378    createTableWithDefaultConf(tableName, null, families);
379
380    byte[][] singleFamilyArray = { family };
381
382    // When singleFamily is true, only load data for the family being tested. This is to avoid
383    // the case that while major compaction is going on for the family, minor compaction could
384    // happen for other families at the same time (Two compaction threads long/short), thus
385    // pollute the compaction and store file numbers for the region.
386    if (singleFamily) {
387      loadData(tableName, singleFamilyArray, 3000, flushes);
388    } else {
389      loadData(tableName, families, 3000, flushes);
390    }
391
392    List<Region> regions = new ArrayList<>();
393    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
394      .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
395    assertEquals(1, regions.size());
396
397    int countBefore = countStoreFilesInFamilies(regions, families);
398    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
399    assertTrue(countBefore > 0); // there should be some data files
400    if (expectedState == CompactionState.MINOR) {
401      if (singleFamily) {
402        admin.compact(tableName, family).get();
403      } else {
404        admin.compact(tableName).get();
405      }
406    } else {
407      if (singleFamily) {
408        admin.majorCompact(tableName, family).get();
409      } else {
410        admin.majorCompact(tableName).get();
411      }
412    }
413
414    long curt = EnvironmentEdgeManager.currentTime();
415    long waitTime = 10000;
416    long endt = curt + waitTime;
417    CompactionState state = admin.getCompactionState(tableName).get();
418    while (state == CompactionState.NONE && curt < endt) {
419      Thread.sleep(1);
420      state = admin.getCompactionState(tableName).get();
421      curt = EnvironmentEdgeManager.currentTime();
422    }
423    // Now, should have the right compaction state,
424    // otherwise, the compaction should have already been done
425    if (expectedState != state) {
426      for (Region region : regions) {
427        state = CompactionState.valueOf(region.getCompactionState().toString());
428        assertEquals(CompactionState.NONE, state);
429      }
430    } else {
431      // Wait until the compaction is done
432      state = admin.getCompactionState(tableName).get();
433      while (state != CompactionState.NONE && curt < endt) {
434        Thread.sleep(10);
435        state = admin.getCompactionState(tableName).get();
436      }
437      // Now, compaction should be done.
438      assertEquals(CompactionState.NONE, state);
439    }
440
441    int countAfter = countStoreFilesInFamilies(regions, families);
442    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
443    assertTrue(countAfter < countBefore);
444    if (!singleFamily) {
445      if (expectedState == CompactionState.MAJOR) {
446        assertEquals(families.length, countAfter);
447      } else {
448        assertTrue(families.length <= countAfter);
449      }
450    } else {
451      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
452      // assert only change was to single column family
453      assertEquals(singleFamDiff, (countBefore - countAfter));
454      if (expectedState == CompactionState.MAJOR) {
455        assertEquals(1, countAfterSingleFamily);
456      } else {
457        assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily);
458      }
459    }
460  }
461
462  @Test
463  public void testNonExistentTableCompaction() {
464    testNonExistentTableCompaction(CompactionState.MINOR);
465    testNonExistentTableCompaction(CompactionState.MAJOR);
466  }
467
468  private void testNonExistentTableCompaction(CompactionState compactionState) {
469    try {
470      if (compactionState == CompactionState.MINOR) {
471        admin.compact(TableName.valueOf("NonExistentTable")).get();
472      } else {
473        admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
474      }
475      fail("Expected TableNotFoundException when table doesn't exist");
476    } catch (Exception e) {
477      // expected.
478      assertTrue(e.getCause() instanceof TableNotFoundException);
479    }
480  }
481
482  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
483    return countStoreFilesInFamilies(regions, new byte[][] { family });
484  }
485
486  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
487    int count = 0;
488    for (Region region : regions) {
489      count += region.getStoreFileList(families).size();
490    }
491    return count;
492  }
493
494  static void loadData(final TableName tableName, final byte[][] families, final int rows)
495    throws IOException {
496    loadData(tableName, families, rows, 1);
497  }
498
499  static void loadData(final TableName tableName, final byte[][] families, final int rows,
500    final int flushes) throws IOException {
501    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
502    List<Put> puts = new ArrayList<>(rows);
503    byte[] qualifier = Bytes.toBytes("val");
504    for (int i = 0; i < flushes; i++) {
505      for (int k = 0; k < rows; k++) {
506        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
507        Put p = new Put(row);
508        for (int j = 0; j < families.length; ++j) {
509          p.addColumn(families[j], qualifier, row);
510        }
511        puts.add(p);
512      }
513      table.putAll(puts).join();
514      TEST_UTIL.flush();
515      puts.clear();
516    }
517  }
518}