001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND;
import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_THREAD_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
045
046@Tag(MediumTests.TAG)
047@Tag(MasterTests.TAG)
048public class TestExpiredMobFileCleanerChore {
049  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
050  private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
051  private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2");
052  private final static String family = "family";
053  private final static byte[] row1 = Bytes.toBytes("row1");
054  private final static byte[] row2 = Bytes.toBytes("row2");
055  private final static byte[] row3 = Bytes.toBytes("row3");
056  private final static byte[] qf = Bytes.toBytes("qf");
057
058  private static BufferedMutator table;
059  private static Admin admin;
060  private static BufferedMutator table2;
061  private static MobFileCleanerChore mobFileCleanerChore;
062
063  @BeforeAll
064  public static void setUp() throws Exception {
065    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
066    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2);
067    TEST_UTIL.startMiniCluster(1);
068    mobFileCleanerChore = TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore();
069  }
070
071  @AfterEach
072  public void cleanUp() throws IOException {
073    admin.disableTable(tableName);
074    admin.deleteTable(tableName);
075    admin.disableTable(tableName2);
076    admin.deleteTable(tableName2);
077    admin.close();
078  }
079
080  @AfterAll
081  public static void tearDown() throws Exception {
082    TEST_UTIL.shutdownMiniCluster();
083    TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
084  }
085
086  @Test
087  public void testCleanerSingleThread() throws Exception {
088    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 1);
089    mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
090    int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
091    assertEquals(1, corePoolSize);
092    testCleanerInternal();
093  }
094
095  @Test
096  public void testCleanerMultiThread() throws Exception {
097    TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_THREAD_COUNT, 2);
098    mobFileCleanerChore.onConfigurationChange(TEST_UTIL.getConfiguration());
099    int corePoolSize = mobFileCleanerChore.getExecutor().getCorePoolSize();
100    assertEquals(2, corePoolSize);
101    testCleanerInternal();
102  }
103
104  private static void init() throws Exception {
105    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
106    ColumnFamilyDescriptor columnFamilyDescriptor =
107      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
108        .setMobThreshold(3L).setMaxVersions(4).build();
109    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
110
111    admin = TEST_UTIL.getAdmin();
112    admin.createTable(tableDescriptorBuilder.build());
113
114    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
115      .getBufferedMutator(tableName);
116
117    TableDescriptorBuilder tableDescriptorBuilder2 = TableDescriptorBuilder.newBuilder(tableName2);
118    ColumnFamilyDescriptor columnFamilyDescriptor2 =
119      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
120        .setMobThreshold(3L).setMaxVersions(4).build();
121    tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2);
122    admin.createTable(tableDescriptorBuilder2.build());
123
124    table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
125      .getBufferedMutator(tableName2);
126  }
127
128  private static void modifyColumnExpiryDays(int expireDays) throws Exception {
129
130    // change ttl as expire days to make some row expired
131    int timeToLive = expireDays * secondsOfDay();
132    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder
133      .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
134    columnFamilyDescriptorBuilder.setTimeToLive(timeToLive);
135
136    admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());
137
138    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = ColumnFamilyDescriptorBuilder
139      .newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
140    columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive);
141
142    admin.modifyColumnFamily(tableName2, columnFamilyDescriptorBuilder2.build());
143  }
144
145  private static void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts,
146    TableName tableName) throws Exception {
147
148    Put put = new Put(row, ts);
149    put.addColumn(Bytes.toBytes(family), qf, value);
150    table.mutate(put);
151
152    table.flush();
153    admin.flush(tableName);
154  }
155
156  /**
157   * Creates a 3 day old hfile and an 1 day old hfile then sets expiry to 2 days. Verifies that the
158   * 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is
159   * run.
160   */
161  private static void testCleanerInternal() throws Exception {
162    init();
163
164    Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);
165
166    byte[] dummyData = makeDummyData(600);
167    long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 1000; // 3 days before
168    putKVAndFlush(table, row1, dummyData, ts, tableName);
169    FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
170    // the first mob file
171    assertEquals(1, firstFiles.length, "Before cleanup without delay 1");
172    String firstFile = firstFiles[0].getPath().getName();
173
174    // 1.5 day before
175    ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
176    putKVAndFlush(table, row2, dummyData, ts, tableName);
177    FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
178    // now there are 2 mob files
179    assertEquals(2, secondFiles.length, "Before cleanup without delay 2");
180    String f1 = secondFiles[0].getPath().getName();
181    String f2 = secondFiles[1].getPath().getName();
182    String secondFile = f1.equals(firstFile) ? f2 : f1;
183
184    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
185    putKVAndFlush(table, row3, dummyData, ts, tableName);
186    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
187    putKVAndFlush(table, row3, dummyData, ts, tableName);
188    FileStatus[] thirdFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
189    // now there are 4 mob files
190    assertEquals(4, thirdFiles.length, "Before cleanup without delay 3");
191
192    // modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
193
194    // for table 2
195    Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName2, family);
196
197    byte[] dummyData2 = makeDummyData(600);
198
199    putKVAndFlush(table2, row1, dummyData2, ts, tableName2);
200    FileStatus[] firstFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
201    // the first mob file
202    assertEquals(1, firstFiles2.length, "Before cleanup without delay 1");
203    String firstFile2 = firstFiles2[0].getPath().getName();
204
205    // 1.5 day before
206    ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
207    putKVAndFlush(table2, row2, dummyData2, ts, tableName2);
208    FileStatus[] secondFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
209    // now there are 2 mob files
210    assertEquals(2, secondFiles2.length, "Before cleanup without delay 2");
211    String f1Second = secondFiles2[0].getPath().getName();
212    String f2Second = secondFiles2[1].getPath().getName();
213    String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second;
214    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
215    putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
216    ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
217    putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
218    FileStatus[] thirdFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
219    // now there are 4 mob files
220    assertEquals(4, thirdFiles2.length, "Before cleanup without delay 3");
221
222    modifyColumnExpiryDays(2); // ttl = 2, make the first row expired
223
224    // run the cleaner chore
225    mobFileCleanerChore.chore();
226
227    FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
228    String lastFile = filesAfterClean[0].getPath().getName();
229    // there are 4 mob files in total, but only 3 need to be cleaned
230    assertEquals(1, filesAfterClean.length, "After cleanup without delay 1");
231    assertEquals(secondFile, lastFile, "After cleanup without delay 2");
232
233    filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
234    lastFile = filesAfterClean[0].getPath().getName();
235    // there are 4 mob files in total, but only 3 need to be cleaned
236    assertEquals(1, filesAfterClean.length, "After cleanup without delay 1");
237    assertEquals(secondFile2, lastFile, "After cleanup without delay 2");
238  }
239
240  private static int secondsOfDay() {
241    return 24 * 3600;
242  }
243
244  private static byte[] makeDummyData(int size) {
245    byte[] dummyData = new byte[size];
246    Bytes.random(dummyData);
247    return dummyData;
248  }
249}