001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022
023import java.util.Collection;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.CommonFSUtils;
038import org.junit.jupiter.api.AfterEach;
039import org.junit.jupiter.api.BeforeEach;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042
043@Tag(MediumTests.TAG)
044public class TestCompactSplitThread {
045
046  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
047  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
048  private final byte[] family = Bytes.toBytes("f");
049  private static final int NUM_RS = 1;
050  private static final int blockingStoreFiles = 3;
051  private static Path rootDir;
052  private static FileSystem fs;
053
054  /**
055   * Setup the config for the cluster
056   */
057  @BeforeEach
058  public void setupCluster() throws Exception {
059    setupConf(TEST_UTIL.getConfiguration());
060    TEST_UTIL.startMiniCluster(NUM_RS);
061    fs = TEST_UTIL.getDFSCluster().getFileSystem();
062    rootDir = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
063
064  }
065
066  private static void setupConf(Configuration conf) {
067    // disable the ui
068    conf.setInt("hbase.regionsever.info.port", -1);
069    // so make sure we get a compaction when doing a load, but keep around some
070    // files in the store
071    conf.setInt("hbase.hstore.compaction.min", 2);
072    conf.setInt("hbase.hstore.compactionThreshold", 5);
073    // change the flush size to a small amount, regulating number of store files
074    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
075
076    // block writes if we get to blockingStoreFiles store files
077    conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles);
078    // Ensure no extra cleaners on by default (e.g. TimeToLiveHFileCleaner)
079    conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 3);
080    conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 4);
081    conf.setInt(CompactSplit.SPLIT_THREADS, 5);
082  }
083
084  @AfterEach
085  public void cleanupTest() throws Exception {
086    try {
087      TEST_UTIL.shutdownMiniCluster();
088    } catch (Exception e) {
089      // NOOP;
090    }
091  }
092
093  @Test
094  public void testThreadPoolSizeTuning() throws Exception {
095    Configuration conf = TEST_UTIL.getConfiguration();
096    Connection conn = ConnectionFactory.createConnection(conf);
097    try {
098      TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
099        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
100        .build();
101      TEST_UTIL.getAdmin().createTable(tableDescriptor);
102      TEST_UTIL.waitTableAvailable(tableName);
103      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
104
105      // check initial configuration of thread pool sizes
106      assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
107      assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
108      assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
109
110      // change bigger configurations and do online update
111      conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
112      conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
113      conf.setInt(CompactSplit.SPLIT_THREADS, 6);
114      assertDoesNotThrow(() -> regionServer.getCompactSplitThread().onConfigurationChange(conf),
115        "Update bigger configuration failed!");
116
117      // check again after online update
118      assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
119      assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
120      assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
121
122      // change smaller configurations and do online update
123      conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
124      conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
125      conf.setInt(CompactSplit.SPLIT_THREADS, 4);
126      assertDoesNotThrow(() -> regionServer.getCompactSplitThread().onConfigurationChange(conf),
127        "Update smaller configuration failed!");
128
129      // check again after online update
130      assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
131      assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
132      assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
133    } finally {
134      conn.close();
135    }
136  }
137
138  @Test
139  public void testFlushWithTableCompactionDisabled() throws Exception {
140    TableDescriptor htd =
141      TableDescriptorBuilder.newBuilder(tableName).setCompactionEnabled(false).build();
142    TEST_UTIL.createTable(htd, new byte[][] { family }, null);
143
144    // load the table
145    for (int i = 0; i < blockingStoreFiles + 1; i++) {
146      TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family);
147      TEST_UTIL.flush(tableName);
148    }
149
150    // Make sure that store file number is greater than blockingStoreFiles + 1
151    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
152    Collection<String> hfiles = SnapshotTestingUtils.listHFileNames(fs, tableDir);
153    assert (hfiles.size() > blockingStoreFiles + 1);
154  }
155
156  @Test
157  public void testFlushWithRegionReplicas() throws Exception {
158    TableDescriptor htd =
159      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(2).build();
160    TEST_UTIL.createTable(htd, new byte[][] { family }, null);
161
162    // load the table
163    for (int i = 0; i < blockingStoreFiles + 1; i++) {
164      TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family);
165      TEST_UTIL.flush(tableName);
166    }
167
168    // One region split should have taken place, because the primary replica gets split, and not the
169    // secondary replica.
170    assertEquals(1, TEST_UTIL.getRSForFirstRegionInTable(tableName).getCompactSplitThread()
171      .getSubmittedSplitsCount());
172  }
173}