001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellComparatorImpl;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HTableDescriptor;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.RegionInfoBuilder;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.io.hfile.CacheConfig;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.io.hfile.HFileContext;
062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
063import org.apache.hadoop.hbase.mob.MobConstants;
064import org.apache.hadoop.hbase.mob.MobFileCache;
065import org.apache.hadoop.hbase.mob.MobUtils;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.FSUtils;
069import org.apache.hadoop.hbase.util.Pair;
070import org.junit.After;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
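/**
 * Tests compaction of MOB-enabled stores: how flushed and bulk-loaded cells are handled
 * relative to the MOB threshold, and how deletes are treated by a major compaction.
 */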
082@Category(MediumTests.class)
083public class TestMobStoreCompaction {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestMobStoreCompaction.class);
088
089  @Rule
090  public TestName name = new TestName();
091  static final Logger LOG = LoggerFactory.getLogger(TestMobStoreCompaction.class.getName());
092  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
093  private Configuration conf = null;
094
095  private HRegion region = null;
096  private HTableDescriptor htd = null;
097  private HColumnDescriptor hcd = null;
098  private long mobCellThreshold = 1000;
099
100  private FileSystem fs;
101
102  private static final byte[] COLUMN_FAMILY = fam1;
103  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
104  private int compactionThreshold;
105
106  private void init(Configuration conf, long mobThreshold) throws Exception {
107    this.conf = conf;
108    this.mobCellThreshold = mobThreshold;
109    HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
110
111    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
112    htd = UTIL.createTableDescriptor(name.getMethodName());
113    hcd = new HColumnDescriptor(COLUMN_FAMILY);
114    hcd.setMobEnabled(true);
115    hcd.setMobThreshold(mobThreshold);
116    hcd.setMaxVersions(1);
117    htd.modifyFamily(hcd);
118
119    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
120    region = HBaseTestingUtility
121        .createRegionAndWAL(regionInfo, UTIL.getDataTestDir(), conf, htd, new MobFileCache(conf));
122    fs = FileSystem.get(conf);
123  }
124
125  @After
126  public void tearDown() throws Exception {
127    region.close();
128    fs.delete(UTIL.getDataTestDir(), true);
129  }
130
131  /**
132   * During compaction, cells smaller than the threshold won't be affected.
133   */
134  @Test
135  public void testSmallerValue() throws Exception {
136    init(UTIL.getConfiguration(), 500);
137    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
138    Table loader = new RegionAsTable(region);
139    // one hfile per row
140    for (int i = 0; i < compactionThreshold; i++) {
141      Put p = createPut(i, dummyData);
142      loader.put(p);
143      region.flush(true);
144    }
145    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
146    assertEquals("Before compaction: mob file count", 0, countMobFiles());
147    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
148    assertEquals("Before compaction: mob rows", 0, countMobRows());
149
150    region.compactStores();
151
152    assertEquals("After compaction: store files", 1, countStoreFiles());
153    assertEquals("After compaction: mob file count", 0, countMobFiles());
154    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
155    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
156    assertEquals("After compaction: mob rows", 0, countMobRows());
157  }
158
159  /**
160   * During compaction, the mob threshold size is changed.
161   */
162  @Test
163  public void testLargerValue() throws Exception {
164    init(UTIL.getConfiguration(), 200);
165    byte[] dummyData = makeDummyData(300); // larger than mob threshold
166    Table loader = new RegionAsTable(region);
167    for (int i = 0; i < compactionThreshold; i++) {
168      Put p = createPut(i, dummyData);
169      loader.put(p);
170      region.flush(true);
171    }
172    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
173    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
174    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
175    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
176    assertEquals("Before compaction: number of mob cells", compactionThreshold,
177        countMobCellsInMetadata());
178    // Change the threshold larger than the data size
179    setMobThreshold(region, COLUMN_FAMILY, 500);
180    region.initialize();
181    region.compactStores();
182
183    assertEquals("After compaction: store files", 1, countStoreFiles());
184    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
185    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
186    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
187    assertEquals("After compaction: mob rows", 0, countMobRows());
188  }
189
190  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long modThreshold) {
191    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
192            .newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
193            .setMobThreshold(modThreshold)
194            .build();
195    TableDescriptor td = TableDescriptorBuilder
196            .newBuilder(region.getTableDescriptor())
197            .removeColumnFamily(cfName)
198            .setColumnFamily(cfd)
199            .build();
200    region.setTableDescriptor(td);
201    return region;
202  }
203
204  /**
205   * This test will first generate store files, then bulk load them and trigger the compaction.
206   * When compaction, the cell value will be larger than the threshold.
207   */
208  @Test
209  public void testMobCompactionWithBulkload() throws Exception {
210    // The following will produce store files of 600.
211    init(UTIL.getConfiguration(), 300);
212    byte[] dummyData = makeDummyData(600);
213
214    Path hbaseRootDir = FSUtils.getRootDir(conf);
215    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
216    List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
217    for (int i = 0; i < compactionThreshold; i++) {
218      Path hpath = new Path(basedir, "hfile" + i);
219      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
220      createHFile(hpath, i, dummyData);
221    }
222
223    // The following will bulk load the above generated store files and compact, with 600(fileSize)
224    // > 300(threshold)
225    Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
226    assertTrue("Bulkload result:", !map.isEmpty());
227    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
228    assertEquals("Before compaction: mob file count", 0, countMobFiles());
229    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
230    assertEquals("Before compaction: mob rows", 0, countMobRows());
231    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
232
233    region.compactStores();
234
235    assertEquals("After compaction: store files", 1, countStoreFiles());
236    assertEquals("After compaction: mob file count:", 1, countMobFiles());
237    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
238    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
239    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
240    assertEquals("After compaction: number of mob cells", compactionThreshold,
241        countMobCellsInMetadata());
242  }
243
244  @Test
245  public void testMajorCompactionAfterDelete() throws Exception {
246    init(UTIL.getConfiguration(), 100);
247    byte[] dummyData = makeDummyData(200); // larger than mob threshold
248    Table loader = new RegionAsTable(region);
249    // create hfiles and mob hfiles but don't trigger compaction
250    int numHfiles = compactionThreshold - 1;
251    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
252    for (int i = 0; i < numHfiles; i++) {
253      Put p = createPut(i, dummyData);
254      loader.put(p);
255      region.flush(true);
256    }
257    assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
258    assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
259    assertEquals("Before compaction: rows", numHfiles, UTIL.countRows(region));
260    assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
261    assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
262    // now let's delete some cells that contain mobs
263    Delete delete = new Delete(deleteRow);
264    delete.addFamily(COLUMN_FAMILY);
265    region.delete(delete);
266    region.flush(true);
267
268    assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
269    assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
270    // region.compactStores();
271    region.compact(true);
272    assertEquals("After compaction: store files", 1, countStoreFiles());
273    // still have original mob hfiles and now added a mob del file
274    assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
275
276    Scan scan = new Scan();
277    scan.setRaw(true);
278    InternalScanner scanner = region.getScanner(scan);
279    List<Cell> results = new ArrayList<>();
280    scanner.next(results);
281    int deleteCount = 0;
282    while (!results.isEmpty()) {
283      for (Cell c : results) {
284        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
285          deleteCount++;
286          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
287        }
288      }
289      results.clear();
290      scanner.next(results);
291    }
292    // assert the delete mark is retained after the major compaction
293    assertEquals(1, deleteCount);
294    scanner.close();
295    // assert the deleted cell is not counted
296    assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
297  }
298
299  private int countStoreFiles() throws IOException {
300    HStore store = region.getStore(COLUMN_FAMILY);
301    return store.getStorefilesCount();
302  }
303
304  private int countMobFiles() throws IOException {
305    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
306    if (fs.exists(mobDirPath)) {
307      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
308      return files.length;
309    }
310    return 0;
311  }
312
313  private long countMobCellsInMetadata() throws IOException {
314    long mobCellsCount = 0;
315    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
316    Configuration copyOfConf = new Configuration(conf);
317    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
318    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
319    if (fs.exists(mobDirPath)) {
320      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
321      for (FileStatus file : files) {
322        HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true);
323        sf.initReader();
324        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
325        byte[] count = fileInfo.get(MOB_CELLS_COUNT);
326        assertTrue(count != null);
327        mobCellsCount += Bytes.toLong(count);
328      }
329    }
330    return mobCellsCount;
331  }
332
333  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
334    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
335    p.setDurability(Durability.SKIP_WAL);
336    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
337    return p;
338  }
339
340  /**
341   * Create an HFile with the given number of bytes
342   */
343  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
344    HFileContext meta = new HFileContextBuilder().build();
345    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
346        .withFileContext(meta).create();
347    long now = System.currentTimeMillis();
348    try {
349      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
350          Bytes.toBytes("colX"), now, dummyData);
351      writer.append(kv);
352    } finally {
353      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
354      writer.close();
355    }
356  }
357
358  private int countMobRows() throws IOException {
359    Scan scan = new Scan();
360    // Do not retrieve the mob data when scanning
361    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
362    InternalScanner scanner = region.getScanner(scan);
363
364    int scannedCount = 0;
365    List<Cell> results = new ArrayList<>();
366    boolean hasMore = true;
367    while (hasMore) {
368      hasMore = scanner.next(results);
369      for (Cell c : results) {
370        if (MobUtils.isMobReferenceCell(c)) {
371          scannedCount++;
372        }
373      }
374      results.clear();
375    }
376    scanner.close();
377
378    return scannedCount;
379  }
380
381  private byte[] makeDummyData(int size) {
382    byte[] dummyData = new byte[size];
383    new Random().nextBytes(dummyData);
384    return dummyData;
385  }
386
387  private int countReferencedMobFiles() throws IOException {
388    Scan scan = new Scan();
389    // Do not retrieve the mob data when scanning
390    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
391    InternalScanner scanner = region.getScanner(scan);
392
393    List<Cell> kvs = new ArrayList<>();
394    boolean hasMore = true;
395    String fileName;
396    Set<String> files = new HashSet<>();
397    do {
398      kvs.clear();
399      hasMore = scanner.next(kvs);
400      for (Cell kv : kvs) {
401        if (!MobUtils.isMobReferenceCell(kv)) {
402          continue;
403        }
404        if (!MobUtils.hasValidMobRefCellValue(kv)) {
405          continue;
406        }
407        int size = MobUtils.getMobValueLength(kv);
408        if (size <= mobCellThreshold) {
409          continue;
410        }
411        fileName = MobUtils.getMobFileName(kv);
412        if (fileName.isEmpty()) {
413          continue;
414        }
415        files.add(fileName);
416        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
417            hcd.getNameAsString());
418        assertTrue(fs.exists(new Path(familyPath, fileName)));
419      }
420    } while (hasMore);
421
422    scanner.close();
423
424    return files.size();
425  }
426
427  private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
428    Configuration copyOfConf = new Configuration(conf);
429    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
430    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
431    Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString());
432    List<HStoreFile> sfs = new ArrayList<>();
433    int numDelfiles = 0;
434    int size = 0;
435    if (fs.exists(mobDirPath)) {
436      for (FileStatus f : fs.listStatus(mobDirPath)) {
437        HStoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true);
438        sfs.add(sf);
439        if (StoreFileInfo.isDelFile(sf.getPath())) {
440          numDelfiles++;
441        }
442      }
443
444      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
445        false, false, HConstants.LATEST_TIMESTAMP);
446      long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
447      long ttl = HStore.determineTTLFromFamily(hcd);
448      ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
449        CellComparatorImpl.COMPARATOR);
450      StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_DROP_DELETES, scanners);
451      try {
452        size += UTIL.countRows(scanner);
453      } finally {
454        scanner.close();
455      }
456    }
457    // assert the number of the existing del files
458    assertEquals(expectedNumDelfiles, numDelfiles);
459    return size;
460  }
461}