001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.Collection;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.CommonFSUtils;
038import org.junit.After;
039import org.junit.AfterClass;
040import org.junit.Assert;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048@Category(MediumTests.class)
049public class TestCompactSplitThread {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestCompactSplitThread.class);
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestCompactSplitThread.class);
056  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
057  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
058  private final byte[] family = Bytes.toBytes("f");
059  private static final int NUM_RS = 1;
060  private static final int blockingStoreFiles = 3;
061  private static Path rootDir;
062  private static FileSystem fs;
063
064  /**
065   * Setup the config for the cluster
066   */
067  @BeforeClass
068  public static void setupCluster() throws Exception {
069    setupConf(TEST_UTIL.getConfiguration());
070    TEST_UTIL.startMiniCluster(NUM_RS);
071    fs = TEST_UTIL.getDFSCluster().getFileSystem();
072    rootDir = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
073
074  }
075
076  private static void setupConf(Configuration conf) {
077    // disable the ui
078    conf.setInt("hbase.regionsever.info.port", -1);
079    // so make sure we get a compaction when doing a load, but keep around some
080    // files in the store
081    conf.setInt("hbase.hstore.compaction.min", 2);
082    conf.setInt("hbase.hstore.compactionThreshold", 5);
083    // change the flush size to a small amount, regulating number of store files
084    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
085
086    // block writes if we get to blockingStoreFiles store files
087    conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles);
088    // Ensure no extra cleaners on by default (e.g. TimeToLiveHFileCleaner)
089    conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 3);
090    conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 4);
091    conf.setInt(CompactSplit.SPLIT_THREADS, 5);
092  }
093
094  @After
095  public void tearDown() throws Exception {
096    TEST_UTIL.deleteTable(tableName);
097  }
098
099  @AfterClass
100  public static void cleanupTest() throws Exception {
101    try {
102      TEST_UTIL.shutdownMiniCluster();
103    } catch (Exception e) {
104      // NOOP;
105    }
106  }
107
108  @Test
109  public void testThreadPoolSizeTuning() throws Exception {
110    Configuration conf = TEST_UTIL.getConfiguration();
111    Connection conn = ConnectionFactory.createConnection(conf);
112    try {
113      TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
114        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
115        .build();
116      TEST_UTIL.getAdmin().createTable(tableDescriptor);
117      TEST_UTIL.waitTableAvailable(tableName);
118      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
119
120      // check initial configuration of thread pool sizes
121      assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
122      assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
123      assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
124
125      // change bigger configurations and do online update
126      conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
127      conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
128      conf.setInt(CompactSplit.SPLIT_THREADS, 6);
129      try {
130        regionServer.getCompactSplitThread().onConfigurationChange(conf);
131      } catch (IllegalArgumentException iae) {
132        Assert.fail("Update bigger configuration failed!");
133      }
134
135      // check again after online update
136      assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
137      assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
138      assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
139
140      // change smaller configurations and do online update
141      conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
142      conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
143      conf.setInt(CompactSplit.SPLIT_THREADS, 4);
144      try {
145        regionServer.getCompactSplitThread().onConfigurationChange(conf);
146      } catch (IllegalArgumentException iae) {
147        Assert.fail("Update smaller configuration failed!");
148      }
149
150      // check again after online update
151      assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
152      assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
153      assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
154    } finally {
155      conn.close();
156    }
157  }
158
159  @Test
160  public void testFlushWithTableCompactionDisabled() throws Exception {
161    TableDescriptor htd =
162      TableDescriptorBuilder.newBuilder(tableName).setCompactionEnabled(false).build();
163    TEST_UTIL.createTable(htd, new byte[][] { family }, null);
164
165    // load the table
166    for (int i = 0; i < blockingStoreFiles + 1; i++) {
167      TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family);
168      TEST_UTIL.flush(tableName);
169    }
170
171    // Make sure that store file number is greater than blockingStoreFiles + 1
172    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
173    Collection<String> hfiles = SnapshotTestingUtils.listHFileNames(fs, tableDir);
174    assert (hfiles.size() > blockingStoreFiles + 1);
175  }
176}