/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob.compactions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionDelPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests file selection and del-file handling of {@link PartitionedMobCompactor}. The tests run
 * against a one-node mini cluster whose {@code fs.hdfs.impl} is replaced by
 * {@code FaultyDistributedFileSystem} so that a failing destination can be simulated.
 */
@Category(LargeTests.class)
public class TestPartitionedMobCompactor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestPartitionedMobCompactor.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPartitionedMobCompactor.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static String family = "family";
  private final static String qf = "qf";
  // A true constant: static, and computed in long arithmetic from the start.
  private static final long DAY_IN_MS = 1000L * 60 * 60 * 24;
  private static byte[] KEYS = Bytes.toBytes("012");
  private HColumnDescriptor hcd = new HColumnDescriptor(family);
  private Configuration conf = TEST_UTIL.getConfiguration();
  private CacheConfig cacheConf = new CacheConfig(conf);
  private FileSystem fs;
  // Populated by listFiles(): regular mob files, del file paths, and the union of both.
  private List<FileStatus> mobFiles = new ArrayList<>();
  private List<Path> delFiles = new ArrayList<>();
  private List<FileStatus> allFiles = new ArrayList<>();
  private Path basePath;
  private String mobSuffix;
  private String delSuffix;
  private static ExecutorService pool;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a one-node mini cluster using HFile format version 3 and the faulty file system, and
   * creates the shared compaction thread pool.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    // Inject our customized DistributedFileSystem
    TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
      DistributedFileSystem.class);
    TEST_UTIL.startMiniCluster(1);
    pool = createThreadPool();
  }

  /**
   * Shuts down the shared thread pool and the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    pool.shutdown();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Prepares the per-test state: resolves the mob family directory of the given table, generates
   * fresh mob/del file name suffixes and clears the file lists.
   * @param tableName the table whose mob directory is used as {@code basePath}
   */
  private void init(String tableName) throws Exception {
    fs = FileSystem.get(conf);
    Path testDir = CommonFSUtils.getRootDir(conf);
    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
    basePath = new Path(new Path(mobTestDir, tableName), family);
    mobSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "");
    delSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "") + "_del";
    allFiles.clear();
    mobFiles.clear();
    delFiles.clear();
  }

  @Test
  public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception {
    String tableName = "testCompactionSelectAllFilesWeeklyPolicy";
    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
      CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
  }

  @Test
  public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception {
    String tableName = "testCompactionSelectPartFilesWeeklyPolicy";
    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
      new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
  }

  @Test
  public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception {
    String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek";
    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
    testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false,
      dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7);
  }

  @Test
  public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception {
    String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek";
    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
      false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7);
  }

  @Test
  public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception {
    String tableName = "testCompactionSelectAllFilesMonthlyPolicy";
    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
      CompactionType.ALL_FILES, false, false, dateLastWeek,
      MobCompactPartitionPolicy.MONTHLY, 7);
  }

  @Test
  public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception {
    String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy";
    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
      CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
  }

  @Test
  public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception {
    String tableName = "testCompactionSelectPartFilesMonthlyPolicy";
    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false,
      new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
  }

  @Test
  public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception {
    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek";
    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
    Calendar calendar = Calendar.getInstance();
    Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date());
    CompactionType type = CompactionType.PART_FILES;
    long mergeSizeMultiFactor = 7;

    // "Last week" may fall into the previous month (e.g. when the test runs on 2/1/2017). In that
    // case the monthly policy applies to these files, so the expectations change accordingly.
    if (dateLastWeek.before(firstDayOfCurrentMonth)) {
      type = CompactionType.ALL_FILES;
      mergeSizeMultiFactor *= 4;
    }

    testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek,
      MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor);
  }

  @Test
  public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception {
    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek";
    Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));

    testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES,
      false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7);
  }

  @Test
  public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception {
    String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth";

    // Going back 5 weeks always lands in a past month.
    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
    testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false,
      dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28);
  }

  @Test
  public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception {
    String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth";

    // Going back 5 weeks always lands in a past month.
    Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS));
    testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES,
      false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28);
  }

  @Test
  public void testCompactionSelectWithAllFiles() throws Exception {
    String tableName = "testCompactionSelectWithAllFiles";
    // If there is only 1 file, it will not be compacted with _del files, so the type won't be
    // CompactionType.ALL_FILES in that case; therefore do not create _del files here.
    testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
      CompactionType.ALL_FILES, false, false);
  }

  @Test
  public void testCompactionSelectWithPartFiles() throws Exception {
    String tableName = "testCompactionSelectWithPartFiles";
    testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false);
  }

  @Test
  public void testCompactionSelectWithForceAllFiles() throws Exception {
    String tableName = "testCompactionSelectWithForceAllFiles";
    testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true);
  }

  /**
   * Same as the full variant, with del files created, today's date and the DAILY policy.
   */
  private void testCompactionAtMergeSize(final String tableName,
      final long mergeSize, final CompactionType type, final boolean isForceAllFiles)
      throws Exception {
    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true);
  }

  /**
   * Same as the full variant, with today's date and the DAILY policy.
   */
  private void testCompactionAtMergeSize(final String tableName,
      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
      final boolean createDelFiles)
      throws Exception {
    Date date = new Date();
    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date);
  }

  /**
   * Same as the full variant, with the DAILY policy and a merge size factor of 1.
   */
  private void testCompactionAtMergeSize(final String tableName,
      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
      final boolean createDelFiles, final Date date)
      throws Exception {
    testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date,
      MobCompactPartitionPolicy.DAILY, 1);
  }

  /**
   * Creates 10 mob files (and optionally 10 del files), computes which partitions are expected to
   * be selected, and verifies the selection made by the compactor.
   * @param tableName the table name
   * @param mergeSize the mergeable threshold written to the configuration
   * @param type the expected compaction type
   * @param isForceAllFiles whether all the mob files are selected
   * @param createDelFiles whether del files are created alongside the mob files
   * @param date the date encoded into the created file names
   * @param policy the partition policy set on the column family for the duration of the check
   * @param mergeSizeMultiFactor factor applied to {@code mergeSize} when computing expectations
   */
  private void testCompactionAtMergeSize(final String tableName,
      final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
      final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy,
      final long mergeSizeMultiFactor)
      throws Exception {
    resetConf();
    init(tableName);
    int count = 10;
    // create 10 mob files.
    createStoreFiles(basePath, family, qf, count, Type.Put, date);

    if (createDelFiles) {
      // create 10 del files
      createStoreFiles(basePath, family, qf, count, Type.Delete, date);
    }

    Calendar calendar = Calendar.getInstance();
    Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date());

    listFiles();
    List<String> expectedStartKeys = new ArrayList<>();
    for (FileStatus file : mobFiles) {
      if (file.getLen() < mergeSize * mergeSizeMultiFactor) {
        String fileName = file.getPath().getName();
        String startKey = fileName.substring(0, 32);

        // If the policy is monthly and files are in current week, they will be skipped
        // in minor compaction.
        boolean skipCompaction = false;
        if (policy == MobCompactPartitionPolicy.MONTHLY) {
          String fileDateStr = MobFileName.getDateFromName(fileName);
          Date fileDate;
          try {
            fileDate = MobUtils.parseDate(fileDateStr);
          } catch (ParseException e) {
            LOG.warn("Failed to parse date {}", fileDateStr, e);
            fileDate = new Date();
          }
          if (!fileDate.before(firstDayOfCurrentWeek)) {
            skipCompaction = true;
          }
        }

        // If it is not a major mob compaction and del files are there,
        // these mob files won't be compacted.
        if (isForceAllFiles || (!createDelFiles && !skipCompaction)) {
          expectedStartKeys.add(startKey);
        }
      }
    }

    // Set the policy
    this.hcd.setMobCompactPartitionPolicy(policy);
    try {
      // set the mob compaction mergeable threshold
      conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
      testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
    } finally {
      // go back to the default daily policy, even when the verification above fails
      this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY);
    }
  }

  @Test
  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
    testCompactDelFilesAtBatchSize(name.getMethodName(),
      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
  }

  @Test
  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
    testCompactDelFilesAtBatchSize(name.getMethodName(), 4,
      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
  }

  @Test
  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
    testCompactDelFilesAtBatchSize(name.getMethodName(), 4, 2);
  }

  /**
   * Runs a compaction while the faulty file system is told to throw, then verifies that the temp
   * directory holds nothing but an empty bulkload directory for the table.
   */
  @Test
  public void testCompactFilesWithDstDirFull() throws Exception {
    String tableName = name.getMethodName();
    fs = FileSystem.get(conf);
    FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem) fs;
    Path testDir = CommonFSUtils.getRootDir(conf);
    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
    basePath = new Path(new Path(mobTestDir, tableName), family);

    try {
      int count = 2;
      // create 2 mob files.
      createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date());
      listFiles();

      TableName tName = TableName.valueOf(tableName);
      MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
      faultyFs.setThrowException(true);
      try {
        compactor.compact(allFiles, true);
      } catch (IOException e) {
        // The failure is provoked on purpose; only the clean-up afterwards is under test.
        LOG.info("Expected exception, ignore", e);
      }

      // Verify that all the files in tmp directory are cleaned up
      Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
      FileStatus[] ls = faultyFs.listStatus(tempPath);

      // Only .bulkload under this directory
      assertEquals(1, ls.length);
      assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));

      Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
          tName.getNamespaceAsString(), tName.getQualifierAsString())));

      // Nothing in bulkLoad directory
      FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
      assertEquals(0, lsBulkload.length);

    } finally {
      faultyFs.setThrowException(false);
    }
  }

  /**
   * Creates multiple partition files: one mob file per entry of {@code KEYS}, each holding 10 Put
   * cells with 5000 bytes of random data.
   */
  private void createMobFile(Path basePath) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    MobFileName mobFileName = null;
    int ii = 0;
    Date today = new Date();
    for (byte k0 : KEYS) {
      byte[] startRow = Bytes.toBytes(ii++);

      mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix);

      StoreFileWriter mobFileWriter =
          new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
              .withFilePath(new Path(basePath, mobFileName.getFileName())).build();

      long now = System.currentTimeMillis();
      try {
        for (int i = 0; i < 10; i++) {
          byte[] key = Bytes.add(Bytes.toBytes(k0), Bytes.toBytes(i));
          byte[] dummyData = new byte[5000];
          new Random().nextBytes(dummyData);
          mobFileWriter.append(
              new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData));
        }
      } finally {
        mobFileWriter.close();
      }
    }
  }

  /**
   * Creates one partition delete file holding three Delete cells (row suffixes 0, 2 and 4) for
   * the key at index {@code startKey} of {@code KEYS}.
   * @param basePath the directory to create the file in
   * @param startKey index into {@code KEYS}; also used as the start row of the file name
   */
  private void createMobDelFile(Path basePath, int startKey) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    MobFileName mobFileName = null;
    Date today = new Date();

    byte[] startRow = Bytes.toBytes(startKey);

    mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix);

    StoreFileWriter mobFileWriter =
        new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
            .withFilePath(new Path(basePath, mobFileName.getFileName())).build();

    long now = System.currentTimeMillis();
    try {
      byte[] key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(0));
      byte[] dummyData = new byte[5000];
      new Random().nextBytes(dummyData);
      mobFileWriter.append(
          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(2));
      mobFileWriter.append(
          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));
      key = Bytes.add(Bytes.toBytes(KEYS[startKey]), Bytes.toBytes(4));
      mobFileWriter.append(
          new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Delete, dummyData));

    } finally {
      mobFileWriter.close();
    }
  }

  /**
   * Verifies that a selection without del files yields no del partitions and that the compaction
   * partitions carry no start/end key.
   */
  @Test
  public void testCompactFilesWithoutDelFile() throws Exception {
    String tableName = "testCompactFilesWithoutDelFile";
    resetConf();
    init(tableName);

    createMobFile(basePath);

    listFiles();

    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
        TableName.valueOf(tableName), hcd, pool) {
      @Override
      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
          throws IOException {
        if (files == null || files.isEmpty()) {
          return null;
        }

        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);

        // Make sure that there are no del partitions
        assertEquals(0, request.getDelPartitions().size());

        // Make sure that there is no startKey/endKey for any partition.
        for (CompactionPartition p : request.getCompactionPartitions()) {
          assertTrue(p.getStartKey() == null);
          assertTrue(p.getEndKey() == null);
        }
        return null;
      }
    };

    compactor.compact(allFiles, true);
  }

  /**
   * A compactor that, instead of compacting, checks the outcome of {@code select}: the number of
   * del partitions, that del partitions do not overlap, and how many compaction partitions have a
   * del file assigned.
   */
  static class MyPartitionedMobCompactor extends PartitionedMobCompactor {
    int delPartitionSize = 0;
    int PartitionsIncludeDelFiles = 0;
    CacheConfig cacheConfig = null;

    MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
        ColumnFamilyDescriptor column, ExecutorService pool, final int delPartitionSize,
        final CacheConfig cacheConf, final int PartitionsIncludeDelFiles)
        throws IOException {
      super(conf, fs, tableName, column, pool);
      this.delPartitionSize = delPartitionSize;
      this.cacheConfig = cacheConf;
      this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles;
    }

    @Override
    public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
        throws IOException {
      if (files == null || files.isEmpty()) {
        return null;
      }
      PartitionedMobCompactionRequest request = select(files, isForceAllFiles);

      assertEquals(delPartitionSize, request.getDelPartitions().size());
      if (request.getDelPartitions().size() > 0) {
        for (CompactionPartition p : request.getCompactionPartitions()) {
          assertTrue(p.getStartKey() != null);
          assertTrue(p.getEndKey() != null);
        }
      }

      try {
        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
          for (Path newDelPath : delPartition.listDelFiles()) {
            HStoreFile sf =
                new HStoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE, true);
            // pre-create reader of a del file to avoid race condition when opening the reader in
            // each partition.
            sf.initReader();
            delPartition.addStoreFile(sf);
          }
        }

        // Make sure that CompactionDelPartitions do not overlap
        CompactionDelPartition prevDelP = null;
        for (CompactionDelPartition delP : request.getDelPartitions()) {
          assertTrue(
              Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0);

          if (prevDelP != null) {
            assertTrue(
                Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0);
          }
          // Remember this partition; without this the overlap check above never executes.
          prevDelP = delP;
        }

        int affectedPartitions = 0;

        // Make sure that only del files within key range for a partition are included in
        // compaction.
        for (CompactionPartition partition : request.getCompactionPartitions()) {
          List<HStoreFile> delFiles =
              getListOfDelFilesForPartition(partition, request.getDelPartitions());
          if (!request.getDelPartitions().isEmpty()) {
            // Only look at partitions whose key range intersects the overall del key range.
            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
                partition.getEndKey()) > 0) || (Bytes.compareTo(
                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
                    .getEndKey(), partition.getStartKey()) < 0))) {

              if (delFiles.size() > 0) {
                assertEquals(1, delFiles.size());
                affectedPartitions += delFiles.size();
                assertTrue(Bytes.compareTo(partition.getStartKey(),
                    CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0);
                assertTrue(Bytes.compareTo(partition.getEndKey(),
                    CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0);
              }
            }
          }
        }
        // The del file is only included in one partition
        assertEquals(PartitionsIncludeDelFiles, affectedPartitions);
      } finally {
        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
          for (HStoreFile storeFile : delPartition.getStoreFiles()) {
            try {
              storeFile.closeStoreFile(true);
            } catch (IOException e) {
              LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
            }
          }
        }
      }

      return null;
    }
  }

  @Test
  public void testCompactFilesWithOneDelFile() throws Exception {
    String tableName = "testCompactFilesWithOneDelFile";
    resetConf();
    init(tableName);

    // Create the mob files and a single del file.
    createMobFile(basePath);
    createMobDelFile(basePath, 2);

    listFiles();

    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
        TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1);

    compactor.compact(allFiles, true);
  }

  @Test
  public void testCompactFilesWithMultiDelFiles() throws Exception {
    String tableName = "testCompactFilesWithMultiDelFiles";
    resetConf();
    init(tableName);

    // Create the mob files and one del file per key.
    createMobFile(basePath);
    createMobDelFile(basePath, 0);
    createMobDelFile(basePath, 1);
    createMobDelFile(basePath, 2);

    listFiles();

    MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs,
        TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3);
    compactor.compact(allFiles, true);
  }

  /**
   * Creates 20 mob files and 13 del files, applies the given limits to the configuration and
   * verifies that the del files are merged into 1 file holding 13 cells.
   * @param tableName the table name
   * @param batchSize the mob compaction batch size
   * @param delfileMaxCount the max del file count
   */
  private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
      int delfileMaxCount) throws Exception {
    resetConf();
    init(tableName);
    // create 20 mob files.
    createStoreFiles(basePath, family, qf, 20, Type.Put, new Date());
    // create 13 del files
    createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date());
    listFiles();

    // set the max del file count
    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount);
    // set the mob compaction batch size
    conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
    testCompactDelFiles(tableName, 1, 13, false);
  }

  /**
   * Tests the selectFiles
   * @param tableName the table name
   * @param type the expected compaction type
   * @param isForceAllFiles whether all the mob files are selected
   * @param expected the expected start keys
   */
  private void testSelectFiles(String tableName, final CompactionType type,
      final boolean isForceAllFiles, final List<String> expected) throws IOException {
    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
        TableName.valueOf(tableName), hcd, pool) {
      @Override
      public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
          throws IOException {
        if (files == null || files.isEmpty()) {
          return null;
        }
        PartitionedMobCompactionRequest request = select(files, isForceAllFiles);

        // Make sure that when there are no del files, there is no startKey/endKey for any
        // partition.
        if (request.getDelPartitions().size() == 0) {
          for (CompactionPartition p : request.getCompactionPartitions()) {
            assertTrue(p.getStartKey() == null);
            assertTrue(p.getEndKey() == null);
          }
        }

        // Make sure that CompactionDelPartitions do not overlap
        CompactionDelPartition prevDelP = null;
        for (CompactionDelPartition delP : request.getDelPartitions()) {
          assertTrue(Bytes.compareTo(delP.getId().getStartKey(),
              delP.getId().getEndKey()) <= 0);

          if (prevDelP != null) {
            assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(),
                delP.getId().getStartKey()) < 0);
          }
          // Remember this partition; without this the overlap check above never executes.
          prevDelP = delP;
        }

        // Make sure that only del files within key range for a partition are included in
        // compaction.
        for (CompactionPartition partition : request.getCompactionPartitions()) {
          // Named differently from the outer delFiles field to avoid shadowing it.
          List<HStoreFile> partitionDelFiles =
              getListOfDelFilesForPartition(partition, request.getDelPartitions());
          if (!request.getDelPartitions().isEmpty()) {
            if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(),
                partition.getEndKey()) > 0) || (Bytes.compareTo(
                request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId()
                    .getEndKey(), partition.getStartKey()) < 0))) {
              if (partitionDelFiles.size() > 0) {
                // NOTE(review): this compares against getRowArray() and uses the opposite
                // direction from MyPartitionedMobCompactor, which uses CellUtil.cloneRow();
                // left as is, confirm which form is intended.
                assertTrue(Bytes.compareTo(partition.getStartKey(),
                    partitionDelFiles.get(0).getFirstKey().get().getRowArray()) >= 0);
                assertTrue(Bytes.compareTo(partition.getEndKey(),
                    partitionDelFiles.get(partitionDelFiles.size() - 1).getLastKey().get()
                        .getRowArray()) <= 0);
              }
            }
          }
        }

        // assert the compaction type
        assertEquals(type, request.type);
        // assert get the right partitions
        compareCompactedPartitions(expected, request.compactionPartitions);
        // assert get the right del files
        compareDelFiles(request.getDelPartitions());
        return null;
      }
    };
    compactor.compact(allFiles, isForceAllFiles);
  }

  /**
   * Tests the compactDelFiles
   * @param tableName the table name
   * @param expectedFileCount the expected file count
   * @param expectedCellCount the expected cell count
   * @param isForceAllFiles whether all the mob files are selected
   */
  private void testCompactDelFiles(String tableName, final int expectedFileCount,
      final int expectedCellCount, boolean isForceAllFiles) throws IOException {
    PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
        TableName.valueOf(tableName), hcd, pool) {
      @Override
      protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
          throws IOException {
        List<Path> delFilePaths = new ArrayList<>();
        for (CompactionDelPartition delPartition : request.getDelPartitions()) {
          for (Path p : delPartition.listDelFiles()) {
            delFilePaths.add(p);
          }
        }
        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
        // assert the del files are merged.
        assertEquals(expectedFileCount, newDelPaths.size());
        assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
        return null;
      }
    };
    compactor.compact(allFiles, isForceAllFiles);
  }

  /**
   * Lists the files under {@code basePath} and sorts them into {@code allFiles}, {@code delFiles}
   * (names ending in "_del") and {@code mobFiles} (everything else).
   */
  private void listFiles() throws IOException {
    for (FileStatus file : fs.listStatus(basePath)) {
      allFiles.add(file);
      if (file.getPath().getName().endsWith("_del")) {
        delFiles.add(file.getPath());
      } else {
        mobFiles.add(file);
      }
    }
  }

  /**
   * Compares the compacted partitions. Both lists are sorted before comparison, so
   * {@code expected} is reordered in place.
   * @param expected the expected partition start keys
   * @param partitions the collection of CompactedPartitions
   */
  private void compareCompactedPartitions(List<String> expected,
      Collection<CompactionPartition> partitions) {
    List<String> actualKeys = new ArrayList<>();
    for (CompactionPartition partition : partitions) {
      actualKeys.add(partition.getPartitionId().getStartKey());
    }
    Collections.sort(expected);
    Collections.sort(actualKeys);
    assertEquals(expected.size(), actualKeys.size());
    for (int i = 0; i < expected.size(); i++) {
      assertEquals(expected.get(i), actualKeys.get(i));
    }
  }

  /**
   * Compares the del files: every path recorded in {@code delFiles} must be listed by one of the
   * given del partitions.
   * @param delPartitions all del partitions
   */
  private void compareDelFiles(List<CompactionDelPartition> delPartitions) {
    Map<Path, Path> delMap = new HashMap<>();
    for (CompactionDelPartition delPartition : delPartitions) {
      for (Path f : delPartition.listDelFiles()) {
        delMap.put(f, f);
      }
    }
    for (Path f : delFiles) {
      assertTrue(delMap.containsKey(f));
    }
  }

  /**
   * Creates store files, each with its own start key.
   * @param basePath the path to create file
   * @param family the family name
   * @param qualifier the column qualifier
   * @param count the store file number
   * @param type the key type
   * @param date the date encoded into the file names
   */
  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
      Type type, final Date date) throws IOException {
    createStoreFiles(basePath, family, qualifier, count, type, false, date);
  }

  /**
   * Creates store files. File {@code i} (0-based) holds a single cell whose value is
   * {@code (i + 1) * 1000} bytes long.
   * @param basePath the path to create file
   * @param family the family name
   * @param qualifier the column qualifier
   * @param count the store file number
   * @param type the key type; only {@code Type.Put} and {@code Type.Delete} are supported
   * @param sameStartKey whether all files share one start key (and so need distinct suffixes)
   * @param date the date encoded into the file names
   * @throws IllegalArgumentException if {@code type} is neither Put nor Delete
   */
  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
      Type type, boolean sameStartKey, final Date date) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
    String startKey = "row_";
    MobFileName mobFileName = null;
    for (int i = 0; i < count; i++) {
      byte[] startRow;
      if (sameStartKey) {
        // When creating multiple files under one partition, suffix needs to be different.
        startRow = Bytes.toBytes(startKey);
        mobSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "");
        delSuffix = TEST_UTIL.getRandomUUID().toString().replaceAll("-", "") + "_del";
      } else {
        startRow = Bytes.toBytes(startKey + i);
      }
      if (type.equals(Type.Delete)) {
        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix);
      } else if (type.equals(Type.Put)) {
        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix);
      } else {
        // Fail with a clear message instead of a NullPointerException on mobFileName below.
        throw new IllegalArgumentException("Unsupported key type " + type);
      }
      StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
          .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName()))
          .build();
      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
          type, (i + 1) * 1000);
    }
  }

  /**
   * Writes data to store file. A single cell with random value bytes is appended and the writer
   * is always closed afterwards.
   * @param writer the store file writer
   * @param row the row key
   * @param family the family name
   * @param qualifier the column qualifier
   * @param type the key type
   * @param size the size of value
   */
  private static void writeStoreFile(final StoreFileWriter writer, byte[] row, byte[] family,
      byte[] qualifier, Type type, int size) throws IOException {
    long now = System.currentTimeMillis();
    try {
      byte[] dummyData = new byte[size];
      new Random().nextBytes(dummyData);
      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
    } finally {
      writer.close();
    }
  }

  /**
   * Gets the number of del cell in the del files
   * @param paths the del file paths
   * @return the cell size
   */
  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
    List<HStoreFile> sfs = new ArrayList<>();
    int size = 0;
    for (Path path : paths) {
      HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
      sfs.add(sf);
    }
    List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
      false, true, false, false, HConstants.LATEST_TIMESTAMP));
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    long ttl = HStore.determineTTLFromFamily(hcd);
    ScanInfo scanInfo =
        new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR);
    StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
    try {
      List<Cell> results = new ArrayList<>();
      boolean hasMore = true;

      while (hasMore) {
        hasMore = scanner.next(results);
        size += results.size();
        results.clear();
      }
    } finally {
      // Close the scanner even when next() throws.
      scanner.close();
    }
    return size;
  }

  /**
   * Creates the thread pool used by the compactors: 1 to 10 daemon threads fed by a
   * {@link SynchronousQueue}. A rejected task blocks the submitter until a worker takes it.
   * @return the thread pool
   */
  private static ExecutorService createThreadPool() {
    int maxThreads = 10;
    long keepAliveTime = 60;
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
      new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          try {
            // waiting for a thread to pick up instead of throwing exceptions.
            queue.put(r);
          } catch (InterruptedException e) {
            // Restore the interrupt flag so the submitting thread can still observe it.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
          }
        }
      });
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  /**
   * Resets the configuration.
* Restores the mergeable threshold, the del file max count and the compaction batch size to
   * their default constants from {@code MobConstants}.
   */
  private void resetConf() {
    conf.setLong(
        MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
        MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
    conf.setInt(
        MobConstants.MOB_DELFILE_MAX_COUNT,
        MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
    conf.setInt(
        MobConstants.MOB_COMPACTION_BATCH_SIZE,
        MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
  }

  /**
   * A {@link DistributedFileSystem} whose {@code rename} can be switched to fail on demand.
   */
  static class FaultyDistributedFileSystem extends DistributedFileSystem {
    /** When true, every call to {@link #rename(Path, Path)} throws an {@link IOException}. */
    private volatile boolean throwException = false;

    public FaultyDistributedFileSystem() {
    }

    /**
     * Turns the injected rename failure on or off.
     * @param throwException true to make {@code rename} throw, false to delegate normally
     */
    public void setThrowException(boolean throwException) {
      this.throwException = throwException;
    }

    /**
     * Delegates to the parent implementation unless the failure flag is set.
     * @throws IOException with the message "No more files allowed" when the flag is set
     */
    @Override
    public boolean rename(Path src, Path dst) throws IOException {
      if (!throwException) {
        return super.rename(src, dst);
      }
      throw new IOException("No more files allowed");
    }
  }
}