001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.stream.Collectors;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.CompactionState;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.regionserver.HStore;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
049import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.RegionSplitter;
054import org.junit.jupiter.api.AfterEach;
055import org.junit.jupiter.api.BeforeEach;
056import org.junit.jupiter.api.Tag;
057import org.junit.jupiter.api.TestInfo;
058import org.junit.jupiter.api.TestTemplate;
059import org.junit.jupiter.params.provider.Arguments;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size
065 * to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10
066 * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
067 * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
068 * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
069 * mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
070 * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
071 * scanner and checks all 3 * 1000 rows.
072 */
073@Tag(LargeTests.TAG)
074@HBaseParameterizedTestTemplate(name = "{index}: useFileBasedSFT={0}")
075public class TestMobCompactionWithDefaults {
076  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
077
078  protected HBaseTestingUtil HTU;
079  protected static Configuration conf;
080  protected static long minAgeToArchive = 10000;
081
082  protected final static String famStr = "f1";
083  protected final static byte[] fam = Bytes.toBytes(famStr);
084  protected final static byte[] qualifier = Bytes.toBytes("q1");
085  protected final static long mobLen = 10;
086  protected final static byte[] mobVal = Bytes
087    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
088
089  private String testMethodName;
090  protected TableDescriptor tableDescriptor;
091  private ColumnFamilyDescriptor familyDescriptor;
092  protected Admin admin;
093  protected TableName table = null;
094  protected int numRegions = 20;
095  protected int rows = 1000;
096
097  protected Boolean useFileBasedSFT;
098
099  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
100    this.useFileBasedSFT = useFileBasedSFT;
101  }
102
103  public static Stream<Arguments> parameters() {
104    return Stream.of(false, true).map(Arguments::of);
105  }
106
107  protected void htuStart() throws Exception {
108    HTU = new HBaseTestingUtil();
109    conf = HTU.getConfiguration();
110    conf.setInt("hfile.format.version", 3);
111    // Disable automatic MOB compaction
112    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
113    // Disable automatic MOB file cleaner chore
114    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
115    // Set minimum age to archive to 10 sec
116    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
117    // Set compacted file discharger interval to a half minAgeToArchive
118    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
119    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
120    if (useFileBasedSFT) {
121      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
122        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
123    }
124    additonalConfigSetup();
125    HTU.startMiniCluster();
126  }
127
128  protected void additonalConfigSetup() {
129  }
130
131  @BeforeEach
132  public void setUp(TestInfo testInfo) throws Exception {
133    testMethodName = testInfo.getTestMethod().get().getName()
134      + testInfo.getDisplayName().replaceAll("[:= ]", "_").replaceAll("_+", "_").trim();
135    htuStart();
136    admin = HTU.getAdmin();
137    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
138      .setMobThreshold(mobLen).setMaxVersions(1).build();
139    tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(testMethodName))
140      .setColumnFamily(familyDescriptor).build();
141    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
142    byte[][] splitKeys = splitAlgo.split(numRegions);
143    table = HTU.createTable(tableDescriptor, splitKeys).getName();
144  }
145
146  private void loadData(TableName tableName, int num) {
147    LOG.info("Started loading {} rows into {}", num, tableName);
148    try (final Table table = HTU.getConnection().getTable(tableName)) {
149      for (int i = 0; i < num; i++) {
150        byte[] key = new byte[32];
151        Bytes.random(key);
152        Put p = new Put(key);
153        p.addColumn(fam, qualifier, mobVal);
154        table.put(p);
155      }
156      admin.flush(tableName);
157      LOG.info("Finished loading {} rows into {}", num, tableName);
158    } catch (Exception e) {
159      LOG.error("MOB file compaction chore test FAILED", e);
160      fail("MOB file compaction chore test FAILED");
161    }
162  }
163
164  @AfterEach
165  public void tearDown() throws Exception {
166    admin.disableTable(tableDescriptor.getTableName());
167    admin.deleteTable(tableDescriptor.getTableName());
168    HTU.shutdownMiniCluster();
169  }
170
171  @TestTemplate
172  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
173    LOG.info("MOB compaction " + description() + " started");
174    loadAndFlushThreeTimes(rows, table, famStr);
175    mobCompact(tableDescriptor, familyDescriptor);
176    long filesAfterCompaction =
177      getNumberOfFilesInMobFamilyDir(tableDescriptor, familyDescriptor, famStr);
178    LOG.info("Files after compaction: {}", filesAfterCompaction);
179
180    assertEquals(numRegions * 4,
181      getNumberOfFilesInMobFamilyDir(tableDescriptor,
182        tableDescriptor.getColumnFamily(famStr.getBytes()), famStr),
183      "Should have 4 MOB files per region due to 3xflush + compaction.");
184    cleanupAndVerifyCounts(table, famStr, 3 * rows);
185    LOG.info("MOB compaction " + description() + " finished OK");
186  }
187
188  @TestTemplate
189  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
190    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(testMethodName) + "-clone");
191    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
192    loadAndFlushThreeTimes(rows, table, famStr);
193    LOG.debug("Taking snapshot and cloning table {}", table);
194    admin.snapshot(TestMobUtils.getTableName(testMethodName), table);
195    admin.cloneSnapshot(TestMobUtils.getTableName(testMethodName), clone);
196    assertEquals(3 * numRegions,
197      getNumberOfStoreFiles(admin.getDescriptor(clone),
198        admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()), famStr),
199      "Should have 3 hlinks per region in MOB area from snapshot clone");
200    mobCompact(admin.getDescriptor(clone),
201      admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()));
202    assertEquals(4 * numRegions,
203      getNumberOfFilesInMobFamilyDir(admin.getDescriptor(clone),
204        admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()), famStr),
205      "Should have 3 hlinks + 1 MOB file per region due to clone + compact");
206    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
207    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
208  }
209
210  protected long getNumberOfFilesInMobFamilyDir(TableDescriptor descriptor,
211    ColumnFamilyDescriptor familyDesc, String family) throws IOException {
212    FileSystem fs = FileSystem.get(conf);
213    Path dir = MobUtils.getMobFamilyPath(conf, descriptor.getTableName(), family);
214    FileStatus[] stat = fs.listStatus(dir);
215    for (FileStatus st : stat) {
216      LOG.debug("MOB Directory content: {}", st.getPath());
217    }
218    LOG.debug("MOB Directory content total files: {}", stat.length);
219    return stat.length;
220  }
221
222  @TestTemplate
223  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
224    throws InterruptedException, IOException {
225    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(testMethodName) + "-clone");
226    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
227    loadAndFlushThreeTimes(rows, table, famStr);
228    LOG.debug("Taking snapshot and cloning table {}", table);
229    admin.snapshot(TestMobUtils.getTableName(testMethodName), table);
230    admin.cloneSnapshot(TestMobUtils.getTableName(testMethodName), clone);
231    assertEquals(3 * numRegions,
232      getNumberOfStoreFiles(admin.getDescriptor(clone),
233        admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()), famStr),
234      "Should have 3 hlinks per region in MOB area from snapshot clone");
235    loadAndFlushThreeTimes(rows, clone, famStr);
236    mobCompact(admin.getDescriptor(clone),
237      admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()));
238    CommonFSUtils.logFileSystemState(FileSystem.get(conf), HTU.getDefaultRootDirPath(), LOG);
239    assertEquals(7 * numRegions,
240      getNumberOfFilesInMobFamilyDir(admin.getDescriptor(clone),
241        admin.getDescriptor(clone).getColumnFamily(famStr.getBytes()), famStr),
242      "Should have 7 MOB file per region due to clone + 3xflush + compact");
243    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
244    LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK");
245  }
246
247  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
248    throws IOException {
249    final long start = getNumberOfFilesInMobFamilyDir(admin.getDescriptor(table),
250      admin.getDescriptor(table).getColumnFamily(family.getBytes()), family);
251    // Load and flush data 3 times
252    loadData(table, rows);
253    loadData(table, rows);
254    loadData(table, rows);
255    assertEquals(start + numRegions * 3,
256      getNumberOfFilesInMobFamilyDir(admin.getDescriptor(table),
257        admin.getDescriptor(table).getColumnFamily(family.getBytes()), family),
258      "Should have 3 more mob files per region from flushing.");
259  }
260
261  protected String description() {
262    return "regular mode";
263  }
264
265  protected void enableCompactions() throws IOException {
266    final List<String> serverList =
267      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
268    admin.compactionSwitch(true, serverList);
269  }
270
271  protected void disableCompactions() throws IOException {
272    final List<String> serverList =
273      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
274    admin.compactionSwitch(false, serverList);
275  }
276
277  /**
278   * compact the given table and return once it is done. should presume compactions are disabled
279   * when called. should ensure compactions are disabled before returning.
280   */
281  protected void mobCompact(TableDescriptor tableDescriptor,
282    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
283    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
284    enableCompactions();
285    mobCompactImpl(tableDescriptor, familyDescriptor);
286    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
287    disableCompactions();
288  }
289
290  /**
291   * Call the API for compaction specific to the test set. should not wait for compactions to
292   * finish. may assume compactions are enabled when called.
293   */
294  protected void mobCompactImpl(TableDescriptor tableDescriptor,
295    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
296    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
297  }
298
299  protected void waitUntilCompactionIsComplete(TableName table)
300    throws IOException, InterruptedException {
301    CompactionState state = admin.getCompactionState(table);
302    while (state != CompactionState.NONE) {
303      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
304      Thread.sleep(100);
305      state = admin.getCompactionState(table);
306    }
307    LOG.debug("done waiting for compaction on {}", table);
308  }
309
310  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
311    throws InterruptedException, IOException {
312    // We have guarantee, that compacted file discharger will run during this pause
313    // because it has interval less than this wait time
314    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
315
316    Thread.sleep(minAgeToArchive + 1000);
317    LOG.info("Cleaning up MOB files");
318
319    // run cleaner chore on each RS
320    for (ServerName sn : admin.getRegionServers()) {
321      HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
322    }
323
324    assertEquals(numRegions,
325      getNumberOfFilesInMobFamilyDir(admin.getDescriptor(table),
326        admin.getDescriptor(table).getColumnFamily(family.getBytes()), family),
327      "After cleaning, we should have 1 MOB file per region based on size.");
328
329    LOG.debug("checking count of rows");
330    long scanned = scanTable(table);
331    assertEquals(rows, scanned, "Got the wrong number of rows in table " + table + " cf " + family);
332
333  }
334
335  protected long scanTable(TableName tableName) {
336    try (final Table table = HTU.getConnection().getTable(tableName);
337      final ResultScanner scanner = table.getScanner(fam)) {
338      Result result;
339      long counter = 0;
340      while ((result = scanner.next()) != null) {
341        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
342        counter++;
343      }
344      return counter;
345    } catch (Exception e) {
346      LOG.error("MOB file compaction test FAILED", e);
347      if (HTU != null) {
348        fail(e.getMessage());
349      } else {
350        System.exit(-1);
351      }
352    }
353    return 0;
354  }
355
356  protected long getNumberOfStoreFiles(TableDescriptor descriptor,
357    ColumnFamilyDescriptor familyDesc, String family) throws IOException {
358    List<HRegion> regions = HTU.getHBaseCluster().getRegions(descriptor.getTableName());
359    long totalFiles = 0;
360    for (HRegion region : regions) {
361      HStore store = region.getStore(familyDesc.getName());
362      // This counts regular region files (with MOB references) via SFT
363      StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, store.getStoreContext());
364      totalFiles += sft.load().size();
365    }
366    return totalFiles;
367  }
368}