001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Random;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.CompactionState;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.RegionSplitter;
048import org.junit.After;
049import org.junit.AfterClass;
050import org.junit.Before;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061  * Mob file compaction base test.
062  * 1. Enables batch mode for regular MOB compaction,
063  *    Sets batch size to 7 regions. (Optional)
064  * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
065  * 3. Creates MOB table with 20 regions
066  * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
067  * 5. Repeats 4. two more times
068  * 6. Verifies that we have 20 *3 = 60 mob files (equals to number of regions x 3)
069  * 7. Runs major MOB compaction.
070  * 8. Verifies that number of MOB files in a mob directory is 20 x4 = 80
071  * 9. Waits for a period of time larger than minimum age to archive
072  * 10. Runs Mob cleaner chore
073  * 11 Verifies that number of MOB files in a mob directory is 20.
074  * 12 Runs scanner and checks all 3 * 1000 rows.
075 */
076@Category(LargeTests.class)
077public class TestMobCompactionWithDefaults {
078  private static final Logger LOG =
079      LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082      HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
083
084  protected static HBaseTestingUtil HTU;
085  protected static Configuration conf;
086  protected static long minAgeToArchive = 10000;
087
088  protected final static String famStr = "f1";
089  protected final static byte[] fam = Bytes.toBytes(famStr);
090  protected final static byte[] qualifier = Bytes.toBytes("q1");
091  protected final static long mobLen = 10;
092  protected final static byte[] mobVal = Bytes
093      .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
094
095  @Rule
096  public TestName test = new TestName();
097  protected TableDescriptor tableDescriptor;
098  private ColumnFamilyDescriptor familyDescriptor;
099  protected Admin admin;
100  protected TableName table = null;
101  protected int numRegions = 20;
102  protected int rows = 1000;
103
104  protected MobFileCleanerChore cleanerChore;
105
106  @BeforeClass
107  public static void htuStart() throws Exception {
108    HTU = new HBaseTestingUtil();
109    conf = HTU.getConfiguration();
110    conf.setInt("hfile.format.version", 3);
111    // Disable automatic MOB compaction
112    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
113    // Disable automatic MOB file cleaner chore
114    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
115    // Set minimum age to archive to 10 sec
116    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
117    // Set compacted file discharger interval to a half minAgeToArchive
118    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive/2);
119    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
120    HTU.startMiniCluster();
121  }
122
123  @AfterClass
124  public static void htuStop() throws Exception {
125    HTU.shutdownMiniCluster();
126  }
127
128  @Before
129  public void setUp() throws Exception {
130    admin = HTU.getAdmin();
131    cleanerChore = new MobFileCleanerChore();
132    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
133      .setMobThreshold(mobLen).setMaxVersions(1).build();
134    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName())
135      .setColumnFamily(familyDescriptor).build();
136    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
137    byte[][] splitKeys = splitAlgo.split(numRegions);
138    table = HTU.createTable(tableDescriptor, splitKeys).getName();
139  }
140
141  private void loadData(TableName tableName, int num) {
142
143    Random r = new Random();
144    LOG.info("Started loading {} rows into {}", num, tableName);
145    try (final Table table = HTU.getConnection().getTable(tableName)) {
146      for (int i = 0; i < num; i++) {
147        byte[] key = new byte[32];
148        r.nextBytes(key);
149        Put p = new Put(key);
150        p.addColumn(fam, qualifier, mobVal);
151        table.put(p);
152      }
153      admin.flush(tableName);
154      LOG.info("Finished loading {} rows into {}", num, tableName);
155    } catch (Exception e) {
156      LOG.error("MOB file compaction chore test FAILED", e);
157      fail("MOB file compaction chore test FAILED");
158    }
159  }
160
161  @After
162  public void tearDown() throws Exception {
163    admin.disableTable(tableDescriptor.getTableName());
164    admin.deleteTable(tableDescriptor.getTableName());
165  }
166
167  @Test
168  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
169    LOG.info("MOB compaction " + description() + " started");
170    loadAndFlushThreeTimes(rows, table, famStr);
171    mobCompact(tableDescriptor, familyDescriptor);
172    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
173        getNumberOfMobFiles(table, famStr));
174    cleanupAndVerifyCounts(table, famStr, 3*rows);
175    LOG.info("MOB compaction " + description() + " finished OK");
176  }
177
178  @Test
179  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
180    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
181    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
182    loadAndFlushThreeTimes(rows, table, famStr);
183    LOG.debug("Taking snapshot and cloning table {}", table);
184    admin.snapshot(test.getMethodName(), table);
185    admin.cloneSnapshot(test.getMethodName(), clone);
186    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
187        getNumberOfMobFiles(clone, famStr));
188    mobCompact(admin.getDescriptor(clone), familyDescriptor);
189    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
190        4 * numRegions, getNumberOfMobFiles(clone, famStr));
191    cleanupAndVerifyCounts(clone, famStr, 3*rows);
192    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
193  }
194
195  @Test
196  public void testMobFileCompactionAfterSnapshotCloneAndFlush() throws InterruptedException,
197      IOException {
198    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
199    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
200    loadAndFlushThreeTimes(rows, table, famStr);
201    LOG.debug("Taking snapshot and cloning table {}", table);
202    admin.snapshot(test.getMethodName(), table);
203    admin.cloneSnapshot(test.getMethodName(), clone);
204    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
205        getNumberOfMobFiles(clone, famStr));
206    loadAndFlushThreeTimes(rows, clone, famStr);
207    mobCompact(admin.getDescriptor(clone), familyDescriptor);
208    assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
209        7 * numRegions, getNumberOfMobFiles(clone, famStr));
210    cleanupAndVerifyCounts(clone, famStr, 6*rows);
211    LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK");
212  }
213
214  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
215      throws IOException {
216    final long start = getNumberOfMobFiles(table, family);
217    // Load and flush data 3 times
218    loadData(table, rows);
219    loadData(table, rows);
220    loadData(table, rows);
221    assertEquals("Should have 3 more mob files per region from flushing.", start +  numRegions * 3,
222        getNumberOfMobFiles(table, family));
223  }
224
225  protected String description() {
226    return "regular mode";
227  }
228
229  protected void enableCompactions() throws IOException {
230    final List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
231          .collect(Collectors.toList());
232    admin.compactionSwitch(true, serverList);
233  }
234
235  protected void disableCompactions() throws IOException {
236    final List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
237          .collect(Collectors.toList());
238    admin.compactionSwitch(false, serverList);
239  }
240
241  /**
242   * compact the given table and return once it is done.
243   * should presume compactions are disabled when called.
244   * should ensure compactions are disabled before returning.
245   */
246  protected void mobCompact(TableDescriptor tableDescriptor,
247      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
248    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
249    enableCompactions();
250    mobCompactImpl(tableDescriptor, familyDescriptor);
251    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
252    disableCompactions();
253  }
254
255  /**
256   * Call the API for compaction specific to the test set.
257   * should not wait for compactions to finish.
258   * may assume compactions are enabled when called.
259   */
260  protected void mobCompactImpl(TableDescriptor tableDescriptor,
261      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
262    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
263  }
264
265  protected void waitUntilCompactionIsComplete(TableName table)
266      throws IOException, InterruptedException {
267    CompactionState state = admin.getCompactionState(table);
268    while (state != CompactionState.NONE) {
269      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
270      Thread.sleep(100);
271      state = admin.getCompactionState(table);
272    }
273    LOG.debug("done waiting for compaction on {}", table);
274  }
275
276  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
277      throws InterruptedException, IOException {
278    // We have guarantee, that compacted file discharger will run during this pause
279    // because it has interval less than this wait time
280    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
281
282    Thread.sleep(minAgeToArchive + 1000);
283    LOG.info("Cleaning up MOB files");
284    // Cleanup again
285    cleanerChore.cleanupObsoleteMobFiles(conf, table);
286
287    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
288        getNumberOfMobFiles(table, family));
289
290    LOG.debug("checking count of rows");
291    long scanned = scanTable(table);
292    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
293
294  }
295
296  protected  long getNumberOfMobFiles(TableName tableName, String family)
297      throws IOException {
298    FileSystem fs = FileSystem.get(conf);
299    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
300    FileStatus[] stat = fs.listStatus(dir);
301    for (FileStatus st : stat) {
302      LOG.debug("MOB Directory content: {}", st.getPath());
303    }
304    LOG.debug("MOB Directory content total files: {}", stat.length);
305
306    return stat.length;
307  }
308
309
310  protected long scanTable(TableName tableName) {
311    try (final Table table = HTU.getConnection().getTable(tableName);
312         final ResultScanner scanner = table.getScanner(fam)) {
313      Result result;
314      long counter = 0;
315      while ((result = scanner.next()) != null) {
316        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
317        counter++;
318      }
319      return counter;
320    } catch (Exception e) {
321      LOG.error("MOB file compaction test FAILED", e);
322      if (HTU != null) {
323        fail(e.getMessage());
324      } else {
325        System.exit(-1);
326      }
327    }
328    return 0;
329  }
330}