001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
086
/**
 * Tests compaction behavior of a MOB-enabled store: cells below/above the MOB threshold, bulk
 * loaded files, and major compaction after deletes. Parameterized over the store file tracker
 * implementation (default vs. file-based).
 */
090@Tag(MediumTests.TAG)
091@HBaseParameterizedTestTemplate(name = "{index}: useFileBasedSFT={0}")
092public class TestMobStoreCompaction {
093
094  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
095  private Configuration conf = null;
096  private String testMethodName;
097
098  private HRegion region = null;
099  private TableDescriptor tableDescriptor = null;
100  private ColumnFamilyDescriptor familyDescriptor = null;
101  private long mobCellThreshold = 1000;
102
103  private FileSystem fs;
104
105  private static final byte[] COLUMN_FAMILY = fam1;
106  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
107  private int compactionThreshold;
108
109  private Boolean useFileBasedSFT;
110
111  public TestMobStoreCompaction(Boolean useFileBasedSFT) {
112    this.useFileBasedSFT = useFileBasedSFT;
113  }
114
115  public static Stream<Arguments> parameters() {
116    return Stream.of(false, true).map(Arguments::of);
117  }
118
119  @BeforeEach
120  public void setUp(TestInfo testInfo) {
121    testMethodName = testInfo.getTestMethod().get().getName()
122      + testInfo.getDisplayName().replaceAll("[:= ]", "_").replaceAll("_+", "_").trim();
123  }
124
125  private void init(Configuration conf, long mobThreshold) throws Exception {
126    if (useFileBasedSFT) {
127      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
128        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
129    }
130
131    this.conf = conf;
132    this.mobCellThreshold = mobThreshold;
133
134    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
135    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
136      .setMobThreshold(mobThreshold).setMaxVersions(1).build();
137    tableDescriptor =
138      UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(testMethodName))
139        .modifyColumnFamily(familyDescriptor).build();
140
141    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
142    region = HBaseTestingUtil.createRegionAndWAL(regionInfo, UTIL.getDataTestDir(), conf,
143      tableDescriptor, new MobFileCache(conf));
144    fs = FileSystem.get(conf);
145  }
146
147  @AfterEach
148  public void tearDown() throws Exception {
149    region.close();
150    fs.delete(UTIL.getDataTestDir(), true);
151  }
152
153  /**
154   * During compaction, cells smaller than the threshold won't be affected.
155   */
156  @TestTemplate
157  public void testSmallerValue() throws Exception {
158    init(UTIL.getConfiguration(), 500);
159    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
160    Table loader = new RegionAsTable(region);
161    // one hfile per row
162    for (int i = 0; i < compactionThreshold; i++) {
163      Put p = createPut(i, dummyData);
164      loader.put(p);
165      region.flush(true);
166    }
167    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
168    assertEquals(0, countMobFiles(), "Before compaction: mob file count");
169    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
170    assertEquals(0, countMobRows(), "Before compaction: mob rows");
171
172    region.compactStores();
173
174    assertEquals(1, countStoreFiles(), "After compaction: store files");
175    assertEquals(0, countMobFiles(), "After compaction: mob file count");
176    assertEquals(0, countReferencedMobFiles(), "After compaction: referenced mob file count");
177    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
178    assertEquals(0, countMobRows(), "After compaction: mob rows");
179  }
180
181  /**
182   * During compaction, the mob threshold size is changed.
183   */
184  @TestTemplate
185  public void testLargerValue() throws Exception {
186    init(UTIL.getConfiguration(), 200);
187    byte[] dummyData = makeDummyData(300); // larger than mob threshold
188    Table loader = new RegionAsTable(region);
189    for (int i = 0; i < compactionThreshold; i++) {
190      Put p = createPut(i, dummyData);
191      loader.put(p);
192      region.flush(true);
193    }
194    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
195    assertEquals(compactionThreshold, countMobFiles(), "Before compaction: mob file count");
196    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
197    assertEquals(compactionThreshold, countMobRows(), "Before compaction: mob rows");
198    assertEquals(compactionThreshold, countMobCellsInMetadata(),
199      "Before compaction: number of mob cells");
200    // Change the threshold larger than the data size
201    setMobThreshold(region, COLUMN_FAMILY, 500);
202    region.initialize();
203
204    List<HStore> stores = region.getStores();
205    for (HStore store : stores) {
206      // Force major compaction
207      store.triggerMajorCompaction();
208      Optional<CompactionContext> context = store.requestCompaction(HStore.PRIORITY_USER,
209        CompactionLifeCycleTracker.DUMMY, User.getCurrent());
210      if (!context.isPresent()) {
211        continue;
212      }
213      region.compact(context.get(), store, NoLimitThroughputController.INSTANCE, User.getCurrent());
214    }
215
216    assertEquals(1, countStoreFiles(), "After compaction: store files");
217    assertEquals(compactionThreshold, countMobFiles(), "After compaction: mob file count");
218    assertEquals(0, countReferencedMobFiles(), "After compaction: referenced mob file count");
219    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
220    assertEquals(0, countMobRows(), "After compaction: mob rows");
221  }
222
223  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long modThreshold) {
224    ColumnFamilyDescriptor cfd =
225      ColumnFamilyDescriptorBuilder.newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
226        .setMobThreshold(modThreshold).build();
227    TableDescriptor td = TableDescriptorBuilder.newBuilder(region.getTableDescriptor())
228      .removeColumnFamily(cfName).setColumnFamily(cfd).build();
229    region.setTableDescriptor(td);
230    return region;
231  }
232
233  /**
234   * This test will first generate store files, then bulk load them and trigger the compaction. When
235   * compaction, the cell value will be larger than the threshold.
236   */
237  @TestTemplate
238  public void testMobCompactionWithBulkload() throws Exception {
239    // The following will produce store files of 600.
240    init(UTIL.getConfiguration(), 300);
241    byte[] dummyData = makeDummyData(600);
242
243    Path hbaseRootDir = CommonFSUtils.getRootDir(conf);
244    Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
245    List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
246    for (int i = 0; i < compactionThreshold; i++) {
247      Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
248      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
249      createHFile(hpath, i, dummyData);
250    }
251
252    // The following will bulk load the above generated store files and compact, with 600(fileSize)
253    // > 300(threshold)
254    Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
255    assertTrue(!map.isEmpty(), "Bulkload result:");
256    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
257    assertEquals(0, countMobFiles(), "Before compaction: mob file count");
258    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
259    assertEquals(0, countMobRows(), "Before compaction: mob rows");
260    assertEquals(0, countReferencedMobFiles(), "Before compaction: referenced mob file count");
261
262    region.compactStores();
263
264    assertEquals(1, countStoreFiles(), "After compaction: store files");
265    assertEquals(1, countMobFiles(), "After compaction: mob file count:");
266    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
267    assertEquals(compactionThreshold, countMobRows(), "After compaction: mob rows");
268    assertEquals(1, countReferencedMobFiles(), "After compaction: referenced mob file count");
269    assertEquals(compactionThreshold, countMobCellsInMetadata(),
270      "After compaction: number of mob cells");
271  }
272
273  @TestTemplate
274  public void testMajorCompactionAfterDelete() throws Exception {
275    init(UTIL.getConfiguration(), 100);
276    byte[] dummyData = makeDummyData(200); // larger than mob threshold
277    Table loader = new RegionAsTable(region);
278    // create hfiles and mob hfiles but don't trigger compaction
279    int numHfiles = compactionThreshold - 1;
280    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
281    for (int i = 0; i < numHfiles; i++) {
282      Put p = createPut(i, dummyData);
283      loader.put(p);
284      region.flush(true);
285    }
286    assertEquals(numHfiles, countStoreFiles(), "Before compaction: store files");
287    assertEquals(numHfiles, countMobFiles(), "Before compaction: mob file count");
288    assertEquals(numHfiles, UTIL.countRows(region), "Before compaction: rows");
289    assertEquals(numHfiles, countMobRows(), "Before compaction: mob rows");
290    assertEquals(numHfiles, countMobCellsInMetadata(), "Before compaction: number of mob cells");
291    // now let's delete some cells that contain mobs
292    Delete delete = new Delete(deleteRow);
293    delete.addFamily(COLUMN_FAMILY);
294    region.delete(delete);
295    region.flush(true);
296
297    assertEquals(numHfiles + 1, countStoreFiles(), "Before compaction: store files");
298    assertEquals(numHfiles, countMobFiles(), "Before compaction: mob files");
299    // region.compactStores();
300    region.compact(true);
301    assertEquals(1, countStoreFiles(), "After compaction: store files");
302  }
303
304  private int countStoreFiles() throws IOException {
305    HStore store = region.getStore(COLUMN_FAMILY);
306    return store.getStorefilesCount();
307  }
308
309  private int countMobFiles() throws IOException {
310    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
311      familyDescriptor.getNameAsString());
312    if (fs.exists(mobDirPath)) {
313      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
314      return files.length;
315    }
316    return 0;
317  }
318
319  private long countMobCellsInMetadata() throws IOException {
320    long mobCellsCount = 0;
321    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
322      familyDescriptor.getNameAsString());
323    Configuration copyOfConf = new Configuration(conf);
324    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
325    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
326    if (fs.exists(mobDirPath)) {
327      // TODO: use sft.load() api here
328      HRegionFileSystem regionFs = HRegionFileSystem.create(copyOfConf, fs,
329        MobUtils.getMobTableDir(copyOfConf, tableDescriptor.getTableName()),
330        region.getRegionInfo());
331      StoreFileTracker sft = StoreFileTrackerFactory.create(copyOfConf, false,
332        StoreContext.getBuilder().withColumnFamilyDescriptor(familyDescriptor)
333          .withFamilyStoreDirectoryPath(mobDirPath).withCacheConfig(cacheConfig)
334          .withRegionFileSystem(regionFs).build());
335      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
336      for (FileStatus file : files) {
337        HStoreFile sf =
338          new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true, sft);
339        sf.initReader();
340        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
341        byte[] count = fileInfo.get(MOB_CELLS_COUNT);
342        assertTrue(count != null);
343        mobCellsCount += Bytes.toLong(count);
344      }
345    }
346    return mobCellsCount;
347  }
348
349  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
350    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
351    p.setDurability(Durability.SKIP_WAL);
352    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
353    return p;
354  }
355
356  /**
357   * Create an HFile with the given number of bytes
358   */
359  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
360    HFileContext meta = new HFileContextBuilder().build();
361    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
362      .withFileContext(meta).create();
363    long now = EnvironmentEdgeManager.currentTime();
364    try {
365      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
366        Bytes.toBytes("colX"), now, dummyData);
367      writer.append(kv);
368    } finally {
369      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
370      writer.close();
371    }
372  }
373
374  private int countMobRows() throws IOException {
375    Scan scan = new Scan();
376    // Do not retrieve the mob data when scanning
377    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
378    InternalScanner scanner = region.getScanner(scan);
379
380    int scannedCount = 0;
381    List<ExtendedCell> results = new ArrayList<>();
382    boolean hasMore = true;
383    while (hasMore) {
384      hasMore = scanner.next(results);
385      for (ExtendedCell c : results) {
386        if (MobUtils.isMobReferenceCell(c)) {
387          scannedCount++;
388        }
389      }
390      results.clear();
391    }
392    scanner.close();
393
394    return scannedCount;
395  }
396
397  private byte[] makeDummyData(int size) {
398    byte[] dummyData = new byte[size];
399    Bytes.random(dummyData);
400    return dummyData;
401  }
402
403  private int countReferencedMobFiles() throws IOException {
404    Scan scan = new Scan();
405    // Do not retrieve the mob data when scanning
406    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
407    InternalScanner scanner = region.getScanner(scan);
408
409    List<ExtendedCell> kvs = new ArrayList<>();
410    boolean hasMore = true;
411    String fileName;
412    Set<String> files = new HashSet<>();
413    do {
414      kvs.clear();
415      hasMore = scanner.next(kvs);
416      for (Cell kv : kvs) {
417        if (!MobUtils.isMobReferenceCell((ExtendedCell) kv)) {
418          continue;
419        }
420        if (!MobUtils.hasValidMobRefCellValue(kv)) {
421          continue;
422        }
423        int size = MobUtils.getMobValueLength(kv);
424        if (size <= mobCellThreshold) {
425          continue;
426        }
427        fileName = MobUtils.getMobFileName(kv);
428        if (fileName.isEmpty()) {
429          continue;
430        }
431        files.add(fileName);
432        Path familyPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
433          familyDescriptor.getNameAsString());
434        assertTrue(fs.exists(new Path(familyPath, fileName)));
435      }
436    } while (hasMore);
437
438    scanner.close();
439
440    return files.size();
441  }
442
443}