/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

/**
 * Tests compaction of a MOB-enabled store. Each test is run twice: once with the default store
 * file tracker and once with the file based store file tracker (see {@link #parameters()}).
 */
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: useFileBasedSFT={0}")
public class TestMobStoreCompaction {

  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private Configuration conf = null;
  private String testMethodName;

  private HRegion region = null;
  private TableDescriptor tableDescriptor = null;
  private ColumnFamilyDescriptor familyDescriptor = null;
  private long mobCellThreshold = 1000;

  private FileSystem fs;

  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private int compactionThreshold;

  private final boolean useFileBasedSFT;

  /**
   * @param useFileBasedSFT whether {@link #init(Configuration, long)} should configure the file
   *                        based store file tracker implementation
   */
  public TestMobStoreCompaction(Boolean useFileBasedSFT) {
    this.useFileBasedSFT = useFileBasedSFT;
  }

  /**
   * Supplies the values for the single constructor argument: {@code false}, then {@code true}.
   */
  public static Stream<Arguments> parameters() {
    return Stream.of(false, true).map(Arguments::of);
  }

  /**
   * Derives a per-invocation name from the test method name followed by the display name, with
   * every run of ':', '=' and ' ' characters collapsed into a single '_'. The name is later used
   * by {@link #init(Configuration, long)} to build the table name.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) {
    testMethodName = testInfo.getTestMethod().get().getName()
      + testInfo.getDisplayName().replaceAll("[:= ]", "_").replaceAll("_+", "_").trim();
  }

  /**
   * Creates the MOB-enabled table descriptor and opens a region (with its WAL) for it under the
   * data test directory.
   * @param conf         configuration used for the region and the file system; it is modified in
   *                     place when the file based store file tracker is requested
   * @param mobThreshold MOB threshold set on the column family
   */
  private void init(Configuration conf, long mobThreshold) throws Exception {
    if (useFileBasedSFT) {
      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    }

    this.conf = conf;
    this.mobCellThreshold = mobThreshold;

    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
      .setMobThreshold(mobThreshold).setMaxVersions(1).build();
    tableDescriptor =
      UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(testMethodName))
        .modifyColumnFamily(familyDescriptor).build();

    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
    region = HBaseTestingUtil.createRegionAndWAL(regionInfo, UTIL.getDataTestDir(), conf,
      tableDescriptor, new MobFileCache(conf));
    fs = FileSystem.get(conf);
  }

  /**
   * Closes the region together with its WAL and removes the data test directory. Both steps are
   * guarded so that a failure inside {@link #init(Configuration, long)} is not hidden behind a
   * {@link NullPointerException} raised here.
   */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      if (region != null) {
        // createRegionAndWAL opened a WAL for the region; close it along with the region.
        HBaseTestingUtil.closeRegionAndWAL(region);
      }
    } finally {
      region = null;
      if (fs != null) {
        fs.delete(UTIL.getDataTestDir(), true);
      }
    }
  }

  /**
   * During compaction, cells smaller than the threshold won't be affected.
   */
  @TestTemplate
  public void testSmallerValue() throws Exception {
    init(UTIL.getConfiguration(), 500);
    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
    Table loader = new RegionAsTable(region);
    // one hfile per row
    for (int i = 0; i < compactionThreshold; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
    assertEquals(0, countMobFiles(), "Before compaction: mob file count");
    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
    assertEquals(0, countMobRows(), "Before compaction: mob rows");

    region.compactStores();

    assertEquals(1, countStoreFiles(), "After compaction: store files");
    assertEquals(0, countMobFiles(), "After compaction: mob file count");
    assertEquals(0, countReferencedMobFiles(), "After compaction: referenced mob file count");
    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
    assertEquals(0, countMobRows(), "After compaction: mob rows");
  }

  /**
   * Writes values larger than the mob threshold, then raises the threshold above the value size
   * and forces a major compaction. Afterwards no mob reference rows are expected to remain, while
   * the previously written mob files are still expected on disk.
   */
  @TestTemplate
  public void testLargerValue() throws Exception {
    init(UTIL.getConfiguration(), 200);
    byte[] dummyData = makeDummyData(300); // larger than mob threshold
    Table loader = new RegionAsTable(region);
    for (int i = 0; i < compactionThreshold; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
    assertEquals(compactionThreshold, countMobFiles(), "Before compaction: mob file count");
    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
    assertEquals(compactionThreshold, countMobRows(), "Before compaction: mob rows");
    assertEquals(compactionThreshold, countMobCellsInMetadata(),
      "Before compaction: number of mob cells");
    // Change the threshold larger than the data size
    setMobThreshold(region, COLUMN_FAMILY, 500);
    region.initialize();

    List<HStore> stores = region.getStores();
    for (HStore store : stores) {
      // Force major compaction
      store.triggerMajorCompaction();
      Optional<CompactionContext> context = store.requestCompaction(HStore.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, User.getCurrent());
      if (!context.isPresent()) {
        continue;
      }
      region.compact(context.get(), store, NoLimitThroughputController.INSTANCE, User.getCurrent());
    }

    assertEquals(1, countStoreFiles(), "After compaction: store files");
    assertEquals(compactionThreshold, countMobFiles(), "After compaction: mob file count");
    assertEquals(0, countReferencedMobFiles(), "After compaction: referenced mob file count");
    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
    assertEquals(0, countMobRows(), "After compaction: mob rows");
  }

  /**
   * Replaces the region's table descriptor with a copy in which the given column family carries a
   * new MOB threshold.
   * @param region       region whose table descriptor is replaced
   * @param cfName       name of the column family to modify
   * @param modThreshold the new MOB threshold
   * @return the same {@code region} instance that was passed in
   */
  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long modThreshold) {
    ColumnFamilyDescriptor cfd =
      ColumnFamilyDescriptorBuilder.newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
        .setMobThreshold(modThreshold).build();
    TableDescriptor td = TableDescriptorBuilder.newBuilder(region.getTableDescriptor())
      .removeColumnFamily(cfName).setColumnFamily(cfd).build();
    region.setTableDescriptor(td);
    return region;
  }

  /**
   * This test will first generate store files, then bulk load them and trigger the compaction.
   * When compacting, the cell value will be larger than the threshold.
   */
  @TestTemplate
  public void testMobCompactionWithBulkload() throws Exception {
    // The following will produce store files of 600.
    init(UTIL.getConfiguration(), 300);
    byte[] dummyData = makeDummyData(600);

    Path hbaseRootDir = CommonFSUtils.getRootDir(conf);
    Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
    List<Pair<byte[], String>> hfiles = new ArrayList<>(compactionThreshold);
    for (int i = 0; i < compactionThreshold; i++) {
      Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
      createHFile(hpath, i, dummyData);
    }

    // The following will bulk load the above generated store files and compact, with 600(fileSize)
    // > 300(threshold)
    Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
    assertFalse(map.isEmpty(), "Bulkload result:");
    assertEquals(compactionThreshold, countStoreFiles(), "Before compaction: store files");
    assertEquals(0, countMobFiles(), "Before compaction: mob file count");
    assertEquals(compactionThreshold, UTIL.countRows(region), "Before compaction: rows");
    assertEquals(0, countMobRows(), "Before compaction: mob rows");
    assertEquals(0, countReferencedMobFiles(), "Before compaction: referenced mob file count");

    region.compactStores();

    assertEquals(1, countStoreFiles(), "After compaction: store files");
    assertEquals(1, countMobFiles(), "After compaction: mob file count:");
    assertEquals(compactionThreshold, UTIL.countRows(region), "After compaction: rows");
    assertEquals(compactionThreshold, countMobRows(), "After compaction: mob rows");
    assertEquals(1, countReferencedMobFiles(), "After compaction: referenced mob file count");
    assertEquals(compactionThreshold, countMobCellsInMetadata(),
      "After compaction: number of mob cells");
  }

  /**
   * Writes mob cells into fewer store files than the compaction threshold, deletes the column
   * family of the first row, and checks that a major compaction leaves a single store file.
   */
  @TestTemplate
  public void testMajorCompactionAfterDelete() throws Exception {
    init(UTIL.getConfiguration(), 100);
    byte[] dummyData = makeDummyData(200); // larger than mob threshold
    Table loader = new RegionAsTable(region);
    // create hfiles and mob hfiles but don't trigger compaction
    int numHfiles = compactionThreshold - 1;
    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
    for (int i = 0; i < numHfiles; i++) {
      Put p = createPut(i, dummyData);
      loader.put(p);
      region.flush(true);
    }
    assertEquals(numHfiles, countStoreFiles(), "Before compaction: store files");
    assertEquals(numHfiles, countMobFiles(), "Before compaction: mob file count");
    assertEquals(numHfiles, UTIL.countRows(region), "Before compaction: rows");
    assertEquals(numHfiles, countMobRows(), "Before compaction: mob rows");
    assertEquals(numHfiles, countMobCellsInMetadata(), "Before compaction: number of mob cells");
    // now let's delete some cells that contain mobs
    Delete delete = new Delete(deleteRow);
    delete.addFamily(COLUMN_FAMILY);
    region.delete(delete);
    region.flush(true);

    assertEquals(numHfiles + 1, countStoreFiles(), "Before compaction: store files");
    assertEquals(numHfiles, countMobFiles(), "Before compaction: mob files");
    // Request a major compaction of the region.
    region.compact(true);
    assertEquals(1, countStoreFiles(), "After compaction: store files");
  }

  /**
   * @return the number of store files currently held by the store of the test column family
   */
  private int countStoreFiles() throws IOException {
    HStore store = region.getStore(COLUMN_FAMILY);
    return store.getStorefilesCount();
  }

  /**
   * @return the number of entries in the mob family directory, or 0 if it does not exist
   */
  private int countMobFiles() throws IOException {
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
      familyDescriptor.getNameAsString());
    if (!fs.exists(mobDirPath)) {
      return 0;
    }
    FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
    return files.length;
  }

  /**
   * Sums the {@code MOB_CELLS_COUNT} file info entry over every file in the mob family directory.
   * Fails the test if a file lacks that entry.
   * @return the total number of mob cells recorded in file metadata, or 0 if the mob family
   *         directory does not exist
   */
  private long countMobCellsInMetadata() throws IOException {
    long mobCellsCount = 0;
    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
      familyDescriptor.getNameAsString());
    // Use a private copy so the block cache setting does not leak into the shared configuration.
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
    if (!fs.exists(mobDirPath)) {
      return mobCellsCount;
    }
    // TODO: use sft.load() api here
    HRegionFileSystem regionFs = HRegionFileSystem.create(copyOfConf, fs,
      MobUtils.getMobTableDir(copyOfConf, tableDescriptor.getTableName()),
      region.getRegionInfo());
    StoreFileTracker sft = StoreFileTrackerFactory.create(copyOfConf, false,
      StoreContext.getBuilder().withColumnFamilyDescriptor(familyDescriptor)
        .withFamilyStoreDirectoryPath(mobDirPath).withCacheConfig(cacheConfig)
        .withRegionFileSystem(regionFs).build());
    FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
    for (FileStatus file : files) {
      HStoreFile sf =
        new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true, sft);
      sf.initReader();
      try {
        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
        byte[] count = fileInfo.get(MOB_CELLS_COUNT);
        assertNotNull(count, "Missing mob cells count in file info of " + file.getPath());
        mobCellsCount += Bytes.toLong(count);
      } finally {
        // Release the reader opened by initReader(), even when the assertion above fails.
        sf.closeStoreFile(true);
      }
    }
    return mobCellsCount;
  }

  /**
   * Builds a WAL-skipping put of {@code dummyData} into column "colX" of the test family.
   * @param rowIdx    index appended to the start row to form the row key
   * @param dummyData the cell value
   */
  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
    p.setDurability(Durability.SKIP_WAL);
    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
    return p;
  }

  /**
   * Creates an HFile at {@code path} holding a single cell (column "colX" of the test family)
   * whose value is {@code dummyData}, and stamps the file with a bulk load time entry.
   * @param path      location of the HFile to write
   * @param rowIdx    index appended to the start row to form the row key
   * @param dummyData the cell value
   */
  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
    HFileContext meta = new HFileContextBuilder().build();
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
      .withFileContext(meta).create();
    long now = EnvironmentEdgeManager.currentTime();
    try {
      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
        Bytes.toBytes("colX"), now, dummyData);
      writer.append(kv);
    } finally {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
      writer.close();
    }
  }

  /**
   * Raw-scans the region and counts the cells that are mob reference cells.
   * @return the number of mob reference cells seen by the scan
   */
  private int countMobRows() throws IOException {
    Scan scan = new Scan();
    // Do not retrieve the mob data when scanning
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));

    int scannedCount = 0;
    List<ExtendedCell> results = new ArrayList<>();
    // try-with-resources so the scanner is released even if scanning throws.
    try (InternalScanner scanner = region.getScanner(scan)) {
      boolean hasMore;
      do {
        hasMore = scanner.next(results);
        for (ExtendedCell c : results) {
          if (MobUtils.isMobReferenceCell(c)) {
            scannedCount++;
          }
        }
        results.clear();
      } while (hasMore);
    }

    return scannedCount;
  }

  /**
   * @param size number of bytes to generate
   * @return a new array of {@code size} bytes filled by {@link Bytes#random(byte[])}
   */
  private byte[] makeDummyData(int size) {
    byte[] dummyData = new byte[size];
    Bytes.random(dummyData);
    return dummyData;
  }

  /**
   * Raw-scans the region and collects the distinct mob file names referenced by valid mob
   * reference cells whose recorded value length exceeds the threshold passed to
   * {@link #init(Configuration, long)}. Each referenced file is asserted to exist in the mob
   * family directory.
   * @return the number of distinct referenced mob files
   */
  private int countReferencedMobFiles() throws IOException {
    Scan scan = new Scan();
    // Do not retrieve the mob data when scanning
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));

    Path familyPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
      familyDescriptor.getNameAsString());
    List<ExtendedCell> kvs = new ArrayList<>();
    Set<String> files = new HashSet<>();
    // try-with-resources so the scanner is released even if an assertion below fails.
    try (InternalScanner scanner = region.getScanner(scan)) {
      boolean hasMore;
      do {
        kvs.clear();
        hasMore = scanner.next(kvs);
        for (Cell kv : kvs) {
          if (!MobUtils.isMobReferenceCell((ExtendedCell) kv)) {
            continue;
          }
          if (!MobUtils.hasValidMobRefCellValue(kv)) {
            continue;
          }
          int size = MobUtils.getMobValueLength(kv);
          if (size <= mobCellThreshold) {
            continue;
          }
          String fileName = MobUtils.getMobFileName(kv);
          if (fileName.isEmpty()) {
            continue;
          }
          files.add(fileName);
          assertTrue(fs.exists(new Path(familyPath, fileName)));
        }
      } while (hasMore);
    }

    return files.size();
  }

}