001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022 023import java.util.Collection; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.ConnectionFactory; 032import org.apache.hadoop.hbase.client.TableDescriptor; 033import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 034import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.CommonFSUtils; 038import org.junit.jupiter.api.AfterEach; 039import org.junit.jupiter.api.BeforeEach; 040import org.junit.jupiter.api.Tag; 041import org.junit.jupiter.api.Test; 042 043@Tag(MediumTests.TAG) 044public class TestCompactSplitThread { 045 046 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 047 private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); 048 private final byte[] family = Bytes.toBytes("f"); 049 private static final int NUM_RS = 1; 050 private static final int blockingStoreFiles = 3; 051 private static Path rootDir; 052 private static FileSystem fs; 053 054 /** 055 * Setup the config for the cluster 056 */ 057 @BeforeEach 058 public void setupCluster() throws Exception { 059 setupConf(TEST_UTIL.getConfiguration()); 060 TEST_UTIL.startMiniCluster(NUM_RS); 061 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 062 rootDir = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); 063 064 } 065 066 private static void setupConf(Configuration conf) { 067 // disable the ui 068 conf.setInt("hbase.regionsever.info.port", -1); 069 // so make sure we get a compaction when doing a load, but keep around some 070 // files in the store 071 conf.setInt("hbase.hstore.compaction.min", 2); 072 conf.setInt("hbase.hstore.compactionThreshold", 5); 073 // change the flush size to a small amount, regulating number of store files 074 conf.setInt("hbase.hregion.memstore.flush.size", 25000); 075 076 // block writes if we get to blockingStoreFiles store files 077 conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles); 078 // Ensure no extra cleaners on by default (e.g. TimeToLiveHFileCleaner) 079 conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 3); 080 conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 4); 081 conf.setInt(CompactSplit.SPLIT_THREADS, 5); 082 } 083 084 @AfterEach 085 public void cleanupTest() throws Exception { 086 try { 087 TEST_UTIL.shutdownMiniCluster(); 088 } catch (Exception e) { 089 // NOOP; 090 } 091 } 092 093 @Test 094 public void testThreadPoolSizeTuning() throws Exception { 095 Configuration conf = TEST_UTIL.getConfiguration(); 096 Connection conn = ConnectionFactory.createConnection(conf); 097 try { 098 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 099 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 100 .build(); 101 TEST_UTIL.getAdmin().createTable(tableDescriptor); 102 TEST_UTIL.waitTableAvailable(tableName); 103 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 104 105 // check initial configuration of thread pool sizes 106 assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); 107 assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); 108 assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum()); 109 110 // change bigger configurations and do online update 111 conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4); 112 conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5); 113 conf.setInt(CompactSplit.SPLIT_THREADS, 6); 114 assertDoesNotThrow(() -> regionServer.getCompactSplitThread().onConfigurationChange(conf), 115 "Update bigger configuration failed!"); 116 117 // check again after online update 118 assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); 119 assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); 120 assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum()); 121 122 // change smaller configurations and do online update 123 conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2); 124 conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3); 125 conf.setInt(CompactSplit.SPLIT_THREADS, 4); 126 assertDoesNotThrow(() -> regionServer.getCompactSplitThread().onConfigurationChange(conf), 127 "Update smaller configuration failed!"); 128 129 // check again after online update 130 assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum()); 131 assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum()); 132 assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum()); 133 } finally { 134 conn.close(); 135 } 136 } 137 138 @Test 139 public void testFlushWithTableCompactionDisabled() throws Exception { 140 TableDescriptor htd = 141 TableDescriptorBuilder.newBuilder(tableName).setCompactionEnabled(false).build(); 142 TEST_UTIL.createTable(htd, new byte[][] { family }, null); 143 144 // load the table 145 for (int i = 0; i < blockingStoreFiles + 1; i++) { 146 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family); 147 TEST_UTIL.flush(tableName); 148 } 149 150 // Make sure that store file number is greater than blockingStoreFiles + 1 151 Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); 152 Collection<String> hfiles = SnapshotTestingUtils.listHFileNames(fs, tableDir); 153 assert (hfiles.size() > blockingStoreFiles + 1); 154 } 155 156 @Test 157 public void testFlushWithRegionReplicas() throws Exception { 158 TableDescriptor htd = 159 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(2).build(); 160 TEST_UTIL.createTable(htd, new byte[][] { family }, null); 161 162 // load the table 163 for (int i = 0; i < blockingStoreFiles + 1; i++) { 164 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(tableName), family); 165 TEST_UTIL.flush(tableName); 166 } 167 168 // One region split should have taken place, because the primary replica gets split, and not the 169 // secondary replica. 170 assertEquals(1, TEST_UTIL.getRSForFirstRegionInTable(tableName).getCompactSplitThread() 171 .getSubmittedSplitsCount()); 172 } 173}