001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.stream.Collectors;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.TableNotFoundException;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.RegionState;
038import org.apache.hadoop.hbase.master.ServerManager;
039import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
040import org.apache.hadoop.hbase.master.assignment.RegionStates;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.regionserver.Region;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.JVMClusterUtil;
048import org.apache.hadoop.hbase.util.Threads;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.runner.RunWith;
053import org.junit.runners.Parameterized;
054
055/**
056 * Class to test asynchronous region admin operations.
057 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our
058 * ten minute timeout so they were split.
059 */
060@RunWith(Parameterized.class)
061@Category({ LargeTests.class, ClientTests.class })
062public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
066
067  @Test
068  public void testAssignRegionAndUnassignRegion() throws Exception {
069    createTableWithDefaultConf(tableName);
070
071    // assign region.
072    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
073    AssignmentManager am = master.getAssignmentManager();
074    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
075
076    // assert region on server
077    RegionStates regionStates = am.getRegionStates();
078    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
079    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
080    assertTrue(regionStates.getRegionState(hri).isOpened());
081
082    // Region is assigned now. Let's assign it again.
083    // Master should not abort, and region should stay assigned.
084    admin.assign(hri.getRegionName()).get();
085    assertTrue(regionStates.getRegionState(hri).isOpened());
086
087    // unassign region
088    admin.unassign(hri.getRegionName(), true).get();
089    assertTrue(regionStates.getRegionState(hri).isClosed());
090  }
091
092  RegionInfo createTableAndGetOneRegion(final TableName tableName)
093      throws IOException, InterruptedException, ExecutionException {
094    TableDescriptor desc =
095        TableDescriptorBuilder.newBuilder(tableName)
096            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
097    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
098
099    // wait till the table is assigned
100    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
101    long timeoutTime = System.currentTimeMillis() + 3000;
102    while (true) {
103      List<RegionInfo> regions =
104          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
105      if (regions.size() > 3) {
106        return regions.get(2);
107      }
108      long now = System.currentTimeMillis();
109      if (now > timeoutTime) {
110        fail("Could not find an online region");
111      }
112      Thread.sleep(10);
113    }
114  }
115
116  @Test
117  public void testGetRegionByStateOfTable() throws Exception {
118    RegionInfo hri = createTableAndGetOneRegion(tableName);
119
120    RegionStates regionStates =
121        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
122    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
123        .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
124    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
125        .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
126  }
127
128  @Test
129  public void testMoveRegion() throws Exception {
130    admin.balancerSwitch(false).join();
131
132    RegionInfo hri = createTableAndGetOneRegion(tableName);
133    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
134    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
135
136    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
137    ServerManager serverManager = master.getServerManager();
138    ServerName destServerName = null;
139    List<JVMClusterUtil.RegionServerThread> regionServers =
140        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
141    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
142      HRegionServer destServer = regionServer.getRegionServer();
143      destServerName = destServer.getServerName();
144      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
145        break;
146      }
147    }
148
149    assertTrue(destServerName != null && !destServerName.equals(serverName));
150    admin.move(hri.getRegionName(), destServerName).get();
151
152    long timeoutTime = System.currentTimeMillis() + 30000;
153    while (true) {
154      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
155      if (sn != null && sn.equals(destServerName)) {
156        break;
157      }
158      long now = System.currentTimeMillis();
159      if (now > timeoutTime) {
160        fail("Failed to move the region in time: " + hri);
161      }
162      Thread.sleep(100);
163    }
164    admin.balancerSwitch(true).join();
165  }
166
167  @Test
168  public void testGetOnlineRegions() throws Exception {
169    createTableAndGetOneRegion(tableName);
170    AtomicInteger regionServerCount = new AtomicInteger(0);
171    TEST_UTIL
172        .getHBaseCluster()
173        .getLiveRegionServerThreads()
174        .stream()
175        .map(rsThread -> rsThread.getRegionServer())
176        .forEach(
177          rs -> {
178            ServerName serverName = rs.getServerName();
179            try {
180              assertEquals(admin.getRegions(serverName).get().size(), rs
181                  .getRegions().size());
182            } catch (Exception e) {
183              fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
184            }
185            regionServerCount.incrementAndGet();
186          });
187    assertEquals(2, regionServerCount.get());
188  }
189
190  @Test
191  public void testFlushTableAndRegion() throws Exception {
192    RegionInfo hri = createTableAndGetOneRegion(tableName);
193    ServerName serverName =
194        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
195            .getRegionServerOfRegion(hri);
196    HRegionServer regionServer =
197        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
198            .map(rsThread -> rsThread.getRegionServer())
199            .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
200
201    // write a put into the specific region
202    ASYNC_CONN.getTable(tableName)
203        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
204        .join();
205    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
206    // flush region and wait flush operation finished.
207    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
208    admin.flushRegion(hri.getRegionName()).get();
209    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
210    Threads.sleepWithoutInterrupt(500);
211    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
212      Threads.sleep(50);
213    }
214    // check the memstore.
215    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
216
217    // write another put into the specific region
218    ASYNC_CONN.getTable(tableName)
219        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
220        .join();
221    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
222    admin.flush(tableName).get();
223    Threads.sleepWithoutInterrupt(500);
224    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
225      Threads.sleep(50);
226    }
227    // check the memstore.
228    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
229  }
230
231  private void waitUntilMobCompactionFinished(TableName tableName)
232      throws ExecutionException, InterruptedException {
233    long finished = EnvironmentEdgeManager.currentTime() + 60000;
234    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
235    while (EnvironmentEdgeManager.currentTime() < finished) {
236      if (state == CompactionState.NONE) {
237        break;
238      }
239      Thread.sleep(10);
240      state = admin.getCompactionState(tableName, CompactType.MOB).get();
241    }
242    assertEquals(CompactionState.NONE, state);
243  }
244
245  @Test
246  public void testCompactMob() throws Exception {
247    ColumnFamilyDescriptor columnDescriptor =
248        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
249            .setMobEnabled(true).setMobThreshold(0).build();
250
251    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
252        .setColumnFamily(columnDescriptor).build();
253
254    admin.createTable(tableDescriptor).get();
255
256    byte[][] families = { Bytes.toBytes("mob") };
257    loadData(tableName, families, 3000, 8);
258
259    admin.majorCompact(tableName, CompactType.MOB).get();
260
261    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
262    assertNotEquals(CompactionState.NONE, state);
263
264    waitUntilMobCompactionFinished(tableName);
265  }
266
267  @Test
268  public void testCompactRegionServer() throws Exception {
269    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
270    createTableWithDefaultConf(tableName, null, families);
271    loadData(tableName, families, 3000, 8);
272
273    List<HRegionServer> rsList =
274        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
275            .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
276    List<Region> regions = new ArrayList<>();
277    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
278    assertEquals(1, regions.size());
279    int countBefore = countStoreFilesInFamilies(regions, families);
280    assertTrue(countBefore > 0);
281
282    // Minor compaction for all region servers.
283    for (HRegionServer rs : rsList)
284      admin.compactRegionServer(rs.getServerName()).get();
285    Thread.sleep(5000);
286    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
287    assertTrue(countAfterMinorCompaction < countBefore);
288
289    // Major compaction for all region servers.
290    for (HRegionServer rs : rsList)
291      admin.majorCompactRegionServer(rs.getServerName()).get();
292    Thread.sleep(5000);
293    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
294    assertEquals(3, countAfterMajorCompaction);
295  }
296
297  @Test
298  public void testCompact() throws Exception {
299    compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
300    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
301    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
302    compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
303  }
304
305  private void compactionTest(final TableName tableName, final int flushes,
306      final CompactionState expectedState, boolean singleFamily) throws Exception {
307    // Create a table with regions
308    byte[] family = Bytes.toBytes("family");
309    byte[][] families =
310        { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
311    createTableWithDefaultConf(tableName, null, families);
312    loadData(tableName, families, 3000, flushes);
313
314    List<Region> regions = new ArrayList<>();
315    TEST_UTIL
316        .getHBaseCluster()
317        .getLiveRegionServerThreads()
318        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
319    assertEquals(1, regions.size());
320
321    int countBefore = countStoreFilesInFamilies(regions, families);
322    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
323    assertTrue(countBefore > 0); // there should be some data files
324    if (expectedState == CompactionState.MINOR) {
325      if (singleFamily) {
326        admin.compact(tableName, family).get();
327      } else {
328        admin.compact(tableName).get();
329      }
330    } else {
331      if (singleFamily) {
332        admin.majorCompact(tableName, family).get();
333      } else {
334        admin.majorCompact(tableName).get();
335      }
336    }
337
338    long curt = System.currentTimeMillis();
339    long waitTime = 5000;
340    long endt = curt + waitTime;
341    CompactionState state = admin.getCompactionState(tableName).get();
342    while (state == CompactionState.NONE && curt < endt) {
343      Thread.sleep(10);
344      state = admin.getCompactionState(tableName).get();
345      curt = System.currentTimeMillis();
346    }
347    // Now, should have the right compaction state,
348    // otherwise, the compaction should have already been done
349    if (expectedState != state) {
350      for (Region region : regions) {
351        state = CompactionState.valueOf(region.getCompactionState().toString());
352        assertEquals(CompactionState.NONE, state);
353      }
354    } else {
355      // Wait until the compaction is done
356      state = admin.getCompactionState(tableName).get();
357      while (state != CompactionState.NONE && curt < endt) {
358        Thread.sleep(10);
359        state = admin.getCompactionState(tableName).get();
360      }
361      // Now, compaction should be done.
362      assertEquals(CompactionState.NONE, state);
363    }
364
365    int countAfter = countStoreFilesInFamilies(regions, families);
366    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
367    assertTrue(countAfter < countBefore);
368    if (!singleFamily) {
369      if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
370      else assertTrue(families.length < countAfter);
371    } else {
372      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
373      // assert only change was to single column family
374      assertTrue(singleFamDiff == (countBefore - countAfter));
375      if (expectedState == CompactionState.MAJOR) {
376        assertTrue(1 == countAfterSingleFamily);
377      } else {
378        assertTrue(1 < countAfterSingleFamily);
379      }
380    }
381  }
382
383  @Test
384  public void testNonExistentTableCompaction() {
385    testNonExistentTableCompaction(CompactionState.MINOR);
386    testNonExistentTableCompaction(CompactionState.MAJOR);
387  }
388
389  private void testNonExistentTableCompaction(CompactionState compactionState) {
390    try {
391      if (compactionState == CompactionState.MINOR) {
392        admin.compact(TableName.valueOf("NonExistentTable")).get();
393      } else {
394        admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
395      }
396      fail("Expected TableNotFoundException when table doesn't exist");
397    } catch (Exception e) {
398      // expected.
399      assertTrue(e.getCause() instanceof TableNotFoundException);
400    }
401  }
402
403  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
404    return countStoreFilesInFamilies(regions, new byte[][] { family });
405  }
406
407  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
408    int count = 0;
409    for (Region region : regions) {
410      count += region.getStoreFileList(families).size();
411    }
412    return count;
413  }
414
415  static void loadData(final TableName tableName, final byte[][] families, final int rows)
416      throws IOException {
417    loadData(tableName, families, rows, 1);
418  }
419
420  static void loadData(final TableName tableName, final byte[][] families, final int rows,
421      final int flushes) throws IOException {
422    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
423    List<Put> puts = new ArrayList<>(rows);
424    byte[] qualifier = Bytes.toBytes("val");
425    for (int i = 0; i < flushes; i++) {
426      for (int k = 0; k < rows; k++) {
427        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
428        Put p = new Put(row);
429        for (int j = 0; j < families.length; ++j) {
430          p.addColumn(families[j], qualifier, row);
431        }
432        puts.add(p);
433      }
434      table.putAll(puts).join();
435      TEST_UTIL.flush();
436      puts.clear();
437    }
438  }
439}