001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertThat;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Map;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.stream.Collectors;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotFoundException;
040import org.apache.hadoop.hbase.master.HMaster;
041import org.apache.hadoop.hbase.master.RegionState;
042import org.apache.hadoop.hbase.master.ServerManager;
043import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
044import org.apache.hadoop.hbase.master.assignment.RegionStates;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.regionserver.Region;
047import org.apache.hadoop.hbase.testclassification.ClientTests;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.JVMClusterUtil;
052import org.apache.hadoop.hbase.util.Threads;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.runner.RunWith;
057import org.junit.runners.Parameterized;
058
059/**
060 * Class to test asynchronous region admin operations.
 * @see TestAsyncRegionAdminApi2 That class and this one were originally a single test; it was
 *      split because it ran longer than our ten minute timeout.
063 */
064@RunWith(Parameterized.class)
065@Category({ LargeTests.class, ClientTests.class })
066public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
070
071  @Test
072  public void testAssignRegionAndUnassignRegion() throws Exception {
073    createTableWithDefaultConf(tableName);
074
075    // assign region.
076    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
077    AssignmentManager am = master.getAssignmentManager();
078    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
079
080    // assert region on server
081    RegionStates regionStates = am.getRegionStates();
082    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
083    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
084    assertTrue(regionStates.getRegionState(hri).isOpened());
085
086    // Region is assigned now. Let's assign it again.
087    // Master should not abort, and region should stay assigned.
088    try {
089      admin.assign(hri.getRegionName()).get();
090      fail("Should fail when assigning an already onlined region");
091    } catch (ExecutionException e) {
092      // Expected
093      assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
094    }
095    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
096    assertTrue(regionStates.getRegionState(hri).isOpened());
097
098    // unassign region
099    admin.unassign(hri.getRegionName(), true).get();
100    assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
101    assertTrue(regionStates.getRegionState(hri).isClosed());
102  }
103
104  RegionInfo createTableAndGetOneRegion(final TableName tableName)
105      throws IOException, InterruptedException, ExecutionException {
106    TableDescriptor desc =
107        TableDescriptorBuilder.newBuilder(tableName)
108            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
109    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
110
111    // wait till the table is assigned
112    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
113    long timeoutTime = System.currentTimeMillis() + 3000;
114    while (true) {
115      List<RegionInfo> regions =
116          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
117      if (regions.size() > 3) {
118        return regions.get(2);
119      }
120      long now = System.currentTimeMillis();
121      if (now > timeoutTime) {
122        fail("Could not find an online region");
123      }
124      Thread.sleep(10);
125    }
126  }
127
128  @Test
129  public void testGetRegionByStateOfTable() throws Exception {
130    RegionInfo hri = createTableAndGetOneRegion(tableName);
131
132    RegionStates regionStates =
133        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
134    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
135        .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
136    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
137        .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
138  }
139
140  @Test
141  public void testMoveRegion() throws Exception {
142    admin.balancerSwitch(false).join();
143
144    RegionInfo hri = createTableAndGetOneRegion(tableName);
145    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
146    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
147
148    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
149    ServerManager serverManager = master.getServerManager();
150    ServerName destServerName = null;
151    List<JVMClusterUtil.RegionServerThread> regionServers =
152        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
153    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
154      HRegionServer destServer = regionServer.getRegionServer();
155      destServerName = destServer.getServerName();
156      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
157        break;
158      }
159    }
160
161    assertTrue(destServerName != null && !destServerName.equals(serverName));
162    admin.move(hri.getRegionName(), destServerName).get();
163
164    long timeoutTime = System.currentTimeMillis() + 30000;
165    while (true) {
166      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
167      if (sn != null && sn.equals(destServerName)) {
168        break;
169      }
170      long now = System.currentTimeMillis();
171      if (now > timeoutTime) {
172        fail("Failed to move the region in time: " + hri);
173      }
174      Thread.sleep(100);
175    }
176    admin.balancerSwitch(true).join();
177  }
178
179  @Test
180  public void testGetOnlineRegions() throws Exception {
181    createTableAndGetOneRegion(tableName);
182    AtomicInteger regionServerCount = new AtomicInteger(0);
183    TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
184      .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
185        ServerName serverName = rs.getServerName();
186        try {
187          assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
188        } catch (Exception e) {
189          fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
190        }
191        regionServerCount.incrementAndGet();
192      });
193    assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
194      regionServerCount.get());
195  }
196
197  @Test
198  public void testFlushTableAndRegion() throws Exception {
199    RegionInfo hri = createTableAndGetOneRegion(tableName);
200    ServerName serverName =
201        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
202            .getRegionServerOfRegion(hri);
203    HRegionServer regionServer =
204        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
205            .map(rsThread -> rsThread.getRegionServer())
206            .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
207
208    // write a put into the specific region
209    ASYNC_CONN.getTable(tableName)
210        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
211        .join();
212    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
213    // flush region and wait flush operation finished.
214    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
215    admin.flushRegion(hri.getRegionName()).get();
216    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
217    Threads.sleepWithoutInterrupt(500);
218    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
219      Threads.sleep(50);
220    }
221    // check the memstore.
222    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
223
224    // write another put into the specific region
225    ASYNC_CONN.getTable(tableName)
226        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
227        .join();
228    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
229    admin.flush(tableName).get();
230    Threads.sleepWithoutInterrupt(500);
231    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
232      Threads.sleep(50);
233    }
234    // check the memstore.
235    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
236  }
237
238  private void waitUntilMobCompactionFinished(TableName tableName)
239      throws ExecutionException, InterruptedException {
240    long finished = EnvironmentEdgeManager.currentTime() + 60000;
241    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
242    while (EnvironmentEdgeManager.currentTime() < finished) {
243      if (state == CompactionState.NONE) {
244        break;
245      }
246      Thread.sleep(10);
247      state = admin.getCompactionState(tableName, CompactType.MOB).get();
248    }
249    assertEquals(CompactionState.NONE, state);
250  }
251
252  @Test
253  public void testCompactMob() throws Exception {
254    ColumnFamilyDescriptor columnDescriptor =
255        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
256            .setMobEnabled(true).setMobThreshold(0).build();
257
258    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
259        .setColumnFamily(columnDescriptor).build();
260
261    admin.createTable(tableDescriptor).get();
262
263    byte[][] families = { Bytes.toBytes("mob") };
264    loadData(tableName, families, 3000, 8);
265
266    admin.majorCompact(tableName, CompactType.MOB).get();
267
268    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
269    assertNotEquals(CompactionState.NONE, state);
270
271    waitUntilMobCompactionFinished(tableName);
272  }
273
274  @Test
275  public void testCompactRegionServer() throws Exception {
276    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
277    createTableWithDefaultConf(tableName, null, families);
278    loadData(tableName, families, 3000, 8);
279
280    List<HRegionServer> rsList =
281        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
282            .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
283    List<Region> regions = new ArrayList<>();
284    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
285    assertEquals(1, regions.size());
286    int countBefore = countStoreFilesInFamilies(regions, families);
287    assertTrue(countBefore > 0);
288
289    // Minor compaction for all region servers.
290    for (HRegionServer rs : rsList)
291      admin.compactRegionServer(rs.getServerName()).get();
292    Thread.sleep(5000);
293    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
294    assertTrue(countAfterMinorCompaction < countBefore);
295
296    // Major compaction for all region servers.
297    for (HRegionServer rs : rsList)
298      admin.majorCompactRegionServer(rs.getServerName()).get();
299    Thread.sleep(5000);
300    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
301    assertEquals(3, countAfterMajorCompaction);
302  }
303
304  @Test
305  public void testCompactionSwitchStates() throws Exception {
306    // Create a table with regions
307    byte[] family = Bytes.toBytes("family");
308    byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")),
309        Bytes.add(family, Bytes.toBytes("3"))};
310    createTableWithDefaultConf(tableName, null, families);
311    loadData(tableName, families, 3000, 8);
312    List<Region> regions = new ArrayList<>();
313    TEST_UTIL
314        .getHBaseCluster()
315        .getLiveRegionServerThreads()
316        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
317    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
318        admin.compactionSwitch(true, new ArrayList<>());
319    Map<ServerName, Boolean> pairs = listCompletableFuture.get();
320    for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
321      assertEquals("Default compaction state, expected=enabled actual=disabled",
322          true, p.getValue());
323    }
324    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
325        admin.compactionSwitch(false, new ArrayList<>());
326    Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
327    for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
328      assertEquals("Last compaction state, expected=enabled actual=disabled",
329          true, p.getValue());
330    }
331    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
332        admin.compactionSwitch(true, new ArrayList<>());
333    Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
334    for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
335      assertEquals("Last compaction state, expected=disabled actual=enabled",
336          false, p.getValue());
337    }
338    ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0)
339        .getServerName();
340    List<String> serverNameList = new ArrayList<String>();
341    serverNameList.add(serverName.getServerName());
342    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
343        admin.compactionSwitch(false, serverNameList);
344    Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
345    assertEquals(pairs3.entrySet().size(), 1);
346    for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
347      assertEquals("Last compaction state, expected=enabled actual=disabled",
348          true, p.getValue());
349    }
350    CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
351        admin.compactionSwitch(true, serverNameList);
352    Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
353    assertEquals(pairs4.entrySet().size(), 1);
354    for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
355      assertEquals("Last compaction state, expected=disabled actual=enabled",
356          false, p.getValue());
357    }
358  }
359
360  @Test
361  public void testCompact() throws Exception {
362    compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
363    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
364    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
365    compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
366  }
367
368  private void compactionTest(final TableName tableName, final int flushes,
369      final CompactionState expectedState, boolean singleFamily) throws Exception {
370    // Create a table with regions
371    byte[] family = Bytes.toBytes("family");
372    byte[][] families =
373        { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
374    createTableWithDefaultConf(tableName, null, families);
375    loadData(tableName, families, 3000, flushes);
376
377    List<Region> regions = new ArrayList<>();
378    TEST_UTIL
379        .getHBaseCluster()
380        .getLiveRegionServerThreads()
381        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
382    assertEquals(1, regions.size());
383
384    int countBefore = countStoreFilesInFamilies(regions, families);
385    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
386    assertTrue(countBefore > 0); // there should be some data files
387    if (expectedState == CompactionState.MINOR) {
388      if (singleFamily) {
389        admin.compact(tableName, family).get();
390      } else {
391        admin.compact(tableName).get();
392      }
393    } else {
394      if (singleFamily) {
395        admin.majorCompact(tableName, family).get();
396      } else {
397        admin.majorCompact(tableName).get();
398      }
399    }
400
401    long curt = System.currentTimeMillis();
402    long waitTime = 5000;
403    long endt = curt + waitTime;
404    CompactionState state = admin.getCompactionState(tableName).get();
405    while (state == CompactionState.NONE && curt < endt) {
406      Thread.sleep(10);
407      state = admin.getCompactionState(tableName).get();
408      curt = System.currentTimeMillis();
409    }
410    // Now, should have the right compaction state,
411    // otherwise, the compaction should have already been done
412    if (expectedState != state) {
413      for (Region region : regions) {
414        state = CompactionState.valueOf(region.getCompactionState().toString());
415        assertEquals(CompactionState.NONE, state);
416      }
417    } else {
418      // Wait until the compaction is done
419      state = admin.getCompactionState(tableName).get();
420      while (state != CompactionState.NONE && curt < endt) {
421        Thread.sleep(10);
422        state = admin.getCompactionState(tableName).get();
423      }
424      // Now, compaction should be done.
425      assertEquals(CompactionState.NONE, state);
426    }
427
428    int countAfter = countStoreFilesInFamilies(regions, families);
429    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
430    assertTrue(countAfter < countBefore);
431    if (!singleFamily) {
432      if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
433      else assertTrue(families.length < countAfter);
434    } else {
435      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
436      // assert only change was to single column family
437      assertTrue(singleFamDiff == (countBefore - countAfter));
438      if (expectedState == CompactionState.MAJOR) {
439        assertTrue(1 == countAfterSingleFamily);
440      } else {
441        assertTrue(1 < countAfterSingleFamily);
442      }
443    }
444  }
445
446  @Test
447  public void testNonExistentTableCompaction() {
448    testNonExistentTableCompaction(CompactionState.MINOR);
449    testNonExistentTableCompaction(CompactionState.MAJOR);
450  }
451
452  private void testNonExistentTableCompaction(CompactionState compactionState) {
453    try {
454      if (compactionState == CompactionState.MINOR) {
455        admin.compact(TableName.valueOf("NonExistentTable")).get();
456      } else {
457        admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
458      }
459      fail("Expected TableNotFoundException when table doesn't exist");
460    } catch (Exception e) {
461      // expected.
462      assertTrue(e.getCause() instanceof TableNotFoundException);
463    }
464  }
465
466  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
467    return countStoreFilesInFamilies(regions, new byte[][] { family });
468  }
469
470  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
471    int count = 0;
472    for (Region region : regions) {
473      count += region.getStoreFileList(families).size();
474    }
475    return count;
476  }
477
478  static void loadData(final TableName tableName, final byte[][] families, final int rows)
479      throws IOException {
480    loadData(tableName, families, rows, 1);
481  }
482
483  static void loadData(final TableName tableName, final byte[][] families, final int rows,
484      final int flushes) throws IOException {
485    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
486    List<Put> puts = new ArrayList<>(rows);
487    byte[] qualifier = Bytes.toBytes("val");
488    for (int i = 0; i < flushes; i++) {
489      for (int k = 0; k < rows; k++) {
490        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
491        Put p = new Put(row);
492        for (int j = 0; j < families.length; ++j) {
493          p.addColumn(families[j], qualifier, row);
494        }
495        puts.add(p);
496      }
497      table.putAll(puts).join();
498      TEST_UTIL.flush();
499      puts.clear();
500    }
501  }
502}