001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Random;
027import java.util.concurrent.ThreadLocalRandom;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.CompactionState;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.master.HMaster;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.junit.jupiter.api.AfterAll;
040import org.junit.jupiter.api.BeforeAll;
041import org.junit.jupiter.api.BeforeEach;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044import org.junit.jupiter.api.TestInfo;
045
046/** Unit tests to test retrieving table/region compaction state */
047@Tag(VerySlowRegionServerTests.TAG)
048@Tag(LargeTests.TAG)
049public class TestCompactionState {
050
051  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
052  private String name;
053
054  @BeforeEach
055  public void setTestName(TestInfo testInfo) {
056    this.name = testInfo.getTestMethod().get().getName();
057  }
058
059  @BeforeAll
060  public static void setUpBeforeClass() throws Exception {
061    TEST_UTIL.startMiniCluster();
062  }
063
064  @AfterAll
065  public static void tearDownAfterClass() throws Exception {
066    TEST_UTIL.shutdownMiniCluster();
067  }
068
069  enum StateSource {
070    ADMIN,
071    MASTER
072  }
073
074  @Test
075  public void testMajorCompactionStateFromAdmin() throws IOException, InterruptedException {
076    compaction(name, 8, CompactionState.MAJOR, false, StateSource.ADMIN);
077  }
078
079  @Test
080  public void testMinorCompactionStateFromAdmin() throws IOException, InterruptedException {
081    compaction(name, 15, CompactionState.MINOR, false, StateSource.ADMIN);
082  }
083
084  @Test
085  public void testMajorCompactionOnFamilyStateFromAdmin() throws IOException, InterruptedException {
086    compaction(name, 8, CompactionState.MAJOR, true, StateSource.ADMIN);
087  }
088
089  @Test
090  public void testMinorCompactionOnFamilyStateFromAdmin() throws IOException, InterruptedException {
091    compaction(name, 15, CompactionState.MINOR, true, StateSource.ADMIN);
092  }
093
094  @Test
095  public void testMajorCompactionStateFromMaster() throws IOException, InterruptedException {
096    compaction(name, 8, CompactionState.MAJOR, false, StateSource.MASTER);
097  }
098
099  @Test
100  public void testMinorCompactionStateFromMaster() throws IOException, InterruptedException {
101    compaction(name, 15, CompactionState.MINOR, false, StateSource.MASTER);
102  }
103
104  @Test
105  public void testMajorCompactionOnFamilyStateFromMaster()
106    throws IOException, InterruptedException {
107    compaction(name, 8, CompactionState.MAJOR, true, StateSource.MASTER);
108  }
109
110  @Test
111  public void testMinorCompactionOnFamilyStateFromMaster()
112    throws IOException, InterruptedException {
113    compaction(name, 15, CompactionState.MINOR, true, StateSource.MASTER);
114  }
115
116  @Test
117  public void testInvalidColumnFamily() throws IOException, InterruptedException {
118    final TableName tableName = TableName.valueOf(name);
119    byte[] family = Bytes.toBytes("family");
120    byte[] fakecf = Bytes.toBytes("fakecf");
121    boolean caughtMinorCompact = false;
122    boolean caughtMajorCompact = false;
123    Table ht = null;
124    try {
125      ht = TEST_UTIL.createTable(tableName, family);
126      Admin admin = TEST_UTIL.getAdmin();
127      try {
128        admin.compact(tableName, fakecf);
129      } catch (IOException ioe) {
130        caughtMinorCompact = true;
131      }
132      try {
133        admin.majorCompact(tableName, fakecf);
134      } catch (IOException ioe) {
135        caughtMajorCompact = true;
136      }
137    } finally {
138      if (ht != null) {
139        TEST_UTIL.deleteTable(tableName);
140      }
141      assertTrue(caughtMinorCompact);
142      assertTrue(caughtMajorCompact);
143    }
144  }
145
146  /**
147   * Load data to a table, flush it to disk, trigger compaction, confirm the compaction state is
148   * right and wait till it is done.
149   * @param singleFamily otherwise, run compaction on all cfs
150   * @param stateSource  get the state by Admin or Master
151   */
152  private void compaction(final String tableName, final int flushes,
153    final CompactionState expectedState, boolean singleFamily, StateSource stateSource)
154    throws IOException, InterruptedException {
155    // Create a table with regions
156    TableName table = TableName.valueOf(tableName);
157    byte[] family = Bytes.toBytes("family");
158    byte[][] families =
159      { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
160    Table ht = null;
161    try {
162      ht = TEST_UTIL.createTable(table, families);
163      loadData(ht, families, 3000, flushes);
164      HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
165      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
166      List<HRegion> regions = rs.getRegions(table);
167      int countBefore = countStoreFilesInFamilies(regions, families);
168      int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
169      assertTrue(countBefore > 0); // there should be some data files
170      Admin admin = TEST_UTIL.getAdmin();
171      if (expectedState == CompactionState.MINOR) {
172        if (singleFamily) {
173          admin.compact(table, family);
174        } else {
175          admin.compact(table);
176        }
177      } else {
178        if (singleFamily) {
179          admin.majorCompact(table, family);
180        } else {
181          admin.majorCompact(table);
182        }
183      }
184      long curt = EnvironmentEdgeManager.currentTime();
185      long waitTime = 5000;
186      long endt = curt + waitTime;
187      CompactionState state = getCompactionState(stateSource, master, admin, table);
188      while (state == CompactionState.NONE && curt < endt) {
189        Thread.sleep(10);
190        state = getCompactionState(stateSource, master, admin, table);
191        curt = EnvironmentEdgeManager.currentTime();
192      }
193      // Now, should have the right compaction state,
194      // otherwise, the compaction should have already been done
195      if (expectedState != state) {
196        for (Region region : regions) {
197          state = CompactionState.valueOf(region.getCompactionState().toString());
198          assertEquals(CompactionState.NONE, state);
199        }
200      } else {
201        // Wait until the compaction is done
202        state = getCompactionState(stateSource, master, admin, table);
203        while (state != CompactionState.NONE && curt < endt) {
204          Thread.sleep(10);
205          state = getCompactionState(stateSource, master, admin, table);
206        }
207        // Now, compaction should be done.
208        assertEquals(CompactionState.NONE, state);
209      }
210      int countAfter = countStoreFilesInFamilies(regions, families);
211      int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
212      assertTrue(countAfter < countBefore);
213      if (!singleFamily) {
214        if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
215        else assertTrue(families.length < countAfter);
216      } else {
217        int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
218        // assert only change was to single column family
219        assertTrue(singleFamDiff == (countBefore - countAfter));
220        if (expectedState == CompactionState.MAJOR) {
221          assertTrue(1 == countAfterSingleFamily);
222        } else {
223          assertTrue(1 < countAfterSingleFamily);
224        }
225      }
226    } finally {
227      if (ht != null) {
228        TEST_UTIL.deleteTable(table);
229      }
230    }
231  }
232
233  private static CompactionState getCompactionState(StateSource stateSource, HMaster master,
234    Admin admin, TableName table) throws IOException {
235    CompactionState state = stateSource == StateSource.ADMIN
236      ? admin.getCompactionState(table)
237      : master.getCompactionState(table);
238    return state;
239  }
240
241  private static int countStoreFilesInFamily(List<HRegion> regions, final byte[] family) {
242    return countStoreFilesInFamilies(regions, new byte[][] { family });
243  }
244
245  private static int countStoreFilesInFamilies(List<HRegion> regions, final byte[][] families) {
246    int count = 0;
247    for (HRegion region : regions) {
248      count += region.getStoreFileList(families).size();
249    }
250    return count;
251  }
252
253  private static void loadData(final Table ht, final byte[][] families, final int rows,
254    final int flushes) throws IOException {
255    List<Put> puts = new ArrayList<>(rows);
256    byte[] qualifier = Bytes.toBytes("val");
257    Random rand = ThreadLocalRandom.current();
258    for (int i = 0; i < flushes; i++) {
259      for (int k = 0; k < rows; k++) {
260        byte[] row = Bytes.toBytes(rand.nextLong());
261        Put p = new Put(row);
262        for (int j = 0; j < families.length; ++j) {
263          p.addColumn(families[j], qualifier, row);
264        }
265        puts.add(p);
266      }
267      ht.put(puts);
268      TEST_UTIL.flush();
269      puts.clear();
270    }
271  }
272}