001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob.compactions; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.Calendar; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.UUID; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.RejectedExecutionException; 037import java.util.concurrent.RejectedExecutionHandler; 038import java.util.concurrent.SynchronousQueue; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellComparatorImpl; 047import org.apache.hadoop.hbase.CellUtil; 048import 
org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.HColumnDescriptor; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.KeyValue.Type; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 056import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 057import org.apache.hadoop.hbase.io.hfile.CacheConfig; 058import org.apache.hadoop.hbase.io.hfile.HFileContext; 059import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 060import org.apache.hadoop.hbase.mob.MobConstants; 061import org.apache.hadoop.hbase.mob.MobFileName; 062import org.apache.hadoop.hbase.mob.MobUtils; 063import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType; 064import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition; 065import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition; 066import org.apache.hadoop.hbase.regionserver.BloomType; 067import org.apache.hadoop.hbase.regionserver.HStore; 068import org.apache.hadoop.hbase.regionserver.HStoreFile; 069import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 070import org.apache.hadoop.hbase.regionserver.ScanInfo; 071import org.apache.hadoop.hbase.regionserver.ScanType; 072import org.apache.hadoop.hbase.regionserver.StoreFileScanner; 073import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 074import org.apache.hadoop.hbase.regionserver.StoreScanner; 075import org.apache.hadoop.hbase.testclassification.LargeTests; 076import org.apache.hadoop.hbase.util.Bytes; 077import org.apache.hadoop.hbase.util.FSUtils; 078import org.apache.hadoop.hbase.util.Threads; 079import org.apache.hadoop.hdfs.DistributedFileSystem; 080import org.junit.AfterClass; 081import org.junit.BeforeClass; 082import org.junit.ClassRule; 
083import org.junit.Rule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.junit.rules.TestName; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090@Category(LargeTests.class) 091public class TestPartitionedMobCompactor { 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestPartitionedMobCompactor.class); 096 097 private static final Logger LOG = LoggerFactory.getLogger(TestPartitionedMobCompactor.class); 098 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 099 private final static String family = "family"; 100 private final static String qf = "qf"; 101 private final long DAY_IN_MS = 1000 * 60 * 60 * 24; 102 private static byte[] KEYS = Bytes.toBytes("012"); 103 private HColumnDescriptor hcd = new HColumnDescriptor(family); 104 private Configuration conf = TEST_UTIL.getConfiguration(); 105 private CacheConfig cacheConf = new CacheConfig(conf); 106 private FileSystem fs; 107 private List<FileStatus> mobFiles = new ArrayList<>(); 108 private List<Path> delFiles = new ArrayList<>(); 109 private List<FileStatus> allFiles = new ArrayList<>(); 110 private Path basePath; 111 private String mobSuffix; 112 private String delSuffix; 113 private static ExecutorService pool; 114 115 @Rule 116 public TestName name = new TestName(); 117 118 @BeforeClass 119 public static void setUpBeforeClass() throws Exception { 120 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); 121 // Inject our customized DistributedFileSystem 122 TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class, 123 DistributedFileSystem.class); 124 TEST_UTIL.startMiniCluster(1); 125 pool = createThreadPool(); 126 } 127 128 @AfterClass 129 public static void tearDownAfterClass() throws Exception { 130 pool.shutdown(); 131 TEST_UTIL.shutdownMiniCluster(); 132 } 133 134 private void init(String tableName) throws Exception 
{ 135 fs = FileSystem.get(conf); 136 Path testDir = FSUtils.getRootDir(conf); 137 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); 138 basePath = new Path(new Path(mobTestDir, tableName), family); 139 mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); 140 delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; 141 allFiles.clear(); 142 mobFiles.clear(); 143 delFiles.clear(); 144 } 145 146 @Test 147 public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception { 148 String tableName = "testCompactionSelectAllFilesWeeklyPolicy"; 149 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, 150 CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1); 151 } 152 153 @Test 154 public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception { 155 String tableName = "testCompactionSelectPartFilesWeeklyPolicy"; 156 testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, 157 new Date(), MobCompactPartitionPolicy.WEEKLY, 1); 158 } 159 160 @Test 161 public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception { 162 String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek"; 163 Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); 164 testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, dateLastWeek, 165 MobCompactPartitionPolicy.WEEKLY, 7); 166 } 167 168 @Test 169 public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception { 170 String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek"; 171 Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); 172 testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, 173 false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7); 174 } 175 176 @Test 177 public void testCompactionSelectAllFilesMonthlyPolicy() throws 
Exception { 178 String tableName = "testCompactionSelectAllFilesMonthlyPolicy"; 179 Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); 180 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, 181 CompactionType.ALL_FILES, false, false, dateLastWeek, 182 MobCompactPartitionPolicy.MONTHLY, 7); 183 } 184 185 @Test 186 public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception { 187 String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy"; 188 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, 189 CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1); 190 } 191 192 @Test 193 public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception { 194 String tableName = "testCompactionSelectPartFilesMonthlyPolicy"; 195 testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, 196 new Date(), MobCompactPartitionPolicy.MONTHLY, 1); 197 } 198 199 @Test 200 public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception { 201 String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek"; 202 Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); 203 Calendar calendar = Calendar.getInstance(); 204 Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date()); 205 CompactionType type = CompactionType.PART_FILES; 206 long mergeSizeMultiFactor = 7; 207 208 209 // The dateLastWeek may not really be last week, suppose that it runs at 2/1/2017, it is going 210 // to be last month and the monthly policy is going to be applied here. 
211 if (dateLastWeek.before(firstDayOfCurrentMonth)) { 212 type = CompactionType.ALL_FILES; 213 mergeSizeMultiFactor *= 4; 214 } 215 216 testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek, 217 MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor); 218 } 219 220 @Test 221 public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception { 222 String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek"; 223 Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); 224 225 testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, 226 false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7); 227 } 228 229 @Test 230 public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception { 231 String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth"; 232 233 // back 5 weeks, it is going to be a past month 234 Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); 235 testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth, 236 MobCompactPartitionPolicy.MONTHLY, 28); 237 } 238 239 @Test 240 public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception { 241 String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth"; 242 243 // back 5 weeks, it is going to be a past month 244 Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); 245 testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES, 246 false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28); 247 } 248 249 @Test 250 public void testCompactionSelectWithAllFiles() throws Exception { 251 String tableName = "testCompactionSelectWithAllFiles"; 252 // If there is only 1 file, it will not be compacted with _del files, so 253 // It wont be CompactionType.ALL_FILES in this case, do not create with _del files. 
254 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, 255 CompactionType.ALL_FILES, false, false); 256 } 257 258 @Test 259 public void testCompactionSelectWithPartFiles() throws Exception { 260 String tableName = "testCompactionSelectWithPartFiles"; 261 testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false); 262 } 263 264 @Test 265 public void testCompactionSelectWithForceAllFiles() throws Exception { 266 String tableName = "testCompactionSelectWithForceAllFiles"; 267 testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true); 268 } 269 270 private void testCompactionAtMergeSize(final String tableName, 271 final long mergeSize, final CompactionType type, final boolean isForceAllFiles) 272 throws Exception { 273 testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true); 274 } 275 276 private void testCompactionAtMergeSize(final String tableName, 277 final long mergeSize, final CompactionType type, final boolean isForceAllFiles, 278 final boolean createDelFiles) 279 throws Exception { 280 Date date = new Date(); 281 testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date); 282 } 283 284 private void testCompactionAtMergeSize(final String tableName, 285 final long mergeSize, final CompactionType type, final boolean isForceAllFiles, 286 final boolean createDelFiles, final Date date) 287 throws Exception { 288 testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date, 289 MobCompactPartitionPolicy.DAILY, 1); 290 } 291 292 private void testCompactionAtMergeSize(final String tableName, 293 final long mergeSize, final CompactionType type, final boolean isForceAllFiles, 294 final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy, 295 final long mergeSizeMultiFactor) 296 throws Exception { 297 resetConf(); 298 init(tableName); 299 int count = 10; 300 // create 10 mob 
files. 301 createStoreFiles(basePath, family, qf, count, Type.Put, date); 302 303 if (createDelFiles) { 304 // create 10 del files 305 createStoreFiles(basePath, family, qf, count, Type.Delete, date); 306 } 307 308 Calendar calendar = Calendar.getInstance(); 309 Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date()); 310 311 listFiles(); 312 List<String> expectedStartKeys = new ArrayList<>(); 313 for(FileStatus file : mobFiles) { 314 if(file.getLen() < mergeSize * mergeSizeMultiFactor) { 315 String fileName = file.getPath().getName(); 316 String startKey = fileName.substring(0, 32); 317 318 // If the policy is monthly and files are in current week, they will be skipped 319 // in minor compcation. 320 boolean skipCompaction = false; 321 if (policy == MobCompactPartitionPolicy.MONTHLY) { 322 String fileDateStr = MobFileName.getDateFromName(fileName); 323 Date fileDate; 324 try { 325 fileDate = MobUtils.parseDate(fileDateStr); 326 } catch (ParseException e) { 327 LOG.warn("Failed to parse date " + fileDateStr, e); 328 fileDate = new Date(); 329 } 330 if (!fileDate.before(firstDayOfCurrentWeek)) { 331 skipCompaction = true; 332 } 333 } 334 335 // If it is not an major mob compaction and del files are there, 336 // these mob files wont be compacted. 
337 if (isForceAllFiles || (!createDelFiles && !skipCompaction)) { 338 expectedStartKeys.add(startKey); 339 } 340 } 341 } 342 343 // Set the policy 344 this.hcd.setMobCompactPartitionPolicy(policy); 345 // set the mob compaction mergeable threshold 346 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); 347 testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys); 348 // go back to the default daily policy 349 this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY); 350 } 351 352 @Test 353 public void testCompactDelFilesWithDefaultBatchSize() throws Exception { 354 testCompactDelFilesAtBatchSize(name.getMethodName(), MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE, 355 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); 356 } 357 358 @Test 359 public void testCompactDelFilesWithSmallBatchSize() throws Exception { 360 testCompactDelFilesAtBatchSize(name.getMethodName(), 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); 361 } 362 363 @Test 364 public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { 365 testCompactDelFilesAtBatchSize(name.getMethodName(), 4, 2); 366 } 367 368 @Test 369 public void testCompactFilesWithDstDirFull() throws Exception { 370 String tableName = name.getMethodName(); 371 fs = FileSystem.get(conf); 372 FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs; 373 Path testDir = FSUtils.getRootDir(conf); 374 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); 375 basePath = new Path(new Path(mobTestDir, tableName), family); 376 377 try { 378 int count = 2; 379 // create 2 mob files. 
380 createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date()); 381 listFiles(); 382 383 TableName tName = TableName.valueOf(tableName); 384 MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool); 385 faultyFs.setThrowException(true); 386 try { 387 compactor.compact(allFiles, true); 388 } catch (IOException e) { 389 System.out.println("Expected exception, ignore"); 390 } 391 392 // Verify that all the files in tmp directory are cleaned up 393 Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); 394 FileStatus[] ls = faultyFs.listStatus(tempPath); 395 396 // Only .bulkload under this directory 397 assertTrue(ls.length == 1); 398 assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName())); 399 400 Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( 401 tName.getNamespaceAsString(), tName.getQualifierAsString()))); 402 403 // Nothing in bulkLoad directory 404 FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath); 405 assertTrue(lsBulkload.length == 0); 406 407 } finally { 408 faultyFs.setThrowException(false); 409 } 410 } 411 412 /** 413 * Create mulitple partition files 414 */ 415 private void createMobFile(Path basePath) throws IOException { 416 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); 417 MobFileName mobFileName = null; 418 int ii = 0; 419 Date today = new Date(); 420 for (byte k0 : KEYS) { 421 byte[] startRow = Bytes.toBytes(ii++); 422 423 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix); 424 425 StoreFileWriter mobFileWriter = 426 new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) 427 .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); 428 429 long now = System.currentTimeMillis(); 430 try { 431 for (int i = 0; i < 10; i++) { 432 byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i)); 433 byte[] dummyData = 
new byte[5000]; 434 new Random().nextBytes(dummyData); 435 mobFileWriter.append( 436 new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData)); 437 } 438 } finally { 439 mobFileWriter.close(); 440 } 441 } 442 } 443 444 /** 445 * Create mulitple partition delete files 446 */ 447 private void createMobDelFile(Path basePath, int startKey) throws IOException { 448 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); 449 MobFileName mobFileName = null; 450 Date today = new Date(); 451 452 byte[] startRow = Bytes.toBytes(startKey); 453 454 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix); 455 456 StoreFileWriter mobFileWriter = 457 new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) 458 .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); 459 460 long now = System.currentTimeMillis(); 461 try { 462 byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0)); 463 byte[] dummyData = new byte[5000]; 464 new Random().nextBytes(dummyData); 465 mobFileWriter.append( 466 new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); 467 key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2)); 468 mobFileWriter.append( 469 new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); 470 key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4)); 471 mobFileWriter.append( 472 new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData)); 473 474 } finally { 475 mobFileWriter.close(); 476 } 477 } 478 479 @Test 480 public void testCompactFilesWithoutDelFile() throws Exception { 481 String tableName = "testCompactFilesWithoutDelFile"; 482 resetConf(); 483 init(tableName); 484 485 createMobFile(basePath); 486 487 listFiles(); 488 489 PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, 490 TableName.valueOf(tableName), hcd, pool) 
{ 491 @Override 492 public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) 493 throws IOException { 494 if (files == null || files.isEmpty()) { 495 return null; 496 } 497 498 PartitionedMobCompactionRequest request = select(files, isForceAllFiles); 499 500 // Make sure that there is no del Partitions 501 assertTrue(request.getDelPartitions().size() == 0); 502 503 // Make sure that when there is no startKey/endKey for partition. 504 for (CompactionPartition p : request.getCompactionPartitions()) { 505 assertTrue(p.getStartKey() == null); 506 assertTrue(p.getEndKey() == null); 507 } 508 return null; 509 } 510 }; 511 512 compactor.compact(allFiles, true); 513 } 514 515 static class MyPartitionedMobCompactor extends PartitionedMobCompactor { 516 int delPartitionSize = 0; 517 int PartitionsIncludeDelFiles = 0; 518 CacheConfig cacheConfig = null; 519 520 MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, 521 ColumnFamilyDescriptor column, ExecutorService pool, final int delPartitionSize, 522 final CacheConfig cacheConf, final int PartitionsIncludeDelFiles) 523 throws IOException { 524 super(conf, fs, tableName, column, pool); 525 this.delPartitionSize = delPartitionSize; 526 this.cacheConfig = cacheConf; 527 this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles; 528 } 529 530 @Override public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) 531 throws IOException { 532 if (files == null || files.isEmpty()) { 533 return null; 534 } 535 PartitionedMobCompactionRequest request = select(files, isForceAllFiles); 536 537 assertTrue(request.getDelPartitions().size() == delPartitionSize); 538 if (request.getDelPartitions().size() > 0) { 539 for (CompactionPartition p : request.getCompactionPartitions()) { 540 assertTrue(p.getStartKey() != null); 541 assertTrue(p.getEndKey() != null); 542 } 543 } 544 545 try { 546 for (CompactionDelPartition delPartition : request.getDelPartitions()) { 547 for (Path 
newDelPath : delPartition.listDelFiles()) { 548 HStoreFile sf = 549 new HStoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE, true); 550 // pre-create reader of a del file to avoid race condition when opening the reader in 551 // each partition. 552 sf.initReader(); 553 delPartition.addStoreFile(sf); 554 } 555 } 556 557 // Make sure that CompactionDelPartitions does not overlap 558 CompactionDelPartition prevDelP = null; 559 for (CompactionDelPartition delP : request.getDelPartitions()) { 560 assertTrue( 561 Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0); 562 563 if (prevDelP != null) { 564 assertTrue( 565 Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0); 566 } 567 } 568 569 int affectedPartitions = 0; 570 571 // Make sure that only del files within key range for a partition is included in compaction. 572 // compact the mob files by partitions in parallel. 573 for (CompactionPartition partition : request.getCompactionPartitions()) { 574 List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); 575 if (!request.getDelPartitions().isEmpty()) { 576 if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), 577 partition.getEndKey()) > 0) || (Bytes.compareTo( 578 request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() 579 .getEndKey(), partition.getStartKey()) < 0))) { 580 581 if (delFiles.size() > 0) { 582 assertTrue(delFiles.size() == 1); 583 affectedPartitions += delFiles.size(); 584 assertTrue(Bytes.compareTo(partition.getStartKey(), 585 CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0); 586 assertTrue(Bytes.compareTo(partition.getEndKey(), 587 CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0); 588 } 589 } 590 } 591 } 592 // The del file is only included in one partition 593 assertTrue(affectedPartitions == PartitionsIncludeDelFiles); 594 } finally { 595 for 
(CompactionDelPartition delPartition : request.getDelPartitions()) { 596 for (HStoreFile storeFile : delPartition.getStoreFiles()) { 597 try { 598 storeFile.closeStoreFile(true); 599 } catch (IOException e) { 600 LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e); 601 } 602 } 603 } 604 } 605 606 return null; 607 } 608 } 609 610 @Test 611 public void testCompactFilesWithOneDelFile() throws Exception { 612 String tableName = "testCompactFilesWithOneDelFile"; 613 resetConf(); 614 init(tableName); 615 616 // Create only del file. 617 createMobFile(basePath); 618 createMobDelFile(basePath, 2); 619 620 listFiles(); 621 622 MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, 623 TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1); 624 625 compactor.compact(allFiles, true); 626 } 627 628 @Test 629 public void testCompactFilesWithMultiDelFiles() throws Exception { 630 String tableName = "testCompactFilesWithMultiDelFiles"; 631 resetConf(); 632 init(tableName); 633 634 // Create only del file. 635 createMobFile(basePath); 636 createMobDelFile(basePath, 0); 637 createMobDelFile(basePath, 1); 638 createMobDelFile(basePath, 2); 639 640 listFiles(); 641 642 MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, 643 TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3); 644 compactor.compact(allFiles, true); 645 } 646 647 private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, 648 int delfileMaxCount) throws Exception { 649 resetConf(); 650 init(tableName); 651 // create 20 mob files. 
652 createStoreFiles(basePath, family, qf, 20, Type.Put, new Date()); 653 // create 13 del files 654 createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date()); 655 listFiles(); 656 657 // set the max del file count 658 conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount); 659 // set the mob compaction batch size 660 conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize); 661 testCompactDelFiles(tableName, 1, 13, false); 662 } 663 664 /** 665 * Tests the selectFiles 666 * @param tableName the table name 667 * @param type the expected compaction type 668 * @param isForceAllFiles whether all the mob files are selected 669 * @param expected the expected start keys 670 */ 671 private void testSelectFiles(String tableName, final CompactionType type, 672 final boolean isForceAllFiles, final List<String> expected) throws IOException { 673 PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, 674 TableName.valueOf(tableName), hcd, pool) { 675 @Override 676 public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles) 677 throws IOException { 678 if (files == null || files.isEmpty()) { 679 return null; 680 } 681 PartitionedMobCompactionRequest request = select(files, isForceAllFiles); 682 683 // Make sure that when there is no del files, there will be no startKey/endKey for partition. 
684 if (request.getDelPartitions().size() == 0) { 685 for (CompactionPartition p : request.getCompactionPartitions()) { 686 assertTrue(p.getStartKey() == null); 687 assertTrue(p.getEndKey() == null); 688 } 689 } 690 691 // Make sure that CompactionDelPartitions does not overlap 692 CompactionDelPartition prevDelP = null; 693 for (CompactionDelPartition delP : request.getDelPartitions()) { 694 assertTrue(Bytes.compareTo(delP.getId().getStartKey(), 695 delP.getId().getEndKey()) <= 0); 696 697 if (prevDelP != null) { 698 assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(), 699 delP.getId().getStartKey()) < 0); 700 } 701 } 702 703 // Make sure that only del files within key range for a partition is included in compaction. 704 // compact the mob files by partitions in parallel. 705 for (CompactionPartition partition : request.getCompactionPartitions()) { 706 List<HStoreFile> delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); 707 if (!request.getDelPartitions().isEmpty()) { 708 if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), 709 partition.getEndKey()) > 0) || (Bytes.compareTo( 710 request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() 711 .getEndKey(), partition.getStartKey()) < 0))) { 712 if (delFiles.size() > 0) { 713 assertTrue(Bytes.compareTo(partition.getStartKey(), 714 delFiles.get(0).getFirstKey().get().getRowArray()) >= 0); 715 assertTrue(Bytes.compareTo(partition.getEndKey(), 716 delFiles.get(delFiles.size() - 1).getLastKey().get().getRowArray()) <= 0); 717 } 718 } 719 } 720 } 721 722 // assert the compaction type 723 assertEquals(type, request.type); 724 // assert get the right partitions 725 compareCompactedPartitions(expected, request.compactionPartitions); 726 // assert get the right del files 727 compareDelFiles(request.getDelPartitions()); 728 return null; 729 } 730 }; 731 compactor.compact(allFiles, isForceAllFiles); 732 } 733 734 /** 735 * Tests the compacteDelFile 
736 * @param tableName the table name 737 * @param expectedFileCount the expected file count 738 * @param expectedCellCount the expected cell count 739 * @param isForceAllFiles whether all the mob files are selected 740 */ 741 private void testCompactDelFiles(String tableName, final int expectedFileCount, 742 final int expectedCellCount, boolean isForceAllFiles) throws IOException { 743 PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, 744 TableName.valueOf(tableName), hcd, pool) { 745 @Override 746 protected List<Path> performCompaction(PartitionedMobCompactionRequest request) 747 throws IOException { 748 List<Path> delFilePaths = new ArrayList<>(); 749 for (CompactionDelPartition delPartition: request.getDelPartitions()) { 750 for (Path p : delPartition.listDelFiles()) { 751 delFilePaths.add(p); 752 } 753 } 754 List<Path> newDelPaths = compactDelFiles(request, delFilePaths); 755 // assert the del files are merged. 756 assertEquals(expectedFileCount, newDelPaths.size()); 757 assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); 758 return null; 759 } 760 }; 761 compactor.compact(allFiles, isForceAllFiles); 762 } 763 764 /** 765 * Lists the files in the path 766 */ 767 private void listFiles() throws IOException { 768 for (FileStatus file : fs.listStatus(basePath)) { 769 allFiles.add(file); 770 if (file.getPath().getName().endsWith("_del")) { 771 delFiles.add(file.getPath()); 772 } else { 773 mobFiles.add(file); 774 } 775 } 776 } 777 778 /** 779 * Compares the compacted partitions. 
780 * @param partitions the collection of CompactedPartitions 781 */ 782 private void compareCompactedPartitions(List<String> expected, 783 Collection<CompactionPartition> partitions) { 784 List<String> actualKeys = new ArrayList<>(); 785 for (CompactionPartition partition : partitions) { 786 actualKeys.add(partition.getPartitionId().getStartKey()); 787 } 788 Collections.sort(expected); 789 Collections.sort(actualKeys); 790 assertEquals(expected.size(), actualKeys.size()); 791 for (int i = 0; i < expected.size(); i++) { 792 assertEquals(expected.get(i), actualKeys.get(i)); 793 } 794 } 795 796 /** 797 * Compares the del files. 798 * @param delPartitions all del partitions 799 */ 800 private void compareDelFiles(List<CompactionDelPartition> delPartitions) { 801 Map<Path, Path> delMap = new HashMap<>(); 802 for (CompactionDelPartition delPartition : delPartitions) { 803 for (Path f : delPartition.listDelFiles()) { 804 delMap.put(f, f); 805 } 806 } 807 for (Path f : delFiles) { 808 assertTrue(delMap.containsKey(f)); 809 } 810 } 811 812 /** 813 * Creates store files. 814 * @param basePath the path to create file 815 * @family the family name 816 * @qualifier the column qualifier 817 * @count the store file number 818 * @type the key type 819 */ 820 private void createStoreFiles(Path basePath, String family, String qualifier, int count, 821 Type type, final Date date) throws IOException { 822 createStoreFiles(basePath, family, qualifier, count, type, false, date); 823 } 824 825 private void createStoreFiles(Path basePath, String family, String qualifier, int count, 826 Type type, boolean sameStartKey, final Date date) throws IOException { 827 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); 828 String startKey = "row_"; 829 MobFileName mobFileName = null; 830 for (int i = 0; i < count; i++) { 831 byte[] startRow; 832 if (sameStartKey) { 833 // When creating multiple files under one partition, suffix needs to be different. 
834 startRow = Bytes.toBytes(startKey); 835 mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); 836 delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; 837 } else { 838 startRow = Bytes.toBytes(startKey + i); 839 } 840 if(type.equals(Type.Delete)) { 841 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix); 842 } 843 if(type.equals(Type.Put)){ 844 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix); 845 } 846 StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) 847 .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); 848 writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), 849 type, (i+1)*1000); 850 } 851 } 852 853 /** 854 * Writes data to store file. 855 * @param writer the store file writer 856 * @param row the row key 857 * @param family the family name 858 * @param qualifier the column qualifier 859 * @param type the key type 860 * @param size the size of value 861 */ 862 private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte[] family, 863 byte[] qualifier, Type type, int size) throws IOException { 864 long now = System.currentTimeMillis(); 865 try { 866 byte[] dummyData = new byte[size]; 867 new Random().nextBytes(dummyData); 868 writer.append(new KeyValue(row, family, qualifier, now, type, dummyData)); 869 } finally { 870 writer.close(); 871 } 872 } 873 874 /** 875 * Gets the number of del cell in the del files 876 * @param paths the del file paths 877 * @return the cell size 878 */ 879 private int countDelCellsInDelFiles(List<Path> paths) throws IOException { 880 List<HStoreFile> sfs = new ArrayList<>(); 881 int size = 0; 882 for (Path path : paths) { 883 HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true); 884 sfs.add(sf); 885 } 886 List<KeyValueScanner> scanners = new 
ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs, 887 false, true, false, false, HConstants.LATEST_TIMESTAMP)); 888 long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); 889 long ttl = HStore.determineTTLFromFamily(hcd); 890 ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR); 891 StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners); 892 List<Cell> results = new ArrayList<>(); 893 boolean hasMore = true; 894 895 while (hasMore) { 896 hasMore = scanner.next(results); 897 size += results.size(); 898 results.clear(); 899 } 900 scanner.close(); 901 return size; 902 } 903 904 private static ExecutorService createThreadPool() { 905 int maxThreads = 10; 906 long keepAliveTime = 60; 907 final SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); 908 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, 909 TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), 910 new RejectedExecutionHandler() { 911 @Override 912 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 913 try { 914 // waiting for a thread to pick up instead of throwing exceptions. 915 queue.put(r); 916 } catch (InterruptedException e) { 917 throw new RejectedExecutionException(e); 918 } 919 } 920 }); 921 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); 922 return pool; 923 } 924 925 /** 926 * Resets the configuration. 
* Each key is set back to its {@code MobConstants} default value.
   */
  private void resetConf() {
    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
        MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT,
        MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
    conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
        MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
  }

  /**
   * A {@link DistributedFileSystem} whose {@code rename} can be switched to fail on demand.
   */
  static class FaultyDistributedFileSystem extends DistributedFileSystem {
    // When set, rename(...) throws an IOException instead of delegating to HDFS.
    private volatile boolean throwException = false;

    public FaultyDistributedFileSystem() {
    }

    /**
     * Enables or disables the injected rename failure.
     * @param throwException true to make subsequent renames throw
     */
    public void setThrowException(boolean throwException) {
      this.throwException = throwException;
    }

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
      if (!throwException) {
        return super.rename(src, dst);
      }
      throw new IOException("No more files allowed");
    }
  }
}