/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test mob store compaction
 */
@Category(MediumTests.class)
public class TestMobStoreCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMobStoreCompaction.class);

  @Rule
  public TestName name = new TestName();
  static final Logger LOG = LoggerFactory.getLogger(TestMobStoreCompaction.class.getName());
  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private Configuration conf = null;

  private HRegion region = null;
  private HTableDescriptor htd = null;
  private HColumnDescriptor hcd = null;
  private long mobCellThreshold = 1000;

  private FileSystem fs;

  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private int compactionThreshold;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private void init(Configuration conf, long mobThreshold) throws Exception {
    this.conf = conf;
    this.mobCellThreshold = mobThreshold;
    HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);

    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
    htd = UTIL.createTableDescriptor(name.getMethodName());
    hcd = new HColumnDescriptor(COLUMN_FAMILY);
    hcd.setMobEnabled(true);
    hcd.setMobThreshold(mobThreshold);
    hcd.setMaxVersions(1);
    htd.modifyFamily(hcd);

    region = UTIL.createLocalHRegion(htd, null, null);
    fs = FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    region.close();
    fs.delete(UTIL.getDataTestDir(), true);
  }

  /**
   * During compaction, cells smaller than the threshold won't be affected.
   */
  @Test
  public void testSmallerValue() throws Exception {
    init(UTIL.getConfiguration(), 500);
    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
    Table loader = new RegionAsTable(region);
    // one hfile per row
    for (int i = 0; i < compactionThreshold; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
    assertEquals("Before compaction: mob file count", 0, countMobFiles());
    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("Before compaction: mob rows", 0, countMobRows());

    region.compactStores();

    assertEquals("After compaction: store files", 1, countStoreFiles());
    assertEquals("After compaction: mob file count", 0, countMobFiles());
    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("After compaction: mob rows", 0, countMobRows());
  }

  /**
   * Cells larger than the mob threshold are first written as mob cells. The threshold is then
   * raised above the cell size, so the following compaction moves the data back into the normal
   * store files and no mob references remain.
   */
  @Test
  public void testLargerValue() throws Exception {
    init(UTIL.getConfiguration(), 200);
    byte[] dummyData = makeDummyData(300); // larger than mob threshold
    Table loader = new RegionAsTable(region);
    for (int i = 0; i < compactionThreshold; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
    assertEquals("Before compaction: number of mob cells", compactionThreshold,
        countMobCellsInMetadata());
    // Raise the threshold above the data size
    setMobThreshold(region, COLUMN_FAMILY, 500);
    region.initialize();
    region.compactStores();

    assertEquals("After compaction: store files", 1, countStoreFiles());
    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("After compaction: mob rows", 0, countMobRows());
  }

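  /**
   * Rebuilds the table descriptor with a new mob threshold for the given column family and
   * applies it to the region.
   */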
  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long mobThreshold) {
    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
            .newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
            .setMobThreshold(mobThreshold)
            .build();
    TableDescriptor td = TableDescriptorBuilder
            .newBuilder(region.getTableDescriptor())
            .removeColumnFamily(cfName)
            .setColumnFamily(cfd)
            .build();
    region.setTableDescriptor(td);
    return region;
  }

  /**
   * This test first generates store files, then bulk loads them and triggers a compaction.
   * During the compaction, the cell values are larger than the threshold.
   */
  @Test
  public void testMobCompactionWithBulkload() throws Exception {
    // The following produces cells of 600 bytes, larger than the 300-byte mob threshold.
    init(UTIL.getConfiguration(), 300);
    byte[] dummyData = makeDummyData(600);

    Path hbaseRootDir = FSUtils.getRootDir(conf);
    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
    List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
    for (int i = 0; i < compactionThreshold; i++) {
      Path hpath = new Path(basedir, "hfile" + i);
      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
      createHFile(hpath, i, dummyData);
    }

    // Bulk load the generated store files and compact; the 600-byte values exceed the
    // 300-byte threshold, so the compaction writes them out as mob cells.
    Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
    assertTrue("Bulkload result:", !map.isEmpty());
    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
    assertEquals("Before compaction: mob file count", 0, countMobFiles());
    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("Before compaction: mob rows", 0, countMobRows());
    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());

    region.compactStores();

    assertEquals("After compaction: store files", 1, countStoreFiles());
    assertEquals("After compaction: mob file count", 1, countMobFiles());
    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
    assertEquals("After compaction: number of mob cells", compactionThreshold,
        countMobCellsInMetadata());
  }

  @Test
  public void testMajorCompactionAfterDelete() throws Exception {
    init(UTIL.getConfiguration(), 100);
    byte[] dummyData = makeDummyData(200); // larger than mob threshold
    Table loader = new RegionAsTable(region);
    // create hfiles and mob hfiles but don't trigger compaction
    int numHfiles = compactionThreshold - 1;
    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
    for (int i = 0; i < numHfiles; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
    assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
    assertEquals("Before compaction: rows", numHfiles, UTIL.countRows(region));
    assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
    assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
    // now let's delete some cells that contain mobs
    Delete delete = new Delete(deleteRow);
    delete.addFamily(COLUMN_FAMILY);
    region.delete(delete);
    region.flush(true);

    assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
    assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
    // trigger a major compaction
    region.compact(true);
    assertEquals("After compaction: store files", 1, countStoreFiles());
    // still have the original mob hfiles plus a newly added mob del file
    assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());

    Scan scan = new Scan();
    scan.setRaw(true);
    InternalScanner scanner = region.getScanner(scan);
    List<Cell> results = new ArrayList<>();
    scanner.next(results);
    int deleteCount = 0;
    while (!results.isEmpty()) {
      for (Cell c : results) {
        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
          deleteCount++;
          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
        }
      }
      results.clear();
      scanner.next(results);
    }
    // assert the delete marker is retained after the major compaction
    assertEquals(1, deleteCount);
    scanner.close();
    // assert the deleted cell is not counted
    assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
  }

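  /** Returns the number of store files in the region's normal (non-mob) store. */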
  private int countStoreFiles() throws IOException {
    HStore store = region.getStore(COLUMN_FAMILY);
    return store.getStorefilesCount();
  }

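  /** Returns the number of files under the mob family directory, or 0 if it does not exist. */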
  private int countMobFiles() throws IOException {
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
    if (fs.exists(mobDirPath)) {
      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
      return files.length;
    }
    return 0;
  }

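  /**
   * Sums the MOB_CELLS_COUNT entries recorded in the file info of every file under the mob
   * family directory.
   */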
  private long countMobCellsInMetadata() throws IOException {
    long mobCellsCount = 0;
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
    if (fs.exists(mobDirPath)) {
      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
      for (FileStatus file : files) {
        HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
        sf.initReader();
        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
        byte[] count = fileInfo.get(MOB_CELLS_COUNT);
        assertTrue(count != null);
        mobCellsCount += Bytes.toLong(count);
      }
    }
    return mobCellsCount;
  }

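  /** Creates a Put for the given row index with a single dummy-data cell, skipping the WAL. */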
  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
    p.setDurability(Durability.SKIP_WAL);
    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
    return p;
  }

  /**
   * Create an HFile with a single cell holding the given dummy data.
   */
  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
    HFileContext meta = new HFileContextBuilder().build();
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
        .withFileContext(meta).create();
    long now = System.currentTimeMillis();
    try {
      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
          Bytes.toBytes("colX"), now, dummyData);
      writer.append(kv);
    } finally {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
      writer.close();
    }
  }

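  /** Counts the mob reference cells returned by a raw mob scan of the region. */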
  private int countMobRows() throws IOException {
    Scan scan = new Scan();
    // Do not retrieve the mob data when scanning
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
    InternalScanner scanner = region.getScanner(scan);

    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    boolean hasMore = true;
    while (hasMore) {
      hasMore = scanner.next(results);
      for (Cell c : results) {
        if (MobUtils.isMobReferenceCell(c)) {
          scannedCount++;
        }
      }
      results.clear();
    }
    scanner.close();

    return scannedCount;
  }

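  /** Returns a byte array of the given size filled with random data. */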
  private byte[] makeDummyData(int size) {
    byte[] dummyData = new byte[size];
    new Random().nextBytes(dummyData);
    return dummyData;
  }

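  /**
   * Scans the region in raw mob mode and counts the distinct mob files referenced by valid mob
   * reference cells whose value length exceeds the mob threshold, asserting that each referenced
   * file exists.
   */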
  private int countReferencedMobFiles() throws IOException {
    Scan scan = new Scan();
    // Do not retrieve the mob data when scanning
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
    InternalScanner scanner = region.getScanner(scan);

    List<Cell> kvs = new ArrayList<>();
    boolean hasMore = true;
    String fileName;
    Set<String> files = new HashSet<>();
    do {
      kvs.clear();
      hasMore = scanner.next(kvs);
      for (Cell kv : kvs) {
        if (!MobUtils.isMobReferenceCell(kv)) {
          continue;
        }
        if (!MobUtils.hasValidMobRefCellValue(kv)) {
          continue;
        }
        int size = MobUtils.getMobValueLength(kv);
        if (size <= mobCellThreshold) {
          continue;
        }
        fileName = MobUtils.getMobFileName(kv);
        if (fileName.isEmpty()) {
          continue;
        }
        files.add(fileName);
        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
            hcd.getNameAsString());
        assertTrue(fs.exists(new Path(familyPath, fileName)));
      }
    } while (hasMore);

    scanner.close();

    return files.size();
  }

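  /**
   * Opens the files under the mob family directory with a compaction-style scanner that drops
   * deleted cells, asserts the expected number of del files, and returns the number of remaining
   * mob cells.
   */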
  private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
    List<HStoreFile> sfs = new ArrayList<>();
    int numDelfiles = 0;
    int size = 0;
    if (fs.exists(mobDirPath)) {
      for (FileStatus f : fs.listStatus(mobDirPath)) {
        HStoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true);
        sfs.add(sf);
        if (StoreFileInfo.isDelFile(sf.getPath())) {
          numDelfiles++;
        }
      }

      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
        false, false, HConstants.LATEST_TIMESTAMP);
      long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
      long ttl = HStore.determineTTLFromFamily(hcd);
      ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
        CellComparatorImpl.COMPARATOR);
      StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_DROP_DELETES, scanners);
      try {
        size += UTIL.countRows(scanner);
      } finally {
        scanner.close();
      }
    }
    // assert the number of the existing del files
    assertEquals(expectedNumDelfiles, numDelfiles);
    return size;
  }
}