001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.List;
028import java.util.Random;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.CompactionState;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.RegionSplitter;
052import org.junit.After;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size
066 * to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10
067 * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
068 * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
069 * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
070 * mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
071 * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
072 * scanner and checks all 3 * 1000 rows.
073 */
074@RunWith(Parameterized.class)
075@Category(LargeTests.class)
076public class TestMobCompactionWithDefaults {
077  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
081
082  protected static HBaseTestingUtility HTU;
083  protected static Configuration conf;
084  protected static long minAgeToArchive = 10000;
085
086  protected final static String famStr = "f1";
087  protected final static byte[] fam = Bytes.toBytes(famStr);
088  protected final static byte[] qualifier = Bytes.toBytes("q1");
089  protected final static long mobLen = 10;
090  protected final static byte[] mobVal = Bytes
091    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
092
093  @Rule
094  public TestName test = new TestName();
095  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
096  protected ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
097  protected Admin admin;
098  protected TableName table = null;
099  protected int numRegions = 20;
100  protected int rows = 1000;
101
102  protected MobFileCleanerChore cleanerChore;
103
104  protected Boolean useFileBasedSFT;
105
106  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
107    this.useFileBasedSFT = useFileBasedSFT;
108  }
109
110  @Parameterized.Parameters
111  public static Collection<Boolean> data() {
112    Boolean[] data = { false, true };
113    return Arrays.asList(data);
114  }
115
116  protected void htuStart() throws Exception {
117    HTU = new HBaseTestingUtility();
118    conf = HTU.getConfiguration();
119    conf.setInt("hfile.format.version", 3);
120    // Disable automatic MOB compaction
121    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
122    // Disable automatic MOB file cleaner chore
123    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
124    // Set minimum age to archive to 10 sec
125    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
126    // Set compacted file discharger interval to a half minAgeToArchive
127    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
128    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
129    if (useFileBasedSFT) {
130      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
131        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
132    }
133    additonalConfigSetup();
134    HTU.startMiniCluster();
135  }
136
137  protected void additonalConfigSetup() {
138  }
139
140  @Before
141  public void setUp() throws Exception {
142    htuStart();
143    tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
144    admin = HTU.getAdmin();
145    cleanerChore = new MobFileCleanerChore();
146    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
147    familyDescriptor.setMobEnabled(true);
148    familyDescriptor.setMobThreshold(mobLen);
149    familyDescriptor.setMaxVersions(1);
150    tableDescriptor.setColumnFamily(familyDescriptor);
151    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
152    byte[][] splitKeys = splitAlgo.split(numRegions);
153    table = HTU.createTable(tableDescriptor, splitKeys).getName();
154  }
155
156  private void loadData(TableName tableName, int num) {
157
158    Random r = new Random();
159    LOG.info("Started loading {} rows into {}", num, tableName);
160    try (final Table table = HTU.getConnection().getTable(tableName)) {
161      for (int i = 0; i < num; i++) {
162        byte[] key = new byte[32];
163        r.nextBytes(key);
164        Put p = new Put(key);
165        p.addColumn(fam, qualifier, mobVal);
166        table.put(p);
167      }
168      admin.flush(tableName);
169      LOG.info("Finished loading {} rows into {}", num, tableName);
170    } catch (Exception e) {
171      LOG.error("MOB file compaction chore test FAILED", e);
172      fail("MOB file compaction chore test FAILED");
173    }
174  }
175
176  @After
177  public void tearDown() throws Exception {
178    admin.disableTable(tableDescriptor.getTableName());
179    admin.deleteTable(tableDescriptor.getTableName());
180    HTU.shutdownMiniCluster();
181  }
182
183  @Test
184  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
185    LOG.info("MOB compaction " + description() + " started");
186    loadAndFlushThreeTimes(rows, table, famStr);
187    mobCompact(tableDescriptor, familyDescriptor);
188    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
189      getNumberOfMobFiles(table, famStr));
190    cleanupAndVerifyCounts(table, famStr, 3 * rows);
191    LOG.info("MOB compaction " + description() + " finished OK");
192  }
193
194  @Test
195  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
196    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
197    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
198    loadAndFlushThreeTimes(rows, table, famStr);
199    LOG.debug("Taking snapshot and cloning table {}", table);
200    admin.snapshot(TestMobUtils.getTableName(test), table);
201    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
202    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
203      getNumberOfMobFiles(clone, famStr));
204    mobCompact(admin.getDescriptor(clone), familyDescriptor);
205    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
206      4 * numRegions, getNumberOfMobFiles(clone, famStr));
207    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
208    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
209  }
210
211  @Test
212  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
213    throws InterruptedException, IOException {
214    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
215    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
216    loadAndFlushThreeTimes(rows, table, famStr);
217    LOG.debug("Taking snapshot and cloning table {}", table);
218    admin.snapshot(TestMobUtils.getTableName(test), table);
219    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
220    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
221      getNumberOfMobFiles(clone, famStr));
222    loadAndFlushThreeTimes(rows, clone, famStr);
223    mobCompact(admin.getDescriptor(clone), familyDescriptor);
224    assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
225      7 * numRegions, getNumberOfMobFiles(clone, famStr));
226    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
227    LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK");
228  }
229
230  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
231    throws IOException {
232    final long start = getNumberOfMobFiles(table, family);
233    // Load and flush data 3 times
234    loadData(table, rows);
235    loadData(table, rows);
236    loadData(table, rows);
237    assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
238      getNumberOfMobFiles(table, family));
239  }
240
241  protected String description() {
242    return "regular mode";
243  }
244
245  protected void enableCompactions() throws IOException {
246    final List<String> serverList =
247      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
248    admin.compactionSwitch(true, serverList);
249  }
250
251  protected void disableCompactions() throws IOException {
252    final List<String> serverList =
253      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
254    admin.compactionSwitch(false, serverList);
255  }
256
257  /**
258   * compact the given table and return once it is done. should presume compactions are disabled
259   * when called. should ensure compactions are disabled before returning.
260   */
261  protected void mobCompact(TableDescriptor tableDescriptor,
262    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
263    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
264    enableCompactions();
265    mobCompactImpl(tableDescriptor, familyDescriptor);
266    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
267    disableCompactions();
268  }
269
270  /**
271   * Call the API for compaction specific to the test set. should not wait for compactions to
272   * finish. may assume compactions are enabled when called.
273   */
274  protected void mobCompactImpl(TableDescriptor tableDescriptor,
275    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
276    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
277  }
278
279  protected void waitUntilCompactionIsComplete(TableName table)
280    throws IOException, InterruptedException {
281    CompactionState state = admin.getCompactionState(table);
282    while (state != CompactionState.NONE) {
283      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
284      Thread.sleep(100);
285      state = admin.getCompactionState(table);
286    }
287    LOG.debug("done waiting for compaction on {}", table);
288  }
289
290  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
291    throws InterruptedException, IOException {
292    // We have guarantee, that compacted file discharger will run during this pause
293    // because it has interval less than this wait time
294    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
295
296    Thread.sleep(minAgeToArchive + 1000);
297    LOG.info("Cleaning up MOB files");
298
299    // run cleaner chore on each RS
300    for (ServerName sn : admin.getRegionServers()) {
301      HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
302    }
303
304    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
305      getNumberOfMobFiles(table, family));
306
307    LOG.debug("checking count of rows");
308    long scanned = scanTable(table);
309    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
310
311  }
312
313  protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException {
314    FileSystem fs = FileSystem.get(conf);
315    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
316    FileStatus[] stat = fs.listStatus(dir);
317    for (FileStatus st : stat) {
318      LOG.debug("MOB Directory content: {}", st.getPath());
319    }
320    LOG.debug("MOB Directory content total files: {}", stat.length);
321
322    return stat.length;
323  }
324
325  protected long scanTable(TableName tableName) {
326    try (final Table table = HTU.getConnection().getTable(tableName);
327      final ResultScanner scanner = table.getScanner(fam)) {
328      Result result;
329      long counter = 0;
330      while ((result = scanner.next()) != null) {
331        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
332        counter++;
333      }
334      return counter;
335    } catch (Exception e) {
336      LOG.error("MOB file compaction test FAILED", e);
337      if (HTU != null) {
338        fail(e.getMessage());
339      } else {
340        System.exit(-1);
341      }
342    }
343    return 0;
344  }
345}