001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mob; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.text.ParseException; 024import java.text.SimpleDateFormat; 025import java.util.ArrayList; 026import java.util.Calendar; 027import java.util.Collection; 028import java.util.Date; 029import java.util.List; 030import java.util.Optional; 031import java.util.UUID; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.RejectedExecutionException; 034import java.util.concurrent.RejectedExecutionHandler; 035import java.util.concurrent.SynchronousQueue; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.TimeUnit; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.PrivateCellUtil; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.Tag; 047import org.apache.hadoop.hbase.TagType; 048import org.apache.hadoop.hbase.TagUtil; 049import org.apache.hadoop.hbase.backup.HFileArchiver; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 051import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.RegionInfoBuilder; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.io.HFileLink; 057import org.apache.hadoop.hbase.io.compress.Compression; 058import org.apache.hadoop.hbase.io.crypto.Encryption; 059import org.apache.hadoop.hbase.io.hfile.CacheConfig; 060import org.apache.hadoop.hbase.io.hfile.HFile; 061import org.apache.hadoop.hbase.io.hfile.HFileContext; 062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 063import org.apache.hadoop.hbase.master.locking.LockManager; 064import org.apache.hadoop.hbase.mob.compactions.MobCompactor; 065import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; 066import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; 067import org.apache.hadoop.hbase.regionserver.BloomType; 068import org.apache.hadoop.hbase.regionserver.HStore; 069import org.apache.hadoop.hbase.regionserver.HStoreFile; 070import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.ChecksumType; 073import org.apache.hadoop.hbase.util.CommonFSUtils; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.hadoop.hbase.util.ReflectionUtils; 076import org.apache.hadoop.hbase.util.Threads; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081/** 082 * The mob utilities 083 */ 084@InterfaceAudience.Private 085public final class MobUtils { 086 087 private static final Logger LOG = LoggerFactory.getLogger(MobUtils.class); 088 private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7; 089 private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER; 090 091 private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT = 092 new ThreadLocal<SimpleDateFormat>() { 093 @Override 094 protected SimpleDateFormat initialValue() { 095 return new SimpleDateFormat("yyyyMMdd"); 096 } 097 }; 098 099 private static final byte[] REF_DELETE_MARKER_TAG_BYTES; 100 static { 101 List<Tag> tags = new ArrayList<>(); 102 tags.add(MobConstants.MOB_REF_TAG); 103 REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags); 104 } 105 106 /** 107 * Private constructor to keep this class from being instantiated. 108 */ 109 private MobUtils() { 110 } 111 112 /** 113 * Formats a date to a string. 114 * @param date The date. 115 * @return The string format of the date, it's yyyymmdd. 116 */ 117 public static String formatDate(Date date) { 118 return LOCAL_FORMAT.get().format(date); 119 } 120 121 /** 122 * Parses the string to a date. 123 * @param dateString The string format of a date, it's yyyymmdd. 124 * @return A date. 125 */ 126 public static Date parseDate(String dateString) throws ParseException { 127 return LOCAL_FORMAT.get().parse(dateString); 128 } 129 130 /** 131 * Get the first day of the input date's month 132 * @param calendar Calendar object 133 * @param date The date to find out its first day of that month 134 * @return The first day in the month 135 */ 136 public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) { 137 138 calendar.setTime(date); 139 calendar.set(Calendar.HOUR_OF_DAY, 0); 140 calendar.set(Calendar.MINUTE, 0); 141 calendar.set(Calendar.SECOND, 0); 142 calendar.set(Calendar.MILLISECOND, 0); 143 calendar.set(Calendar.DAY_OF_MONTH, 1); 144 145 Date firstDayInMonth = calendar.getTime(); 146 return firstDayInMonth; 147 } 148 149 /** 150 * Get the first day of the input date's week 151 * @param calendar Calendar object 152 * @param date The date to find out its first day of that week 153 * @return The first day in the week 154 */ 155 public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) { 156 157 calendar.setTime(date); 158 calendar.set(Calendar.HOUR_OF_DAY, 0); 159 calendar.set(Calendar.MINUTE, 0); 160 calendar.set(Calendar.SECOND, 0); 161 calendar.set(Calendar.MILLISECOND, 0); 162 calendar.setFirstDayOfWeek(Calendar.MONDAY); 163 calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); 164 165 Date firstDayInWeek = calendar.getTime(); 166 return firstDayInWeek; 167 } 168 169 /** 170 * Whether the current cell is a mob reference cell. 171 * @param cell The current cell. 172 * @return True if the cell has a mob reference tag, false if it doesn't. 173 */ 174 public static boolean isMobReferenceCell(Cell cell) { 175 if (cell.getTagsLength() > 0) { 176 Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE); 177 if (tag.isPresent()) { 178 return true; 179 } 180 } 181 return false; 182 } 183 184 /** 185 * Gets the table name tag. 186 * @param cell The current cell. 187 * @return The table name tag. 188 */ 189 public static Tag getTableNameTag(Cell cell) { 190 if (cell.getTagsLength() > 0) { 191 Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE); 192 if (tag.isPresent()) { 193 return tag.get(); 194 } 195 } 196 return null; 197 } 198 199 /** 200 * Whether the tag list has a mob reference tag. 201 * @param tags The tag list. 202 * @return True if the list has a mob reference tag, false if it doesn't. 203 */ 204 public static boolean hasMobReferenceTag(List<Tag> tags) { 205 if (!tags.isEmpty()) { 206 for (Tag tag : tags) { 207 if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) { 208 return true; 209 } 210 } 211 } 212 return false; 213 } 214 215 /** 216 * Indicates whether it's a raw scan. 217 * The information is set in the attribute "hbase.mob.scan.raw" of scan. 218 * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file. 219 * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in 220 * the mob file. 221 * @param scan The current scan. 222 * @return True if it's a raw scan. 223 */ 224 public static boolean isRawMobScan(Scan scan) { 225 byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW); 226 try { 227 return raw != null && Bytes.toBoolean(raw); 228 } catch (IllegalArgumentException e) { 229 return false; 230 } 231 } 232 233 /** 234 * Indicates whether it's a reference only scan. 235 * The information is set in the attribute "hbase.mob.scan.ref.only" of scan. 236 * If it's a ref only scan, only the cells with ref tag are returned. 237 * @param scan The current scan. 238 * @return True if it's a ref only scan. 239 */ 240 public static boolean isRefOnlyScan(Scan scan) { 241 byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY); 242 try { 243 return refOnly != null && Bytes.toBoolean(refOnly); 244 } catch (IllegalArgumentException e) { 245 return false; 246 } 247 } 248 249 /** 250 * Indicates whether the scan contains the information of caching blocks. 251 * The information is set in the attribute "hbase.mob.cache.blocks" of scan. 252 * @param scan The current scan. 253 * @return True when the Scan attribute specifies to cache the MOB blocks. 254 */ 255 public static boolean isCacheMobBlocks(Scan scan) { 256 byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS); 257 try { 258 return cache != null && Bytes.toBoolean(cache); 259 } catch (IllegalArgumentException e) { 260 return false; 261 } 262 } 263 264 /** 265 * Sets the attribute of caching blocks in the scan. 266 * 267 * @param scan 268 * The current scan. 269 * @param cacheBlocks 270 * True, set the attribute of caching blocks into the scan, the scanner with this scan 271 * caches blocks. 272 * False, the scanner doesn't cache blocks for this scan. 273 */ 274 public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { 275 scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks)); 276 } 277 278 /** 279 * Cleans the expired mob files. 280 * Cleans the files whose creation date is older than (current - columnFamily.ttl), and 281 * the minVersions of that column family is 0. 282 * @param fs The current file system. 283 * @param conf The current configuration. 284 * @param tableName The current table name. 285 * @param columnDescriptor The descriptor of the current column family. 286 * @param cacheConfig The cacheConfig that disables the block cache. 287 * @param current The current time. 288 */ 289 public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName, 290 ColumnFamilyDescriptor columnDescriptor, CacheConfig cacheConfig, long current) 291 throws IOException { 292 long timeToLive = columnDescriptor.getTimeToLive(); 293 if (Integer.MAX_VALUE == timeToLive) { 294 // no need to clean, because the TTL is not set. 295 return; 296 } 297 298 Calendar calendar = Calendar.getInstance(); 299 calendar.setTimeInMillis(current - timeToLive * 1000); 300 calendar.set(Calendar.HOUR_OF_DAY, 0); 301 calendar.set(Calendar.MINUTE, 0); 302 calendar.set(Calendar.SECOND, 0); 303 304 Date expireDate = calendar.getTime(); 305 306 LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); 307 308 FileStatus[] stats = null; 309 Path mobTableDir = CommonFSUtils.getTableDir(getMobHome(conf), tableName); 310 Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString()); 311 try { 312 stats = fs.listStatus(path); 313 } catch (FileNotFoundException e) { 314 LOG.warn("Failed to find the mob file " + path, e); 315 } 316 if (null == stats) { 317 // no file found 318 return; 319 } 320 List<HStoreFile> filesToClean = new ArrayList<>(); 321 int deletedFileCount = 0; 322 for (FileStatus file : stats) { 323 String fileName = file.getPath().getName(); 324 try { 325 if (HFileLink.isHFileLink(file.getPath())) { 326 HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); 327 fileName = hfileLink.getOriginPath().getName(); 328 } 329 330 Date fileDate = parseDate(MobFileName.getDateFromName(fileName)); 331 332 if (LOG.isDebugEnabled()) { 333 LOG.debug("Checking file " + fileName); 334 } 335 if (fileDate.getTime() < expireDate.getTime()) { 336 if (LOG.isDebugEnabled()) { 337 LOG.debug(fileName + " is an expired file"); 338 } 339 filesToClean 340 .add(new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true)); 341 } 342 } catch (Exception e) { 343 LOG.error("Cannot parse the fileName " + fileName, e); 344 } 345 } 346 if (!filesToClean.isEmpty()) { 347 try { 348 removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), 349 filesToClean); 350 deletedFileCount = filesToClean.size(); 351 } catch (IOException e) { 352 LOG.error("Failed to delete the mob files " + filesToClean, e); 353 } 354 } 355 LOG.info(deletedFileCount + " expired mob files are deleted"); 356 } 357 358 /** 359 * Gets the root dir of the mob files. 360 * It's {HBASE_DIR}/mobdir. 361 * @param conf The current configuration. 362 * @return the root dir of the mob file. 363 */ 364 public static Path getMobHome(Configuration conf) { 365 Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); 366 return getMobHome(hbaseDir); 367 } 368 369 /** 370 * Gets the root dir of the mob files under the qualified HBase root dir. 371 * It's {rootDir}/mobdir. 372 * @param rootDir The qualified path of HBase root directory. 373 * @return The root dir of the mob file. 374 */ 375 public static Path getMobHome(Path rootDir) { 376 return new Path(rootDir, MobConstants.MOB_DIR_NAME); 377 } 378 379 /** 380 * Gets the qualified root dir of the mob files. 381 * @param conf The current configuration. 382 * @return The qualified root dir. 383 */ 384 public static Path getQualifiedMobRootDir(Configuration conf) throws IOException { 385 Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); 386 Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME); 387 FileSystem fs = mobRootDir.getFileSystem(conf); 388 return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 389 } 390 391 /** 392 * Gets the table dir of the mob files under the qualified HBase root dir. 393 * It's {rootDir}/mobdir/data/${namespace}/${tableName} 394 * @param rootDir The qualified path of HBase root directory. 395 * @param tableName The name of table. 396 * @return The table dir of the mob file. 397 */ 398 public static Path getMobTableDir(Path rootDir, TableName tableName) { 399 return CommonFSUtils.getTableDir(getMobHome(rootDir), tableName); 400 } 401 402 /** 403 * Gets the region dir of the mob files. 404 * It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. 405 * @param conf The current configuration. 406 * @param tableName The current table name. 407 * @return The region dir of the mob files. 408 */ 409 public static Path getMobRegionPath(Configuration conf, TableName tableName) { 410 return getMobRegionPath(new Path(conf.get(HConstants.HBASE_DIR)), tableName); 411 } 412 413 /** 414 * Gets the region dir of the mob files under the specified root dir. 415 * It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. 416 * @param rootDir The qualified path of HBase root directory. 417 * @param tableName The current table name. 418 * @return The region dir of the mob files. 419 */ 420 public static Path getMobRegionPath(Path rootDir, TableName tableName) { 421 Path tablePath = CommonFSUtils.getTableDir(getMobHome(rootDir), tableName); 422 RegionInfo regionInfo = getMobRegionInfo(tableName); 423 return new Path(tablePath, regionInfo.getEncodedName()); 424 } 425 426 /** 427 * Gets the family dir of the mob files. 428 * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. 429 * @param conf The current configuration. 430 * @param tableName The current table name. 431 * @param familyName The current family name. 432 * @return The family dir of the mob files. 433 */ 434 public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) { 435 return new Path(getMobRegionPath(conf, tableName), familyName); 436 } 437 438 /** 439 * Gets the family dir of the mob files. 440 * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. 441 * @param regionPath The path of mob region which is a dummy one. 442 * @param familyName The current family name. 443 * @return The family dir of the mob files. 444 */ 445 public static Path getMobFamilyPath(Path regionPath, String familyName) { 446 return new Path(regionPath, familyName); 447 } 448 449 /** 450 * Gets the RegionInfo of the mob files. 451 * This is a dummy region. The mob files are not saved in a region in HBase. 452 * This is only used in mob snapshot. It's internally used only. 453 * @param tableName 454 * @return A dummy mob region info. 455 */ 456 public static RegionInfo getMobRegionInfo(TableName tableName) { 457 return RegionInfoBuilder.newBuilder(tableName) 458 .setStartKey(MobConstants.MOB_REGION_NAME_BYTES) 459 .setEndKey(HConstants.EMPTY_END_ROW) 460 .setSplit(false) 461 .setRegionId(0) 462 .build(); 463 } 464 465 /** 466 * Gets whether the current RegionInfo is a mob one. 467 * @param regionInfo The current RegionInfo. 468 * @return If true, the current RegionInfo is a mob one. 469 */ 470 public static boolean isMobRegionInfo(RegionInfo regionInfo) { 471 return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName() 472 .equals(regionInfo.getEncodedName()); 473 } 474 475 /** 476 * Gets whether the current region name follows the pattern of a mob region name. 477 * @param tableName The current table name. 478 * @param regionName The current region name. 479 * @return True if the current region name follows the pattern of a mob region name. 480 */ 481 public static boolean isMobRegionName(TableName tableName, byte[] regionName) { 482 return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName()); 483 } 484 485 /** 486 * Gets the working directory of the mob compaction. 487 * @param root The root directory of the mob compaction. 488 * @param jobName The current job name. 489 * @return The directory of the mob compaction for the current job. 490 */ 491 public static Path getCompactionWorkingPath(Path root, String jobName) { 492 return new Path(root, jobName); 493 } 494 495 /** 496 * Archives the mob files. 497 * @param conf The current configuration. 498 * @param fs The current file system. 499 * @param tableName The table name. 500 * @param tableDir The table directory. 501 * @param family The name of the column family. 502 * @param storeFiles The files to be deleted. 503 */ 504 public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName, 505 Path tableDir, byte[] family, Collection<HStoreFile> storeFiles) throws IOException { 506 HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, 507 storeFiles); 508 } 509 510 /** 511 * Creates a mob reference KeyValue. 512 * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. 513 * @param cell The original Cell. 514 * @param fileName The mob file name where the mob reference KeyValue is written. 515 * @param tableNameTag The tag of the current table name. It's very important in 516 * cloning the snapshot. 517 * @return The mob reference KeyValue. 518 */ 519 public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) { 520 // Append the tags to the KeyValue. 521 // The key is same, the value is the filename of the mob file 522 List<Tag> tags = new ArrayList<>(); 523 // Add the ref tag as the 1st one. 524 tags.add(MobConstants.MOB_REF_TAG); 525 // Add the tag of the source table name, this table is where this mob file is flushed 526 // from. 527 // It's very useful in cloning the snapshot. When reading from the cloning table, we need to 528 // find the original mob files by this table name. For details please see cloning 529 // snapshot for mob files. 530 tags.add(tableNameTag); 531 return createMobRefCell(cell, fileName, TagUtil.fromList(tags)); 532 } 533 534 public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) { 535 byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName); 536 return PrivateCellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell)); 537 } 538 539 /** 540 * Creates a writer for the mob file in temp directory. 541 * @param conf The current configuration. 542 * @param fs The current file system. 543 * @param family The descriptor of the current column family. 544 * @param date The date string, its format is yyyymmmdd. 545 * @param basePath The basic path for a temp directory. 546 * @param maxKeyCount The key count. 547 * @param compression The compression algorithm. 548 * @param startKey The hex string of the start key. 549 * @param cacheConfig The current cache config. 550 * @param cryptoContext The encryption context. 551 * @param isCompaction If the writer is used in compaction. 552 * @return The writer for the mob file. 553 */ 554 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 555 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 556 Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, 557 Encryption.Context cryptoContext, boolean isCompaction) 558 throws IOException { 559 MobFileName mobFileName = MobFileName.create(startKey, date, 560 UUID.randomUUID().toString().replaceAll("-", "")); 561 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 562 cacheConfig, cryptoContext, isCompaction); 563 } 564 565 /** 566 * Creates a writer for the ref file in temp directory. 567 * @param conf The current configuration. 568 * @param fs The current file system. 569 * @param family The descriptor of the current column family. 570 * @param basePath The basic path for a temp directory. 571 * @param maxKeyCount The key count. 572 * @param cacheConfig The current cache config. 573 * @param cryptoContext The encryption context. 574 * @param isCompaction If the writer is used in compaction. 575 * @return The writer for the mob file. 576 */ 577 public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs, 578 ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, 579 Encryption.Context cryptoContext, boolean isCompaction) 580 throws IOException { 581 return createWriter(conf, fs, family, 582 new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount, 583 family.getCompactionCompressionType(), cacheConfig, cryptoContext, 584 HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(), 585 family.getBloomFilterType(), isCompaction); 586 } 587 588 /** 589 * Creates a writer for the mob file in temp directory. 590 * @param conf The current configuration. 591 * @param fs The current file system. 592 * @param family The descriptor of the current column family. 593 * @param date The date string, its format is yyyymmmdd. 594 * @param basePath The basic path for a temp directory. 595 * @param maxKeyCount The key count. 596 * @param compression The compression algorithm. 597 * @param startKey The start key. 598 * @param cacheConfig The current cache config. 599 * @param cryptoContext The encryption context. 600 * @param isCompaction If the writer is used in compaction. 601 * @return The writer for the mob file. 602 */ 603 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 604 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 605 Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, 606 Encryption.Context cryptoContext, boolean isCompaction) 607 throws IOException { 608 MobFileName mobFileName = MobFileName.create(startKey, date, 609 UUID.randomUUID().toString().replaceAll("-", "")); 610 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 611 cacheConfig, cryptoContext, isCompaction); 612 } 613 614 /** 615 * Creates a writer for the del file in temp directory. 616 * @param conf The current configuration. 617 * @param fs The current file system. 618 * @param family The descriptor of the current column family. 619 * @param date The date string, its format is yyyymmmdd. 620 * @param basePath The basic path for a temp directory. 621 * @param maxKeyCount The key count. 622 * @param compression The compression algorithm. 623 * @param startKey The start key. 624 * @param cacheConfig The current cache config. 625 * @param cryptoContext The encryption context. 626 * @return The writer for the del file. 627 */ 628 public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs, 629 ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, 630 Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, 631 Encryption.Context cryptoContext) 632 throws IOException { 633 String suffix = UUID 634 .randomUUID().toString().replaceAll("-", "") + "_del"; 635 MobFileName mobFileName = MobFileName.create(startKey, date, suffix); 636 return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, 637 cacheConfig, cryptoContext, true); 638 } 639 640 /** 641 * Creates a writer for the mob file in temp directory. 642 * @param conf The current configuration. 643 * @param fs The current file system. 644 * @param family The descriptor of the current column family. 645 * @param mobFileName The mob file name. 646 * @param basePath The basic path for a temp directory. 647 * @param maxKeyCount The key count. 648 * @param compression The compression algorithm. 649 * @param cacheConfig The current cache config. 650 * @param cryptoContext The encryption context. 651 * @param isCompaction If the writer is used in compaction. 652 * @return The writer for the mob file. 653 */ 654 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 655 ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, 656 Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, 657 boolean isCompaction) 658 throws IOException { 659 return createWriter(conf, fs, family, 660 new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig, 661 cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), 662 family.getBlocksize(), BloomType.NONE, isCompaction); 663 } 664 665 /** 666 * Creates a writer for the mob file in temp directory. 667 * @param conf The current configuration. 668 * @param fs The current file system. 669 * @param family The descriptor of the current column family. 670 * @param path The path for a temp directory. 671 * @param maxKeyCount The key count. 672 * @param compression The compression algorithm. 673 * @param cacheConfig The current cache config. 674 * @param cryptoContext The encryption context. 675 * @param checksumType The checksum type. 676 * @param bytesPerChecksum The bytes per checksum. 677 * @param blocksize The HFile block size. 678 * @param bloomType The bloom filter type. 679 * @param isCompaction If the writer is used in compaction. 680 * @return The writer for the mob file. 681 */ 682 public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, 683 ColumnFamilyDescriptor family, Path path, long maxKeyCount, 684 Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, 685 ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType, 686 boolean isCompaction) 687 throws IOException { 688 if (compression == null) { 689 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; 690 } 691 final CacheConfig writerCacheConf; 692 if (isCompaction) { 693 writerCacheConf = new CacheConfig(cacheConfig); 694 writerCacheConf.setCacheDataOnWrite(false); 695 } else { 696 writerCacheConf = cacheConfig; 697 } 698 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) 699 .withIncludesMvcc(true).withIncludesTags(true) 700 .withCompressTags(family.isCompressTags()) 701 .withChecksumType(checksumType) 702 .withBytesPerCheckSum(bytesPerChecksum) 703 .withBlockSize(blocksize) 704 .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) 705 .withEncryptionContext(cryptoContext) 706 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 707 708 StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs) 709 .withFilePath(path).withBloomType(bloomType) 710 .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); 711 return w; 712 } 713 714 /** 715 * Commits the mob file. 716 * @param conf The current configuration. 717 * @param fs The current file system. 718 * @param sourceFile The path where the mob file is saved. 719 * @param targetPath The directory path where the source file is renamed to. 720 * @param cacheConfig The current cache config. 721 * @return The target file path the source file is renamed to. 722 */ 723 public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile, 724 Path targetPath, CacheConfig cacheConfig) throws IOException { 725 if (sourceFile == null) { 726 return null; 727 } 728 Path dstPath = new Path(targetPath, sourceFile.getName()); 729 validateMobFile(conf, fs, sourceFile, cacheConfig, true); 730 String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; 731 LOG.info(msg); 732 Path parent = dstPath.getParent(); 733 if (!fs.exists(parent)) { 734 fs.mkdirs(parent); 735 } 736 if (!fs.rename(sourceFile, dstPath)) { 737 throw new IOException("Failed rename of " + sourceFile + " to " + dstPath); 738 } 739 return dstPath; 740 } 741 742 /** 743 * Validates a mob file by opening and closing it. 744 * @param conf The current configuration. 745 * @param fs The current file system. 746 * @param path The path where the mob file is saved. 747 * @param cacheConfig The current cache config. 748 */ 749 private static void validateMobFile(Configuration conf, FileSystem fs, Path path, 750 CacheConfig cacheConfig, boolean primaryReplica) throws IOException { 751 HStoreFile storeFile = null; 752 try { 753 storeFile = new HStoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica); 754 storeFile.initReader(); 755 } catch (IOException e) { 756 LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); 757 throw e; 758 } finally { 759 if (storeFile != null) { 760 storeFile.closeStoreFile(false); 761 } 762 } 763 } 764 765 /** 766 * Indicates whether the current mob ref cell has a valid value. 767 * A mob ref cell has a mob reference tag. 768 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 769 * The real mob value length takes 4 bytes. 770 * The remaining part is the mob file name. 771 * @param cell The mob ref cell. 772 * @return True if the cell has a valid value. 773 */ 774 public static boolean hasValidMobRefCellValue(Cell cell) { 775 return cell.getValueLength() > Bytes.SIZEOF_INT; 776 } 777 778 /** 779 * Gets the mob value length from the mob ref cell. 780 * A mob ref cell has a mob reference tag. 781 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 782 * The real mob value length takes 4 bytes. 783 * The remaining part is the mob file name. 784 * @param cell The mob ref cell. 785 * @return The real mob value length. 786 */ 787 public static int getMobValueLength(Cell cell) { 788 return PrivateCellUtil.getValueAsInt(cell); 789 } 790 791 /** 792 * Gets the mob file name from the mob ref cell. 793 * A mob ref cell has a mob reference tag. 794 * The value of a mob ref cell consists of two parts, real mob value length and mob file name. 795 * The real mob value length takes 4 bytes. 796 * The remaining part is the mob file name. 797 * @param cell The mob ref cell. 798 * @return The mob file name. 799 */ 800 public static String getMobFileName(Cell cell) { 801 return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT, 802 cell.getValueLength() - Bytes.SIZEOF_INT); 803 } 804 805 /** 806 * Gets the table name used in the table lock. 807 * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock". 808 * @param tn The table name. 809 * @return The table name used in table lock. 810 */ 811 public static TableName getTableLockName(TableName tn) { 812 byte[] tableName = tn.getName(); 813 return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX)); 814 } 815 816 /** 817 * Performs the mob compaction. 818 * @param conf the Configuration 819 * @param fs the file system 820 * @param tableName the table the compact 821 * @param hcd the column descriptor 822 * @param pool the thread pool 823 * @param allFiles Whether add all mob files into the compaction. 824 */ 825 public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, 826 ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles, 827 LockManager.MasterLock lock) 828 throws IOException { 829 String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, 830 PartitionedMobCompactor.class.getName()); 831 // instantiate the mob compactor. 832 MobCompactor compactor = null; 833 try { 834 compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { 835 Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class, 836 ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool }); 837 } catch (Exception e) { 838 throw new IOException("Unable to load configured mob file compactor '" + className + "'", e); 839 } 840 // compact only for mob-enabled column. 841 // obtain a write table lock before performing compaction to avoid race condition 842 // with major compaction in mob-enabled column. 843 try { 844 lock.acquire(); 845 LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " + 846 "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass()); 847 compactor.compact(allFiles); 848 } catch (Exception e) { 849 LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString() 850 + " in the table " + tableName.getNameAsString(), e); 851 } finally { 852 LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " + 853 "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass()); 854 lock.release(); 855 } 856 } 857 858 /** 859 * Creates a thread pool. 860 * @param conf the Configuration 861 * @return A thread pool. 862 */ 863 public static ExecutorService createMobCompactorThreadPool(Configuration conf) { 864 int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX, 865 MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); 866 if (maxThreads == 0) { 867 maxThreads = 1; 868 } 869 final SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); 870 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue, 871 Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { 872 @Override 873 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 874 try { 875 // waiting for a thread to pick up instead of throwing exceptions. 876 queue.put(r); 877 } catch (InterruptedException e) { 878 throw new RejectedExecutionException(e); 879 } 880 } 881 }); 882 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); 883 return pool; 884 } 885 886 /** 887 * Checks whether this table has mob-enabled columns. 888 * @param htd The current table descriptor. 889 * @return Whether this table has mob-enabled columns. 890 */ 891 public static boolean hasMobColumns(TableDescriptor htd) { 892 ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies(); 893 for (ColumnFamilyDescriptor hcd : hcds) { 894 if (hcd.isMobEnabled()) { 895 return true; 896 } 897 } 898 return false; 899 } 900 901 /** 902 * Indicates whether return null value when the mob file is missing or corrupt. 903 * The information is set in the attribute "empty.value.on.mobcell.miss" of scan. 904 * @param scan The current scan. 905 * @return True if the readEmptyValueOnMobCellMiss is enabled. 906 */ 907 public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) { 908 byte[] readEmptyValueOnMobCellMiss = 909 scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); 910 try { 911 return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss); 912 } catch (IllegalArgumentException e) { 913 return false; 914 } 915 } 916 917 /** 918 * Creates a mob ref delete marker. 919 * @param cell The current delete marker. 920 * @return A delete marker with the ref tag. 921 */ 922 public static Cell createMobRefDeleteMarker(Cell cell) { 923 return PrivateCellUtil.createCell(cell, TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell)); 924 } 925 926 /** 927 * Checks if the mob file is expired. 928 * @param column The descriptor of the current column family. 929 * @param current The current time. 930 * @param fileDate The date string parsed from the mob file name. 931 * @return True if the mob file is expired. 932 */ 933 public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current, 934 String fileDate) { 935 if (column.getMinVersions() > 0) { 936 return false; 937 } 938 long timeToLive = column.getTimeToLive(); 939 if (Integer.MAX_VALUE == timeToLive) { 940 return false; 941 } 942 943 Date expireDate = new Date(current - timeToLive * 1000); 944 expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); 945 try { 946 Date date = parseDate(fileDate); 947 if (date.getTime() < expireDate.getTime()) { 948 return true; 949 } 950 } catch (ParseException e) { 951 LOG.warn("Failed to parse the date " + fileDate, e); 952 return false; 953 } 954 return false; 955 } 956 957 /** 958 * fill out partition id based on compaction policy and date, threshold... 959 * @param id Partition id to be filled out 960 * @param firstDayOfCurrentMonth The first day in the current month 961 * @param firstDayOfCurrentWeek The first day in the current week 962 * @param dateStr Date string from the mob file 963 * @param policy Mob compaction policy 964 * @param calendar Calendar object 965 * @param threshold Mob compaciton threshold configured 966 * @return true if the file needs to be excluded from compaction 967 */ 968 public static boolean fillPartitionId(final CompactionPartitionId id, 969 final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr, 970 final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) { 971 972 boolean skipCompcation = false; 973 id.setThreshold(threshold); 974 if (threshold <= 0) { 975 id.setDate(dateStr); 976 return skipCompcation; 977 } 978 979 long finalThreshold; 980 Date date; 981 try { 982 date = MobUtils.parseDate(dateStr); 983 } catch (ParseException e) { 984 LOG.warn("Failed to parse date " + dateStr, e); 985 id.setDate(dateStr); 986 return true; 987 } 988 989 /* The algorithm works as follows: 990 * For monthly policy: 991 * 1). If the file's date is in past months, apply 4 * 7 * threshold 992 * 2). If the file's date is in past weeks, apply 7 * threshold 993 * 3). If the file's date is in current week, exclude it from the compaction 994 * For weekly policy: 995 * 1). If the file's date is in past weeks, apply 7 * threshold 996 * 2). If the file's date in currently, apply threshold 997 * For daily policy: 998 * 1). apply threshold 999 */ 1000 if (policy == MobCompactPartitionPolicy.MONTHLY) { 1001 if (date.before(firstDayOfCurrentMonth)) { 1002 // Check overflow 1003 if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) { 1004 finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold; 1005 } else { 1006 finalThreshold = Long.MAX_VALUE; 1007 } 1008 id.setThreshold(finalThreshold); 1009 1010 // set to the date for the first day of that month 1011 id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date))); 1012 return skipCompcation; 1013 } 1014 } 1015 1016 if ((policy == MobCompactPartitionPolicy.MONTHLY) || 1017 (policy == MobCompactPartitionPolicy.WEEKLY)) { 1018 // Check if it needs to apply weekly multiplier 1019 if (date.before(firstDayOfCurrentWeek)) { 1020 // Check overflow 1021 if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) { 1022 finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold; 1023 } else { 1024 finalThreshold = Long.MAX_VALUE; 1025 } 1026 id.setThreshold(finalThreshold); 1027 1028 id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date))); 1029 return skipCompcation; 1030 } else if (policy == MobCompactPartitionPolicy.MONTHLY) { 1031 skipCompcation = true; 1032 } 1033 } 1034 1035 // Rest is daily 1036 id.setDate(dateStr); 1037 return skipCompcation; 1038 } 1039}