001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.stream.Collectors;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.RegionState;
037import org.apache.hadoop.hbase.master.ServerManager;
038import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
039import org.apache.hadoop.hbase.master.assignment.RegionStates;
040import org.apache.hadoop.hbase.regionserver.HRegionServer;
041import org.apache.hadoop.hbase.regionserver.Region;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.hadoop.hbase.util.JVMClusterUtil;
047import org.apache.hadoop.hbase.util.Threads;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053
054/**
055 * Class to test asynchronous region admin operations.
056 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our
057 * ten minute timeout so they were split.
058 */
059@RunWith(Parameterized.class)
060@Category({ LargeTests.class, ClientTests.class })
061public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
065
066  @Test
067  public void testAssignRegionAndUnassignRegion() throws Exception {
068    createTableWithDefaultConf(tableName);
069
070    // assign region.
071    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
072    AssignmentManager am = master.getAssignmentManager();
073    RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
074
075    // assert region on server
076    RegionStates regionStates = am.getRegionStates();
077    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
078    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
079    assertTrue(regionStates.getRegionState(hri).isOpened());
080
081    // Region is assigned now. Let's assign it again.
082    // Master should not abort, and region should stay assigned.
083    admin.assign(hri.getRegionName()).get();
084    assertTrue(regionStates.getRegionState(hri).isOpened());
085
086    // unassign region
087    admin.unassign(hri.getRegionName(), true).get();
088    assertTrue(regionStates.getRegionState(hri).isClosed());
089  }
090
091  RegionInfo createTableAndGetOneRegion(final TableName tableName)
092      throws IOException, InterruptedException, ExecutionException {
093    TableDescriptor desc =
094        TableDescriptorBuilder.newBuilder(tableName)
095            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
096    admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
097
098    // wait till the table is assigned
099    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
100    long timeoutTime = System.currentTimeMillis() + 3000;
101    while (true) {
102      List<RegionInfo> regions =
103          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
104      if (regions.size() > 3) {
105        return regions.get(2);
106      }
107      long now = System.currentTimeMillis();
108      if (now > timeoutTime) {
109        fail("Could not find an online region");
110      }
111      Thread.sleep(10);
112    }
113  }
114
115  @Test
116  public void testGetRegionByStateOfTable() throws Exception {
117    RegionInfo hri = createTableAndGetOneRegion(tableName);
118
119    RegionStates regionStates =
120        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
121    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
122        .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
123    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
124        .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
125  }
126
127  @Test
128  public void testMoveRegion() throws Exception {
129    admin.balancerSwitch(false).join();
130
131    RegionInfo hri = createTableAndGetOneRegion(tableName);
132    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
133    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
134
135    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
136    ServerManager serverManager = master.getServerManager();
137    ServerName destServerName = null;
138    List<JVMClusterUtil.RegionServerThread> regionServers =
139        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
140    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
141      HRegionServer destServer = regionServer.getRegionServer();
142      destServerName = destServer.getServerName();
143      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
144        break;
145      }
146    }
147
148    assertTrue(destServerName != null && !destServerName.equals(serverName));
149    admin.move(hri.getRegionName(), destServerName).get();
150
151    long timeoutTime = System.currentTimeMillis() + 30000;
152    while (true) {
153      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
154      if (sn != null && sn.equals(destServerName)) {
155        break;
156      }
157      long now = System.currentTimeMillis();
158      if (now > timeoutTime) {
159        fail("Failed to move the region in time: " + hri);
160      }
161      Thread.sleep(100);
162    }
163    admin.balancerSwitch(true).join();
164  }
165
166  @Test
167  public void testGetOnlineRegions() throws Exception {
168    createTableAndGetOneRegion(tableName);
169    AtomicInteger regionServerCount = new AtomicInteger(0);
170    TEST_UTIL
171        .getHBaseCluster()
172        .getLiveRegionServerThreads()
173        .stream()
174        .map(rsThread -> rsThread.getRegionServer())
175        .forEach(
176          rs -> {
177            ServerName serverName = rs.getServerName();
178            try {
179              assertEquals(admin.getRegions(serverName).get().size(), rs
180                  .getRegions().size());
181            } catch (Exception e) {
182              fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
183            }
184            regionServerCount.incrementAndGet();
185          });
186    assertEquals(2, regionServerCount.get());
187  }
188
189  @Test
190  public void testFlushTableAndRegion() throws Exception {
191    RegionInfo hri = createTableAndGetOneRegion(tableName);
192    ServerName serverName =
193        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
194            .getRegionServerOfRegion(hri);
195    HRegionServer regionServer =
196        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
197            .map(rsThread -> rsThread.getRegionServer())
198            .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
199
200    // write a put into the specific region
201    ASYNC_CONN.getTable(tableName)
202        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
203        .join();
204    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
205    // flush region and wait flush operation finished.
206    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
207    admin.flushRegion(hri.getRegionName()).get();
208    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
209    Threads.sleepWithoutInterrupt(500);
210    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
211      Threads.sleep(50);
212    }
213    // check the memstore.
214    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
215
216    // write another put into the specific region
217    ASYNC_CONN.getTable(tableName)
218        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
219        .join();
220    assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
221    admin.flush(tableName).get();
222    Threads.sleepWithoutInterrupt(500);
223    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
224      Threads.sleep(50);
225    }
226    // check the memstore.
227    assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
228  }
229
230  private void waitUntilMobCompactionFinished(TableName tableName)
231      throws ExecutionException, InterruptedException {
232    long finished = EnvironmentEdgeManager.currentTime() + 60000;
233    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
234    while (EnvironmentEdgeManager.currentTime() < finished) {
235      if (state == CompactionState.NONE) {
236        break;
237      }
238      Thread.sleep(10);
239      state = admin.getCompactionState(tableName, CompactType.MOB).get();
240    }
241    assertEquals(CompactionState.NONE, state);
242  }
243
244  @Test
245  public void testCompactMob() throws Exception {
246    ColumnFamilyDescriptor columnDescriptor =
247        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
248            .setMobEnabled(true).setMobThreshold(0).build();
249
250    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
251        .setColumnFamily(columnDescriptor).build();
252
253    admin.createTable(tableDescriptor).get();
254
255    byte[][] families = { Bytes.toBytes("mob") };
256    loadData(tableName, families, 3000, 8);
257
258    admin.majorCompact(tableName, CompactType.MOB).get();
259
260    CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
261    assertNotEquals(CompactionState.NONE, state);
262
263    waitUntilMobCompactionFinished(tableName);
264  }
265
266  @Test
267  public void testCompactRegionServer() throws Exception {
268    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
269    createTableWithDefaultConf(tableName, null, families);
270    loadData(tableName, families, 3000, 8);
271
272    List<HRegionServer> rsList =
273        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
274            .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
275    List<Region> regions = new ArrayList<>();
276    rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
277    assertEquals(1, regions.size());
278    int countBefore = countStoreFilesInFamilies(regions, families);
279    assertTrue(countBefore > 0);
280
281    // Minor compaction for all region servers.
282    for (HRegionServer rs : rsList)
283      admin.compactRegionServer(rs.getServerName()).get();
284    Thread.sleep(5000);
285    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
286    assertTrue(countAfterMinorCompaction < countBefore);
287
288    // Major compaction for all region servers.
289    for (HRegionServer rs : rsList)
290      admin.majorCompactRegionServer(rs.getServerName()).get();
291    Thread.sleep(5000);
292    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
293    assertEquals(3, countAfterMajorCompaction);
294  }
295
296  @Test
297  public void testCompact() throws Exception {
298    compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
299    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
300    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
301    compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
302  }
303
304  private void compactionTest(final TableName tableName, final int flushes,
305      final CompactionState expectedState, boolean singleFamily) throws Exception {
306    // Create a table with regions
307    byte[] family = Bytes.toBytes("family");
308    byte[][] families =
309        { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
310    createTableWithDefaultConf(tableName, null, families);
311    loadData(tableName, families, 3000, flushes);
312
313    List<Region> regions = new ArrayList<>();
314    TEST_UTIL
315        .getHBaseCluster()
316        .getLiveRegionServerThreads()
317        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
318    assertEquals(1, regions.size());
319
320    int countBefore = countStoreFilesInFamilies(regions, families);
321    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
322    assertTrue(countBefore > 0); // there should be some data files
323    if (expectedState == CompactionState.MINOR) {
324      if (singleFamily) {
325        admin.compact(tableName, family).get();
326      } else {
327        admin.compact(tableName).get();
328      }
329    } else {
330      if (singleFamily) {
331        admin.majorCompact(tableName, family).get();
332      } else {
333        admin.majorCompact(tableName).get();
334      }
335    }
336
337    long curt = System.currentTimeMillis();
338    long waitTime = 5000;
339    long endt = curt + waitTime;
340    CompactionState state = admin.getCompactionState(tableName).get();
341    while (state == CompactionState.NONE && curt < endt) {
342      Thread.sleep(10);
343      state = admin.getCompactionState(tableName).get();
344      curt = System.currentTimeMillis();
345    }
346    // Now, should have the right compaction state,
347    // otherwise, the compaction should have already been done
348    if (expectedState != state) {
349      for (Region region : regions) {
350        state = CompactionState.valueOf(region.getCompactionState().toString());
351        assertEquals(CompactionState.NONE, state);
352      }
353    } else {
354      // Wait until the compaction is done
355      state = admin.getCompactionState(tableName).get();
356      while (state != CompactionState.NONE && curt < endt) {
357        Thread.sleep(10);
358        state = admin.getCompactionState(tableName).get();
359      }
360      // Now, compaction should be done.
361      assertEquals(CompactionState.NONE, state);
362    }
363
364    int countAfter = countStoreFilesInFamilies(regions, families);
365    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
366    assertTrue(countAfter < countBefore);
367    if (!singleFamily) {
368      if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
369      else assertTrue(families.length < countAfter);
370    } else {
371      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
372      // assert only change was to single column family
373      assertTrue(singleFamDiff == (countBefore - countAfter));
374      if (expectedState == CompactionState.MAJOR) {
375        assertTrue(1 == countAfterSingleFamily);
376      } else {
377        assertTrue(1 < countAfterSingleFamily);
378      }
379    }
380  }
381
382  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
383    return countStoreFilesInFamilies(regions, new byte[][] { family });
384  }
385
386  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
387    int count = 0;
388    for (Region region : regions) {
389      count += region.getStoreFileList(families).size();
390    }
391    return count;
392  }
393
394  static void loadData(final TableName tableName, final byte[][] families, final int rows)
395      throws IOException {
396    loadData(tableName, families, rows, 1);
397  }
398
399  static void loadData(final TableName tableName, final byte[][] families, final int rows,
400      final int flushes) throws IOException {
401    AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
402    List<Put> puts = new ArrayList<>(rows);
403    byte[] qualifier = Bytes.toBytes("val");
404    for (int i = 0; i < flushes; i++) {
405      for (int k = 0; k < rows; k++) {
406        byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
407        Put p = new Put(row);
408        for (int j = 0; j < families.length; ++j) {
409          p.addColumn(families[j], qualifier, row);
410        }
411        puts.add(p);
412      }
413      table.putAll(puts).join();
414      TEST_UTIL.flush();
415      puts.clear();
416    }
417  }
418}