001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.text.ParseException; 024import java.text.SimpleDateFormat; 025import java.util.ArrayList; 026import java.util.Calendar; 027import java.util.Collection; 028import java.util.Date; 029import java.util.List; 030import java.util.Optional; 031import java.util.UUID; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.RejectedExecutionHandler; 035import java.util.concurrent.SynchronousQueue; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.CellComparator; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.PrivateCellUtil; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.Tag; 049import org.apache.hadoop.hbase.TagType; 050import org.apache.hadoop.hbase.TagUtil; 051import org.apache.hadoop.hbase.backup.HFileArchiver; 052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 053import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 054import org.apache.hadoop.hbase.client.RegionInfo; 055import org.apache.hadoop.hbase.client.RegionInfoBuilder; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.io.HFileLink; 059import org.apache.hadoop.hbase.io.compress.Compression; 060import org.apache.hadoop.hbase.io.crypto.Encryption; 061import org.apache.hadoop.hbase.io.hfile.CacheConfig; 062import org.apache.hadoop.hbase.io.hfile.HFile; 063import org.apache.hadoop.hbase.io.hfile.HFileContext; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.master.locking.LockManager; 066import org.apache.hadoop.hbase.mob.compactions.MobCompactor; 067import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; 068import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; 069import org.apache.hadoop.hbase.regionserver.BloomType; 070import org.apache.hadoop.hbase.regionserver.HStore; 071import org.apache.hadoop.hbase.regionserver.HStoreFile; 072import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.ChecksumType; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.apache.hadoop.hbase.util.FSUtils; 077import org.apache.hadoop.hbase.util.ReflectionUtils; 078import org.apache.hadoop.hbase.util.Threads; 079import org.apache.yetus.audience.InterfaceAudience; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083/** 084 * The mob utilities 085 */ 086@InterfaceAudience.Private 087public final class MobUtils { 088 089 private static final Logger LOG = LoggerFactory.getLogger(MobUtils.class); 090 private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7; 091 private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER; 092 093 private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT = 094 new ThreadLocal<SimpleDateFormat>() { 095 @Override 096 protected SimpleDateFormat initialValue() { 097 return new SimpleDateFormat("yyyyMMdd"); 098 } 099 }; 100 101 private static final byte[] REF_DELETE_MARKER_TAG_BYTES; 102 static { 103 List<Tag> tags = new ArrayList<>(); 104 tags.add(MobConstants.MOB_REF_TAG); 105 REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags); 106 } 107 108 /** 109 * Private constructor to keep this class from being instantiated. 110 */ 111 private MobUtils() { 112 } 113 114 /** 115 * Formats a date to a string. 116 * @param date The date. 117 * @return The string format of the date, it's yyyymmdd. 118 */ 119 public static String formatDate(Date date) { 120 return LOCAL_FORMAT.get().format(date); 121 } 122 123 /** 124 * Parses the string to a date. 125 * @param dateString The string format of a date, it's yyyymmdd. 126 * @return A date. 127 * @throws ParseException 128 */ 129 public static Date parseDate(String dateString) throws ParseException { 130 return LOCAL_FORMAT.get().parse(dateString); 131 } 132 133 /** 134 * Get the first day of the input date's month 135 * @param calendar Calendar object 136 * @param date The date to find out its first day of that month 137 * @return The first day in the month 138 */ 139 public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) { 140 141 calendar.setTime(date); 142 calendar.set(Calendar.HOUR_OF_DAY, 0); 143 calendar.set(Calendar.MINUTE, 0); 144 calendar.set(Calendar.SECOND, 0); 145 calendar.set(Calendar.MILLISECOND, 0); 146 calendar.set(Calendar.DAY_OF_MONTH, 1); 147 148 Date firstDayInMonth = calendar.getTime(); 149 return firstDayInMonth; 150 } 151 152 /** 153 * Get the first day of the input date's week 154 * @param calendar Calendar object 155 * @param date The date to find out its first day of that week 156 * @return The first day in the week 157 */ 158 public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) { 159 160 calendar.setTime(date); 161 calendar.set(Calendar.HOUR_OF_DAY, 0); 162 calendar.set(Calendar.MINUTE, 0); 163 calendar.set(Calendar.SECOND, 0); 164 calendar.set(Calendar.MILLISECOND, 0); 165 calendar.setFirstDayOfWeek(Calendar.MONDAY); 166 calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); 167 168 Date firstDayInWeek = calendar.getTime(); 169 return firstDayInWeek; 170 } 171 172 /** 173 * Whether the current cell is a mob reference cell. 174 * @param cell The current cell. 175 * @return True if the cell has a mob reference tag, false if it doesn't. 176 */ 177 public static boolean isMobReferenceCell(Cell cell) { 178 if (cell.getTagsLength() > 0) { 179 Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE); 180 if (tag.isPresent()) { 181 return true; 182 } 183 } 184 return false; 185 } 186 187 /** 188 * Gets the table name tag. 189 * @param cell The current cell. 190 * @return The table name tag. 191 */ 192 public static Tag getTableNameTag(Cell cell) { 193 if (cell.getTagsLength() > 0) { 194 Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE); 195 if (tag.isPresent()) { 196 return tag.get(); 197 } 198 } 199 return null; 200 } 201 202 /** 203 * Whether the tag list has a mob reference tag. 204 * @param tags The tag list. 205 * @return True if the list has a mob reference tag, false if it doesn't. 206 */ 207 public static boolean hasMobReferenceTag(List<Tag> tags) { 208 if (!tags.isEmpty()) { 209 for (Tag tag : tags) { 210 if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) { 211 return true; 212 } 213 } 214 } 215 return false; 216 } 217 218 /** 219 * Indicates whether it's a raw scan. 220 * The information is set in the attribute "hbase.mob.scan.raw" of scan. 221 * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file. 222 * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in 223 * the mob file. 224 * @param scan The current scan. 225 * @return True if it's a raw scan. 226 */ 227 public static boolean isRawMobScan(Scan scan) { 228 byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW); 229 try { 230 return raw != null && Bytes.toBoolean(raw); 231 } catch (IllegalArgumentException e) { 232 return false; 233 } 234 } 235 236 /** 237 * Indicates whether it's a reference only scan. 238 * The information is set in the attribute "hbase.mob.scan.ref.only" of scan. 239 * If it's a ref only scan, only the cells with ref tag are returned. 240 * @param scan The current scan. 241 * @return True if it's a ref only scan. 242 */ 243 public static boolean isRefOnlyScan(Scan scan) { 244 byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY); 245 try { 246 return refOnly != null && Bytes.toBoolean(refOnly); 247 } catch (IllegalArgumentException e) { 248 return false; 249 } 250 } 251 252 /** 253 * Indicates whether the scan contains the information of caching blocks. 254 * The information is set in the attribute "hbase.mob.cache.blocks" of scan. 255 * @param scan The current scan. 256 * @return True when the Scan attribute specifies to cache the MOB blocks. 257 */ 258 public static boolean isCacheMobBlocks(Scan scan) { 259 byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS); 260 try { 261 return cache != null && Bytes.toBoolean(cache); 262 } catch (IllegalArgumentException e) { 263 return false; 264 } 265 } 266 267 /** 268 * Sets the attribute of caching blocks in the scan. 269 * 270 * @param scan 271 * The current scan. 272 * @param cacheBlocks 273 * True, set the attribute of caching blocks into the scan, the scanner with this scan 274 * caches blocks. 275 * False, the scanner doesn't cache blocks for this scan. 276 */ 277 public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { 278 scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks)); 279 } 280 281 /** 282 * Cleans the expired mob files. 283 * Cleans the files whose creation date is older than (current - columnFamily.ttl), and 284 * the minVersions of that column family is 0. 285 * @param fs The current file system. 286 * @param conf The current configuration. 287 * @param tableName The current table name. 288 * @param columnDescriptor The descriptor of the current column family. 289 * @param cacheConfig The cacheConfig that disables the block cache. 290 * @param current The current time. 291 * @throws IOException 292 */ 293 public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName, 294 ColumnFamilyDescriptor columnDescriptor, CacheConfig cacheConfig, long current) 295 throws IOException { 296 long timeToLive = columnDescriptor.getTimeToLive(); 297 if (Integer.MAX_VALUE == timeToLive) { 298 // no need to clean, because the TTL is not set. 299 return; 300 } 301 302 Calendar calendar = Calendar.getInstance(); 303 calendar.setTimeInMillis(current - timeToLive * 1000); 304 calendar.set(Calendar.HOUR_OF_DAY, 0); 305 calendar.set(Calendar.MINUTE, 0); 306 calendar.set(Calendar.SECOND, 0); 307 308 Date expireDate = calendar.getTime(); 309 310 LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); 311 312 FileStatus[] stats = null; 313 Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName); 314 Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString()); 315 try { 316 stats = fs.listStatus(path); 317 } catch (FileNotFoundException e) { 318 LOG.warn("Failed to find the mob file " + path, e); 319 } 320 if (null == stats) { 321 // no file found 322 return; 323 } 324 List<HStoreFile> filesToClean = new ArrayList<>(); 325 int deletedFileCount = 0; 326 for (FileStatus file : stats) { 327 String fileName = file.getPath().getName(); 328 try { 329 if (HFileLink.isHFileLink(file.getPath())) { 330 HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 331 fileName = hfileLink.getOriginPath().getName(); 332 } 333 334 Date fileDate = parseDate(MobFileName.getDateFromName(fileName)); 335 336 if (LOG.isDebugEnabled()) { 337 LOG.debug("Checking file " + fileName); 338 } 339 if (fileDate.getTime() < expireDate.getTime()) { 340 if (LOG.isDebugEnabled()) { 341 LOG.debug(fileName + " is an expired file"); 342 } 343 filesToClean 344 .add(new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true)); 345 } 346 } catch (Exception e) { 347 LOG.error("Cannot parse the fileName " + fileName, e); 348 } 349 } 350 if (!filesToClean.isEmpty()) { 351 try { 352 removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), 353 filesToClean); 354 deletedFileCount = filesToClean.size(); 355 } catch (IOException e) { 356 LOG.error("Failed to delete the mob files " + filesToClean, e); 357 } 358 } 359 LOG.info(deletedFileCount + " expired mob files are deleted"); 360 } 361 362 /** 363 * Gets the root dir of the mob files. 364 * It's {HBASE_DIR}/mobdir. 365 * @param conf The current configuration. 366 * @return the root dir of the mob file. 367 */ 368 public static Path getMobHome(Configuration conf) { 369 Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); 370 return getMobHome(hbaseDir); 371 } 372 373 /** 374 * Gets the root dir of the mob files under the qualified HBase root dir. 375 * It's {rootDir}/mobdir. 376 * @param rootDir The qualified path of HBase root directory. 377 * @return The root dir of the mob file. 378 */ 379 public static Path getMobHome(Path rootDir) { 380 return new Path(rootDir, MobConstants.MOB_DIR_NAME); 381 } 382 383 /** 384 * Gets the qualified root dir of the mob files. 385 * @param conf The current configuration. 386 * @return The qualified root dir. 387 * @throws IOException 388 */ 389 public static Path getQualifiedMobRootDir(Configuration conf) throws IOException { 390 Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); 391 Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME); 392 FileSystem fs = mobRootDir.getFileSystem(conf); 393 return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 394 } 395 396 /** 397 * Gets the table dir of the mob files under the qualified HBase root dir. 398 * It's {rootDir}/mobdir/data/${namespace}/${tableName} 399 * @param rootDir The qualified path of HBase root directory. 400 * @param tableName The name of table. 401 * @return The table dir of the mob file. 402 */ 403 public static Path getMobTableDir(Path rootDir, TableName tableName) { 404 return FSUtils.getTableDir(getMobHome(rootDir), tableName); 405 } 406 407 /** 408 * Gets the region dir of the mob files. 409 * It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. 410 * @param conf The current configuration. 411 * @param tableName The current table name. 412 * @return The region dir of the mob files. 413 */ 414 public static Path getMobRegionPath(Configuration conf, TableName tableName) { 415 return getMobRegionPath(new Path(conf.get(HConstants.HBASE_DIR)), tableName); 416 } 417 418 /** 419 * Gets the region dir of the mob files under the specified root dir. 420 * It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. 421 * @param rootDir The qualified path of HBase root directory. 422 * @param tableName The current table name. 423 * @return The region dir of the mob files. 424 */ 425 public static Path getMobRegionPath(Path rootDir, TableName tableName) { 426 Path tablePath = FSUtils.getTableDir(getMobHome(rootDir), tableName); 427 RegionInfo regionInfo = getMobRegionInfo(tableName); 428 return new Path(tablePath, regionInfo.getEncodedName()); 429 } 430 431 /** 432 * Gets the family dir of the mob files. 433 * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. 434 * @param conf The current configuration. 435 * @param tableName The current table name. 436 * @param familyName The current family name. 437 * @return The family dir of the mob files. 438 */ 439 public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) { 440 return new Path(getMobRegionPath(conf, tableName), familyName); 441 } 442 443 /** 444 * Gets the family dir of the mob files. 445 * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. 446 * @param regionPath The path of mob region which is a dummy one. 447 * @param familyName The current family name. 448 * @return The family dir of the mob files. 449 */ 450 public static Path getMobFamilyPath(Path regionPath, String familyName) { 451 return new Path(regionPath, familyName); 452 } 453 454 /** 455 * Gets the RegionInfo of the mob files. 456 * This is a dummy region. The mob files are not saved in a region in HBase. 457 * This is only used in mob snapshot. It's internally used only. 458 * @param tableName 459 * @return A dummy mob region info. 460 */ 461 public static RegionInfo getMobRegionInfo(TableName tableName) { 462 return RegionInfoBuilder.newBuilder(tableName) 463 .setStartKey(MobConstants.MOB_REGION_NAME_BYTES) 464 .setEndKey(HConstants.EMPTY_END_ROW) 465 .setSplit(false) 466 .setRegionId(0) 467 .build(); 468 } 469 470 /** 471 * Gets whether the current RegionInfo is a mob one. 472 * @param regionInfo The current RegionInfo. 473 * @return If true, the current RegionInfo is a mob one. 474 */ 475 public static boolean isMobRegionInfo(RegionInfo regionInfo) { 476 return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName() 477 .equals(regionInfo.getEncodedName()); 478 } 479 480 /** 481 * Gets whether the current region name follows the pattern of a mob region name. 482 * @param tableName The current table name. 483 * @param regionName The current region name. 484 * @return True if the current region name follows the pattern of a mob region name. 485 */ 486 public static boolean isMobRegionName(TableName tableName, byte[] regionName) { 487 return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName()); 488 } 489 490 /** 491 * Gets the working directory of the mob compaction. 492 * @param root The root directory of the mob compaction. 493 * @param jobName The current job name. 494 * @return The directory of the mob compaction for the current job. 495 */ 496 public static Path getCompactionWorkingPath(Path root, String jobName) { 497 return new Path(root, jobName); 498 } 499 500 /** 501 * Archives the mob files. 502 * @param conf The current configuration. 503 * @param fs The current file system. 504 * @param tableName The table name. 505 * @param tableDir The table directory. 506 * @param family The name of the column family. 507 * @param storeFiles The files to be deleted. 508 * @throws IOException 509 */ 510 public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName, 511 Path tableDir, byte[] family, Collection<HStoreFile> storeFiles) throws IOException { 512 HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, 513 storeFiles); 514 } 515 516 /** 517 * Creates a mob reference KeyValue. 518 * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. 519 * @param cell The original Cell. 520 * @param fileName The mob file name where the mob reference KeyValue is written. 521 * @param tableNameTag The tag of the current table name. It's very important in 522 * cloning the snapshot. 523 * @return The mob reference KeyValue. 524 */ 525 public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) { 526 // Append the tags to the KeyValue. 527 // The key is same, the value is the filename of the mob file 528 List<Tag> tags = new ArrayList<>(); 529 // Add the ref tag as the 1st one. 530 tags.add(MobConstants.MOB_REF_TAG); 531 // Add the tag of the source table name, this table is where this mob file is flushed 532 // from. 533 // It's very useful in cloning the snapshot. When reading from the cloning table, we need to 534 // find the original mob files by this table name. For details please see cloning 535 // snapshot for mob files. 536 tags.add(tableNameTag); 537 return createMobRefCell(cell, fileName, TagUtil.fromList(tags)); 538 } 539 540 public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) { 541 byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName); 542 return PrivateCellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell)); 543 } 544 545 /** 546 * Creates a writer for the mob file in temp directory. 547 * @param conf The current configuration. 548 * @param fs The current file system. 549 * @param family The descriptor of the current column family. 550 * @param date The date string, its format is yyyymmmdd. 551 * @param basePath The basic path for a temp directory. 552 * @param maxKeyCount The key count. 553 * @param compression The compression algorithm. 554 * @param startKey The hex string of the start key. 555 * @param cacheConfig The current cache config. 556 * @param cryptoContext The encryption context. 557 * @param isCompaction If the writer is used in compaction. 558 * @return The writer for the mob file. 559 * @throws IOException 560 */ 561 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 562 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 563 Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, 564 Encryption.Context cryptoContext, boolean isCompaction) 565 throws IOException { 566 MobFileName mobFileName = MobFileName.create(startKey, date, 567 UUID.randomUUID().toString().replaceAll("-", "")); 568 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 569 cacheConfig, cryptoContext, isCompaction); 570 } 571 572 /** 573 * Creates a writer for the ref file in temp directory. 574 * @param conf The current configuration. 575 * @param fs The current file system. 576 * @param family The descriptor of the current column family. 577 * @param basePath The basic path for a temp directory. 578 * @param maxKeyCount The key count. 579 * @param cacheConfig The current cache config. 580 * @param cryptoContext The encryption context. 581 * @param isCompaction If the writer is used in compaction. 582 * @return The writer for the mob file. 583 * @throws IOException 584 */ 585 public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs, 586 ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, 587 Encryption.Context cryptoContext, boolean isCompaction) 588 throws IOException { 589 return createWriter(conf, fs, family, 590 new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount, 591 family.getCompactionCompressionType(), cacheConfig, cryptoContext, 592 HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(), 593 family.getBloomFilterType(), isCompaction); 594 } 595 596 /** 597 * Creates a writer for the mob file in temp directory. 598 * @param conf The current configuration. 599 * @param fs The current file system. 600 * @param family The descriptor of the current column family. 601 * @param date The date string, its format is yyyymmmdd. 602 * @param basePath The basic path for a temp directory. 603 * @param maxKeyCount The key count. 604 * @param compression The compression algorithm. 605 * @param startKey The start key. 606 * @param cacheConfig The current cache config. 607 * @param cryptoContext The encryption context. 608 * @param isCompaction If the writer is used in compaction. 609 * @return The writer for the mob file. 610 * @throws IOException 611 */ 612 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 613 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 614 Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, 615 Encryption.Context cryptoContext, boolean isCompaction) 616 throws IOException { 617 MobFileName mobFileName = MobFileName.create(startKey, date, 618 UUID.randomUUID().toString().replaceAll("-", "")); 619 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 620 cacheConfig, cryptoContext, isCompaction); 621 } 622 623 /** 624 * Creates a writer for the del file in temp directory. 625 * @param conf The current configuration. 626 * @param fs The current file system. 627 * @param family The descriptor of the current column family. 628 * @param date The date string, its format is yyyymmmdd. 629 * @param basePath The basic path for a temp directory. 630 * @param maxKeyCount The key count. 631 * @param compression The compression algorithm. 632 * @param startKey The start key. 633 * @param cacheConfig The current cache config. 634 * @param cryptoContext The encryption context. 635 * @return The writer for the del file. 636 * @throws IOException 637 */ 638 public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs, 639 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 640 Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, 641 Encryption.Context cryptoContext) 642 throws IOException { 643 String suffix = UUID 644 .randomUUID().toString().replaceAll("-", "") + "_del"; 645 MobFileName mobFileName = MobFileName.create(startKey, date, suffix); 646 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 647 cacheConfig, cryptoContext, true); 648 } 649 650 /** 651 * Creates a writer for the mob file in temp directory. 652 * @param conf The current configuration. 653 * @param fs The current file system. 654 * @param family The descriptor of the current column family. 655 * @param mobFileName The mob file name. 656 * @param basePath The basic path for a temp directory. 657 * @param maxKeyCount The key count. 658 * @param compression The compression algorithm. 659 * @param cacheConfig The current cache config. 660 * @param cryptoContext The encryption context. 661 * @param isCompaction If the writer is used in compaction. 662 * @return The writer for the mob file. 663 * @throws IOException 664 */ 665 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 666 ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, 667 Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, 668 boolean isCompaction) 669 throws IOException { 670 return createWriter(conf, fs, family, 671 new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig, 672 cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), 673 family.getBlocksize(), BloomType.NONE, isCompaction); 674 } 675 676 /** 677 * Creates a writer for the mob file in temp directory. 678 * @param conf The current configuration. 679 * @param fs The current file system. 680 * @param family The descriptor of the current column family. 681 * @param path The path for a temp directory. 682 * @param maxKeyCount The key count. 683 * @param compression The compression algorithm. 684 * @param cacheConfig The current cache config. 685 * @param cryptoContext The encryption context. 686 * @param checksumType The checksum type. 687 * @param bytesPerChecksum The bytes per checksum. 688 * @param blocksize The HFile block size. 689 * @param bloomType The bloom filter type. 690 * @param isCompaction If the writer is used in compaction. 691 * @return The writer for the mob file. 692 * @throws IOException 693 */ 694 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 695 ColumnFamilyDescriptor family, Path path, long maxKeyCount, 696 Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, 697 ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType, 698 boolean isCompaction) 699 throws IOException { 700 if (compression == null) { 701 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 702 } 703 final CacheConfig writerCacheConf; 704 if (isCompaction) { 705 writerCacheConf = new CacheConfig(cacheConfig); 706 writerCacheConf.setCacheDataOnWrite(false); 707 } else { 708 writerCacheConf = cacheConfig; 709 } 710 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) 711 .withIncludesMvcc(true).withIncludesTags(true) 712 .withCompressTags(family.isCompressTags()) 713 .withChecksumType(checksumType) 714 .withBytesPerCheckSum(bytesPerChecksum) 715 .withBlockSize(blocksize) 716 .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) 717 .withEncryptionContext(cryptoContext) 718 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 719 720 StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs) 721 .withFilePath(path) 722 .withComparator(CellComparator.getInstance()).withBloomType(bloomType) 723 .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); 724 return w; 725 } 726 727 /** 728 * Commits the mob file. 729 * @param conf The current configuration. 730 * @param fs The current file system. 731 * @param sourceFile The path where the mob file is saved. 732 * @param targetPath The directory path where the source file is renamed to. 733 * @param cacheConfig The current cache config. 734 * @return The target file path the source file is renamed to. 735 * @throws IOException 736 */ 737 public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile, 738 Path targetPath, CacheConfig cacheConfig) throws IOException { 739 if (sourceFile == null) { 740 return null; 741 } 742 Path dstPath = new Path(targetPath, sourceFile.getName()); 743 validateMobFile(conf, fs, sourceFile, cacheConfig, true); 744 String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; 745 LOG.info(msg); 746 Path parent = dstPath.getParent(); 747 if (!fs.exists(parent)) { 748 fs.mkdirs(parent); 749 } 750 if (!fs.rename(sourceFile, dstPath)) { 751 throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); 752 } 753 return dstPath; 754 } 755 756 /** 757 * Validates a mob file by opening and closing it. 758 * @param conf The current configuration. 759 * @param fs The current file system. 760 * @param path The path where the mob file is saved. 761 * @param cacheConfig The current cache config. 762 */ 763 private static void validateMobFile(Configuration conf, FileSystem fs, Path path, 764 CacheConfig cacheConfig, boolean primaryReplica) throws IOException { 765 HStoreFile storeFile = null; 766 try { 767 storeFile = new HStoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica); 768 storeFile.initReader(); 769 } catch (IOException e) { 770 LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); 771 throw e; 772 } finally { 773 if (storeFile != null) { 774 storeFile.closeStoreFile(false); 775 } 776 } 777 } 778 779 /** 780 * Indicates whether the current mob ref cell has a valid value. 781 * A mob ref cell has a mob reference tag. 782 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 783 * The real mob value length takes 4 bytes. 784 * The remaining part is the mob file name. 785 * @param cell The mob ref cell. 786 * @return True if the cell has a valid value. 787 */ 788 public static boolean hasValidMobRefCellValue(Cell cell) { 789 return cell.getValueLength() > Bytes.SIZEOF_INT; 790 } 791 792 /** 793 * Gets the mob value length from the mob ref cell. 794 * A mob ref cell has a mob reference tag. 795 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 796 * The real mob value length takes 4 bytes. 797 * The remaining part is the mob file name. 798 * @param cell The mob ref cell. 799 * @return The real mob value length. 800 */ 801 public static int getMobValueLength(Cell cell) { 802 return PrivateCellUtil.getValueAsInt(cell); 803 } 804 805 /** 806 * Gets the mob file name from the mob ref cell. 807 * A mob ref cell has a mob reference tag. 808 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 809 * The real mob value length takes 4 bytes. 810 * The remaining part is the mob file name. 811 * @param cell The mob ref cell. 812 * @return The mob file name. 813 */ 814 public static String getMobFileName(Cell cell) { 815 return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT, 816 cell.getValueLength() - Bytes.SIZEOF_INT); 817 } 818 819 /** 820 * Gets the table name used in the table lock. 821 * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock". 822 * @param tn The table name. 823 * @return The table name used in table lock. 824 */ 825 public static TableName getTableLockName(TableName tn) { 826 byte[] tableName = tn.getName(); 827 return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX)); 828 } 829 830 /** 831 * Performs the mob compaction. 832 * @param conf the Configuration 833 * @param fs the file system 834 * @param tableName the table the compact 835 * @param hcd the column descriptor 836 * @param pool the thread pool 837 * @param allFiles Whether add all mob files into the compaction. 838 */ 839 public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, 840 ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock) 841 throws IOException { 842 String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, 843 PartitionedMobCompactor.class.getName()); 844 // instantiate the mob compactor. 845 MobCompactor compactor = null; 846 try { 847 compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { 848 Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class, 849 ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool }); 850 } catch (Exception e) { 851 throw new IOException("Unable to load configured mob file compactor '" + className + "'", e); 852 } 853 // compact only for mob-enabled column. 854 // obtain a write table lock before performing compaction to avoid race condition 855 // with major compaction in mob-enabled column. 856 try { 857 lock.acquire(); 858 LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " + 859 "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass()); 860 compactor.compact(allFiles); 861 } catch (Exception e) { 862 LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString() 863 + " in the table " + tableName.getNameAsString(), e); 864 } finally { 865 LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " + 866 "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass()); 867 lock.release(); 868 } 869 } 870 871 /** 872 * Creates a thread pool. 873 * @param conf the Configuration 874 * @return A thread pool. 875 */ 876 public static ExecutorService createMobCompactorThreadPool(Configuration conf) { 877 int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX, 878 MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); 879 if (maxThreads == 0) { 880 maxThreads = 1; 881 } 882 final SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); 883 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue, 884 Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { 885 @Override 886 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 887 try { 888 // waiting for a thread to pick up instead of throwing exceptions. 889 queue.put(r); 890 } catch (InterruptedException e) { 891 throw new RejectedExecutionException(e); 892 } 893 } 894 }); 895 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); 896 return pool; 897 } 898 899 /** 900 * Checks whether this table has mob-enabled columns. 901 * @param htd The current table descriptor. 902 * @return Whether this table has mob-enabled columns. 903 */ 904 public static boolean hasMobColumns(TableDescriptor htd) { 905 ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies(); 906 for (ColumnFamilyDescriptor hcd : hcds) { 907 if (hcd.isMobEnabled()) { 908 return true; 909 } 910 } 911 return false; 912 } 913 914 /** 915 * Indicates whether return null value when the mob file is missing or corrupt. 916 * The information is set in the attribute "empty.value.on.mobcell.miss" of scan. 917 * @param scan The current scan. 918 * @return True if the readEmptyValueOnMobCellMiss is enabled. 919 */ 920 public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) { 921 byte[] readEmptyValueOnMobCellMiss = 922 scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); 923 try { 924 return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss); 925 } catch (IllegalArgumentException e) { 926 return false; 927 } 928 } 929 930 /** 931 * Creates a mob ref delete marker. 932 * @param cell The current delete marker. 933 * @return A delete marker with the ref tag. 934 */ 935 public static Cell createMobRefDeleteMarker(Cell cell) { 936 return PrivateCellUtil.createCell(cell, TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell)); 937 } 938 939 /** 940 * Checks if the mob file is expired. 941 * @param column The descriptor of the current column family. 942 * @param current The current time. 943 * @param fileDate The date string parsed from the mob file name. 944 * @return True if the mob file is expired. 945 */ 946 public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current, String fileDate) { 947 if (column.getMinVersions() > 0) { 948 return false; 949 } 950 long timeToLive = column.getTimeToLive(); 951 if (Integer.MAX_VALUE == timeToLive) { 952 return false; 953 } 954 955 Date expireDate = new Date(current - timeToLive * 1000); 956 expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); 957 try { 958 Date date = parseDate(fileDate); 959 if (date.getTime() < expireDate.getTime()) { 960 return true; 961 } 962 } catch (ParseException e) { 963 LOG.warn("Failed to parse the date " + fileDate, e); 964 return false; 965 } 966 return false; 967 } 968 969 /** 970 * fill out partition id based on compaction policy and date, threshold... 971 * @param id Partition id to be filled out 972 * @param firstDayOfCurrentMonth The first day in the current month 973 * @param firstDayOfCurrentWeek The first day in the current week 974 * @param dateStr Date string from the mob file 975 * @param policy Mob compaction policy 976 * @param calendar Calendar object 977 * @param threshold Mob compaciton threshold configured 978 * @return true if the file needs to be excluded from compaction 979 */ 980 public static boolean fillPartitionId(final CompactionPartitionId id, 981 final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr, 982 final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) { 983 984 boolean skipCompcation = false; 985 id.setThreshold(threshold); 986 if (threshold <= 0) { 987 id.setDate(dateStr); 988 return skipCompcation; 989 } 990 991 long finalThreshold; 992 Date date; 993 try { 994 date = MobUtils.parseDate(dateStr); 995 } catch (ParseException e) { 996 LOG.warn("Failed to parse date " + dateStr, e); 997 id.setDate(dateStr); 998 return true; 999 } 1000 1001 /* The algorithm works as follows: 1002 * For monthly policy: 1003 * 1). If the file's date is in past months, apply 4 * 7 * threshold 1004 * 2). If the file's date is in past weeks, apply 7 * threshold 1005 * 3). If the file's date is in current week, exclude it from the compaction 1006 * For weekly policy: 1007 * 1). If the file's date is in past weeks, apply 7 * threshold 1008 * 2). If the file's date in currently, apply threshold 1009 * For daily policy: 1010 * 1). apply threshold 1011 */ 1012 if (policy == MobCompactPartitionPolicy.MONTHLY) { 1013 if (date.before(firstDayOfCurrentMonth)) { 1014 // Check overflow 1015 if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) { 1016 finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold; 1017 } else { 1018 finalThreshold = Long.MAX_VALUE; 1019 } 1020 id.setThreshold(finalThreshold); 1021 1022 // set to the date for the first day of that month 1023 id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date))); 1024 return skipCompcation; 1025 } 1026 } 1027 1028 if ((policy == MobCompactPartitionPolicy.MONTHLY) || 1029 (policy == MobCompactPartitionPolicy.WEEKLY)) { 1030 // Check if it needs to apply weekly multiplier 1031 if (date.before(firstDayOfCurrentWeek)) { 1032 // Check overflow 1033 if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) { 1034 finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold; 1035 } else { 1036 finalThreshold = Long.MAX_VALUE; 1037 } 1038 id.setThreshold(finalThreshold); 1039 1040 id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date))); 1041 return skipCompcation; 1042 } else if (policy == MobCompactPartitionPolicy.MONTHLY) { 1043 skipCompcation = true; 1044 } 1045 } 1046 1047 // Rest is daily 1048 id.setDate(dateStr); 1049 return skipCompcation; 1050 } 1051}