001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.text.ParseException;
024import java.text.SimpleDateFormat;
025import java.util.ArrayList;
026import java.util.Calendar;
027import java.util.Collection;
028import java.util.Date;
029import java.util.List;
030import java.util.Optional;
031import java.util.UUID;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.RejectedExecutionException;
034import java.util.concurrent.RejectedExecutionHandler;
035import java.util.concurrent.SynchronousQueue;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.CellComparator;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.PrivateCellUtil;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.Tag;
049import org.apache.hadoop.hbase.TagType;
050import org.apache.hadoop.hbase.TagUtil;
051import org.apache.hadoop.hbase.backup.HFileArchiver;
052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
053import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionInfoBuilder;
056import org.apache.hadoop.hbase.client.Scan;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.io.HFileLink;
059import org.apache.hadoop.hbase.io.compress.Compression;
060import org.apache.hadoop.hbase.io.crypto.Encryption;
061import org.apache.hadoop.hbase.io.hfile.CacheConfig;
062import org.apache.hadoop.hbase.io.hfile.HFile;
063import org.apache.hadoop.hbase.io.hfile.HFileContext;
064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
065import org.apache.hadoop.hbase.master.locking.LockManager;
066import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
067import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
068import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
069import org.apache.hadoop.hbase.regionserver.BloomType;
070import org.apache.hadoop.hbase.regionserver.HStore;
071import org.apache.hadoop.hbase.regionserver.HStoreFile;
072import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.ChecksumType;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.FSUtils;
077import org.apache.hadoop.hbase.util.ReflectionUtils;
078import org.apache.hadoop.hbase.util.Threads;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083/**
084 * The mob utilities
085 */
086@InterfaceAudience.Private
087public final class MobUtils {
088
089  private static final Logger LOG = LoggerFactory.getLogger(MobUtils.class);
090  private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7;
091  private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER;
092
093  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
094      new ThreadLocal<SimpleDateFormat>() {
095    @Override
096    protected SimpleDateFormat initialValue() {
097      return new SimpleDateFormat("yyyyMMdd");
098    }
099  };
100
101  private static final byte[] REF_DELETE_MARKER_TAG_BYTES;
102  static {
103    List<Tag> tags = new ArrayList<>();
104    tags.add(MobConstants.MOB_REF_TAG);
105    REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags);
106  }
107
108  /**
109   * Private constructor to keep this class from being instantiated.
110   */
111  private MobUtils() {
112  }
113
114  /**
115   * Formats a date to a string.
116   * @param date The date.
117   * @return The string format of the date, it's yyyymmdd.
118   */
119  public static String formatDate(Date date) {
120    return LOCAL_FORMAT.get().format(date);
121  }
122
123  /**
124   * Parses the string to a date.
125   * @param dateString The string format of a date, it's yyyymmdd.
126   * @return A date.
127   * @throws ParseException
128   */
129  public static Date parseDate(String dateString) throws ParseException {
130    return LOCAL_FORMAT.get().parse(dateString);
131  }
132
133  /**
134   * Get the first day of the input date's month
135   * @param calendar Calendar object
136   * @param date The date to find out its first day of that month
137   * @return The first day in the month
138   */
139  public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) {
140
141    calendar.setTime(date);
142    calendar.set(Calendar.HOUR_OF_DAY, 0);
143    calendar.set(Calendar.MINUTE, 0);
144    calendar.set(Calendar.SECOND, 0);
145    calendar.set(Calendar.MILLISECOND, 0);
146    calendar.set(Calendar.DAY_OF_MONTH, 1);
147
148    Date firstDayInMonth = calendar.getTime();
149    return firstDayInMonth;
150  }
151
152  /**
153   * Get the first day of the input date's week
154   * @param calendar Calendar object
155   * @param date The date to find out its first day of that week
156   * @return The first day in the week
157   */
158  public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) {
159
160    calendar.setTime(date);
161    calendar.set(Calendar.HOUR_OF_DAY, 0);
162    calendar.set(Calendar.MINUTE, 0);
163    calendar.set(Calendar.SECOND, 0);
164    calendar.set(Calendar.MILLISECOND, 0);
165    calendar.setFirstDayOfWeek(Calendar.MONDAY);
166    calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
167
168    Date firstDayInWeek = calendar.getTime();
169    return firstDayInWeek;
170  }
171
172  /**
173   * Whether the current cell is a mob reference cell.
174   * @param cell The current cell.
175   * @return True if the cell has a mob reference tag, false if it doesn't.
176   */
177  public static boolean isMobReferenceCell(Cell cell) {
178    if (cell.getTagsLength() > 0) {
179      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE);
180      if (tag.isPresent()) {
181        return true;
182      }
183    }
184    return false;
185  }
186
187  /**
188   * Gets the table name tag.
189   * @param cell The current cell.
190   * @return The table name tag.
191   */
192  public static Tag getTableNameTag(Cell cell) {
193    if (cell.getTagsLength() > 0) {
194      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
195      if (tag.isPresent()) {
196        return tag.get();
197      }
198    }
199    return null;
200  }
201
202  /**
203   * Whether the tag list has a mob reference tag.
204   * @param tags The tag list.
205   * @return True if the list has a mob reference tag, false if it doesn't.
206   */
207  public static boolean hasMobReferenceTag(List<Tag> tags) {
208    if (!tags.isEmpty()) {
209      for (Tag tag : tags) {
210        if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
211          return true;
212        }
213      }
214    }
215    return false;
216  }
217
218  /**
219   * Indicates whether it's a raw scan.
220   * The information is set in the attribute "hbase.mob.scan.raw" of scan.
221   * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
222   * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
223   * the mob file.
224   * @param scan The current scan.
225   * @return True if it's a raw scan.
226   */
227  public static boolean isRawMobScan(Scan scan) {
228    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
229    try {
230      return raw != null && Bytes.toBoolean(raw);
231    } catch (IllegalArgumentException e) {
232      return false;
233    }
234  }
235
236  /**
237   * Indicates whether it's a reference only scan.
238   * The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
239   * If it's a ref only scan, only the cells with ref tag are returned.
240   * @param scan The current scan.
241   * @return True if it's a ref only scan.
242   */
243  public static boolean isRefOnlyScan(Scan scan) {
244    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
245    try {
246      return refOnly != null && Bytes.toBoolean(refOnly);
247    } catch (IllegalArgumentException e) {
248      return false;
249    }
250  }
251
252  /**
253   * Indicates whether the scan contains the information of caching blocks.
254   * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
255   * @param scan The current scan.
256   * @return True when the Scan attribute specifies to cache the MOB blocks.
257   */
258  public static boolean isCacheMobBlocks(Scan scan) {
259    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
260    try {
261      return cache != null && Bytes.toBoolean(cache);
262    } catch (IllegalArgumentException e) {
263      return false;
264    }
265  }
266
267  /**
268   * Sets the attribute of caching blocks in the scan.
269   *
270   * @param scan
271   *          The current scan.
272   * @param cacheBlocks
273   *          True, set the attribute of caching blocks into the scan, the scanner with this scan
274   *          caches blocks.
275   *          False, the scanner doesn't cache blocks for this scan.
276   */
277  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
278    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
279  }
280
281  /**
282   * Cleans the expired mob files.
283   * Cleans the files whose creation date is older than (current - columnFamily.ttl), and
284   * the minVersions of that column family is 0.
285   * @param fs The current file system.
286   * @param conf The current configuration.
287   * @param tableName The current table name.
288   * @param columnDescriptor The descriptor of the current column family.
289   * @param cacheConfig The cacheConfig that disables the block cache.
290   * @param current The current time.
291   * @throws IOException
292   */
293  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
294      ColumnFamilyDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
295      throws IOException {
296    long timeToLive = columnDescriptor.getTimeToLive();
297    if (Integer.MAX_VALUE == timeToLive) {
298      // no need to clean, because the TTL is not set.
299      return;
300    }
301
302    Calendar calendar = Calendar.getInstance();
303    calendar.setTimeInMillis(current - timeToLive * 1000);
304    calendar.set(Calendar.HOUR_OF_DAY, 0);
305    calendar.set(Calendar.MINUTE, 0);
306    calendar.set(Calendar.SECOND, 0);
307
308    Date expireDate = calendar.getTime();
309
310    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
311
312    FileStatus[] stats = null;
313    Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
314    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
315    try {
316      stats = fs.listStatus(path);
317    } catch (FileNotFoundException e) {
318      LOG.warn("Failed to find the mob file " + path, e);
319    }
320    if (null == stats) {
321      // no file found
322      return;
323    }
324    List<HStoreFile> filesToClean = new ArrayList<>();
325    int deletedFileCount = 0;
326    for (FileStatus file : stats) {
327      String fileName = file.getPath().getName();
328      try {
329        if (HFileLink.isHFileLink(file.getPath())) {
330          HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
331          fileName = hfileLink.getOriginPath().getName();
332        }
333
334        Date fileDate = parseDate(MobFileName.getDateFromName(fileName));
335
336        if (LOG.isDebugEnabled()) {
337          LOG.debug("Checking file " + fileName);
338        }
339        if (fileDate.getTime() < expireDate.getTime()) {
340          if (LOG.isDebugEnabled()) {
341            LOG.debug(fileName + " is an expired file");
342          }
343          filesToClean
344              .add(new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
345        }
346      } catch (Exception e) {
347        LOG.error("Cannot parse the fileName " + fileName, e);
348      }
349    }
350    if (!filesToClean.isEmpty()) {
351      try {
352        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
353            filesToClean);
354        deletedFileCount = filesToClean.size();
355      } catch (IOException e) {
356        LOG.error("Failed to delete the mob files " + filesToClean, e);
357      }
358    }
359    LOG.info(deletedFileCount + " expired mob files are deleted");
360  }
361
362  /**
363   * Gets the root dir of the mob files.
364   * It's {HBASE_DIR}/mobdir.
365   * @param conf The current configuration.
366   * @return the root dir of the mob file.
367   */
368  public static Path getMobHome(Configuration conf) {
369    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
370    return getMobHome(hbaseDir);
371  }
372
373  /**
374   * Gets the root dir of the mob files under the qualified HBase root dir.
375   * It's {rootDir}/mobdir.
376   * @param rootDir The qualified path of HBase root directory.
377   * @return The root dir of the mob file.
378   */
379  public static Path getMobHome(Path rootDir) {
380    return new Path(rootDir, MobConstants.MOB_DIR_NAME);
381  }
382
383  /**
384   * Gets the qualified root dir of the mob files.
385   * @param conf The current configuration.
386   * @return The qualified root dir.
387   * @throws IOException
388   */
389  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
390    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
391    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
392    FileSystem fs = mobRootDir.getFileSystem(conf);
393    return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
394  }
395
396  /**
397   * Gets the table dir of the mob files under the qualified HBase root dir.
398   * It's {rootDir}/mobdir/data/${namespace}/${tableName}
399   * @param rootDir The qualified path of HBase root directory.
400   * @param tableName The name of table.
401   * @return The table dir of the mob file.
402   */
403  public static Path getMobTableDir(Path rootDir, TableName tableName) {
404    return FSUtils.getTableDir(getMobHome(rootDir), tableName);
405  }
406
407  /**
408   * Gets the region dir of the mob files.
409   * It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
410   * @param conf The current configuration.
411   * @param tableName The current table name.
412   * @return The region dir of the mob files.
413   */
414  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
415    return getMobRegionPath(new Path(conf.get(HConstants.HBASE_DIR)), tableName);
416  }
417
418  /**
419   * Gets the region dir of the mob files under the specified root dir.
420   * It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
421   * @param rootDir The qualified path of HBase root directory.
422   * @param tableName The current table name.
423   * @return The region dir of the mob files.
424   */
425  public static Path getMobRegionPath(Path rootDir, TableName tableName) {
426    Path tablePath = FSUtils.getTableDir(getMobHome(rootDir), tableName);
427    RegionInfo regionInfo = getMobRegionInfo(tableName);
428    return new Path(tablePath, regionInfo.getEncodedName());
429  }
430
431  /**
432   * Gets the family dir of the mob files.
433   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
434   * @param conf The current configuration.
435   * @param tableName The current table name.
436   * @param familyName The current family name.
437   * @return The family dir of the mob files.
438   */
439  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
440    return new Path(getMobRegionPath(conf, tableName), familyName);
441  }
442
443  /**
444   * Gets the family dir of the mob files.
445   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
446   * @param regionPath The path of mob region which is a dummy one.
447   * @param familyName The current family name.
448   * @return The family dir of the mob files.
449   */
450  public static Path getMobFamilyPath(Path regionPath, String familyName) {
451    return new Path(regionPath, familyName);
452  }
453
454  /**
455   * Gets the RegionInfo of the mob files.
456   * This is a dummy region. The mob files are not saved in a region in HBase.
457   * This is only used in mob snapshot. It's internally used only.
458   * @param tableName
459   * @return A dummy mob region info.
460   */
461  public static RegionInfo getMobRegionInfo(TableName tableName) {
462    return RegionInfoBuilder.newBuilder(tableName)
463        .setStartKey(MobConstants.MOB_REGION_NAME_BYTES)
464        .setEndKey(HConstants.EMPTY_END_ROW)
465        .setSplit(false)
466        .setRegionId(0)
467        .build();
468  }
469
470  /**
471   * Gets whether the current RegionInfo is a mob one.
472   * @param regionInfo The current RegionInfo.
473   * @return If true, the current RegionInfo is a mob one.
474   */
475  public static boolean isMobRegionInfo(RegionInfo regionInfo) {
476    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
477        .equals(regionInfo.getEncodedName());
478  }
479
480  /**
481   * Gets whether the current region name follows the pattern of a mob region name.
482   * @param tableName The current table name.
483   * @param regionName The current region name.
484   * @return True if the current region name follows the pattern of a mob region name.
485   */
486  public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
487    return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
488  }
489
490  /**
491   * Gets the working directory of the mob compaction.
492   * @param root The root directory of the mob compaction.
493   * @param jobName The current job name.
494   * @return The directory of the mob compaction for the current job.
495   */
496  public static Path getCompactionWorkingPath(Path root, String jobName) {
497    return new Path(root, jobName);
498  }
499
500  /**
501   * Archives the mob files.
502   * @param conf The current configuration.
503   * @param fs The current file system.
504   * @param tableName The table name.
505   * @param tableDir The table directory.
506   * @param family The name of the column family.
507   * @param storeFiles The files to be deleted.
508   * @throws IOException
509   */
510  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
511      Path tableDir, byte[] family, Collection<HStoreFile> storeFiles) throws IOException {
512    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
513        storeFiles);
514  }
515
516  /**
517   * Creates a mob reference KeyValue.
518   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
519   * @param cell The original Cell.
520   * @param fileName The mob file name where the mob reference KeyValue is written.
521   * @param tableNameTag The tag of the current table name. It's very important in
522   *                        cloning the snapshot.
523   * @return The mob reference KeyValue.
524   */
525  public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) {
526    // Append the tags to the KeyValue.
527    // The key is same, the value is the filename of the mob file
528    List<Tag> tags = new ArrayList<>();
529    // Add the ref tag as the 1st one.
530    tags.add(MobConstants.MOB_REF_TAG);
531    // Add the tag of the source table name, this table is where this mob file is flushed
532    // from.
533    // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
534    // find the original mob files by this table name. For details please see cloning
535    // snapshot for mob files.
536    tags.add(tableNameTag);
537    return createMobRefCell(cell, fileName, TagUtil.fromList(tags));
538  }
539
540  public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) {
541    byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName);
542    return PrivateCellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell));
543  }
544
545  /**
546   * Creates a writer for the mob file in temp directory.
547   * @param conf The current configuration.
548   * @param fs The current file system.
549   * @param family The descriptor of the current column family.
550   * @param date The date string, its format is yyyymmmdd.
551   * @param basePath The basic path for a temp directory.
552   * @param maxKeyCount The key count.
553   * @param compression The compression algorithm.
554   * @param startKey The hex string of the start key.
555   * @param cacheConfig The current cache config.
556   * @param cryptoContext The encryption context.
557   * @param isCompaction If the writer is used in compaction.
558   * @return The writer for the mob file.
559   * @throws IOException
560   */
561  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
562      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
563      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
564      Encryption.Context cryptoContext, boolean isCompaction)
565      throws IOException {
566    MobFileName mobFileName = MobFileName.create(startKey, date,
567        UUID.randomUUID().toString().replaceAll("-", ""));
568    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
569        cacheConfig, cryptoContext, isCompaction);
570  }
571
572  /**
573   * Creates a writer for the ref file in temp directory.
574   * @param conf The current configuration.
575   * @param fs The current file system.
576   * @param family The descriptor of the current column family.
577   * @param basePath The basic path for a temp directory.
578   * @param maxKeyCount The key count.
579   * @param cacheConfig The current cache config.
580   * @param cryptoContext The encryption context.
581   * @param isCompaction If the writer is used in compaction.
582   * @return The writer for the mob file.
583   * @throws IOException
584   */
585  public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs,
586    ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
587    Encryption.Context cryptoContext, boolean isCompaction)
588    throws IOException {
589    return createWriter(conf, fs, family,
590      new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount,
591      family.getCompactionCompressionType(), cacheConfig, cryptoContext,
592      HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(),
593      family.getBloomFilterType(), isCompaction);
594  }
595
596  /**
597   * Creates a writer for the mob file in temp directory.
598   * @param conf The current configuration.
599   * @param fs The current file system.
600   * @param family The descriptor of the current column family.
601   * @param date The date string, its format is yyyymmmdd.
602   * @param basePath The basic path for a temp directory.
603   * @param maxKeyCount The key count.
604   * @param compression The compression algorithm.
605   * @param startKey The start key.
606   * @param cacheConfig The current cache config.
607   * @param cryptoContext The encryption context.
608   * @param isCompaction If the writer is used in compaction.
609   * @return The writer for the mob file.
610   * @throws IOException
611   */
612  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
613      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
614      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
615      Encryption.Context cryptoContext, boolean isCompaction)
616      throws IOException {
617    MobFileName mobFileName = MobFileName.create(startKey, date,
618        UUID.randomUUID().toString().replaceAll("-", ""));
619    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
620      cacheConfig, cryptoContext, isCompaction);
621  }
622
623  /**
624   * Creates a writer for the del file in temp directory.
625   * @param conf The current configuration.
626   * @param fs The current file system.
627   * @param family The descriptor of the current column family.
628   * @param date The date string, its format is yyyymmmdd.
629   * @param basePath The basic path for a temp directory.
630   * @param maxKeyCount The key count.
631   * @param compression The compression algorithm.
632   * @param startKey The start key.
633   * @param cacheConfig The current cache config.
634   * @param cryptoContext The encryption context.
635   * @return The writer for the del file.
636   * @throws IOException
637   */
638  public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs,
639      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
640      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
641      Encryption.Context cryptoContext)
642      throws IOException {
643    String suffix = UUID
644      .randomUUID().toString().replaceAll("-", "") + "_del";
645    MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
646    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
647      cacheConfig, cryptoContext, true);
648  }
649
650  /**
651   * Creates a writer for the mob file in temp directory.
652   * @param conf The current configuration.
653   * @param fs The current file system.
654   * @param family The descriptor of the current column family.
655   * @param mobFileName The mob file name.
656   * @param basePath The basic path for a temp directory.
657   * @param maxKeyCount The key count.
658   * @param compression The compression algorithm.
659   * @param cacheConfig The current cache config.
660   * @param cryptoContext The encryption context.
661   * @param isCompaction If the writer is used in compaction.
662   * @return The writer for the mob file.
663   * @throws IOException
664   */
665  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
666                                             ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
667      Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
668      boolean isCompaction)
669      throws IOException {
670    return createWriter(conf, fs, family,
671      new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig,
672      cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
673      family.getBlocksize(), BloomType.NONE, isCompaction);
674  }
675
676  /**
677   * Creates a writer for the mob file in temp directory.
678   * @param conf The current configuration.
679   * @param fs The current file system.
680   * @param family The descriptor of the current column family.
681   * @param path The path for a temp directory.
682   * @param maxKeyCount The key count.
683   * @param compression The compression algorithm.
684   * @param cacheConfig The current cache config.
685   * @param cryptoContext The encryption context.
686   * @param checksumType The checksum type.
687   * @param bytesPerChecksum The bytes per checksum.
688   * @param blocksize The HFile block size.
689   * @param bloomType The bloom filter type.
690   * @param isCompaction If the writer is used in compaction.
691   * @return The writer for the mob file.
692   * @throws IOException
693   */
694  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
695      ColumnFamilyDescriptor family, Path path, long maxKeyCount,
696      Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
697      ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType,
698      boolean isCompaction)
699      throws IOException {
700    if (compression == null) {
701      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
702    }
703    final CacheConfig writerCacheConf;
704    if (isCompaction) {
705      writerCacheConf = new CacheConfig(cacheConfig);
706      writerCacheConf.setCacheDataOnWrite(false);
707    } else {
708      writerCacheConf = cacheConfig;
709    }
710    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
711        .withIncludesMvcc(true).withIncludesTags(true)
712        .withCompressTags(family.isCompressTags())
713        .withChecksumType(checksumType)
714        .withBytesPerCheckSum(bytesPerChecksum)
715        .withBlockSize(blocksize)
716        .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
717        .withEncryptionContext(cryptoContext)
718        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
719
720    StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs)
721        .withFilePath(path)
722        .withComparator(CellComparator.getInstance()).withBloomType(bloomType)
723        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
724    return w;
725  }
726
727  /**
728   * Commits the mob file.
729   * @param conf The current configuration.
730   * @param fs The current file system.
731   * @param sourceFile The path where the mob file is saved.
732   * @param targetPath The directory path where the source file is renamed to.
733   * @param cacheConfig The current cache config.
734   * @return The target file path the source file is renamed to.
735   * @throws IOException
736   */
737  public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
738      Path targetPath, CacheConfig cacheConfig) throws IOException {
739    if (sourceFile == null) {
740      return null;
741    }
742    Path dstPath = new Path(targetPath, sourceFile.getName());
743    validateMobFile(conf, fs, sourceFile, cacheConfig, true);
744    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
745    LOG.info(msg);
746    Path parent = dstPath.getParent();
747    if (!fs.exists(parent)) {
748      fs.mkdirs(parent);
749    }
750    if (!fs.rename(sourceFile, dstPath)) {
751      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
752    }
753    return dstPath;
754  }
755
756  /**
757   * Validates a mob file by opening and closing it.
758   * @param conf The current configuration.
759   * @param fs The current file system.
760   * @param path The path where the mob file is saved.
761   * @param cacheConfig The current cache config.
762   */
763  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
764      CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
765    HStoreFile storeFile = null;
766    try {
767      storeFile = new HStoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
768      storeFile.initReader();
769    } catch (IOException e) {
770      LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
771      throw e;
772    } finally {
773      if (storeFile != null) {
774        storeFile.closeStoreFile(false);
775      }
776    }
777  }
778
779  /**
780   * Indicates whether the current mob ref cell has a valid value.
781   * A mob ref cell has a mob reference tag.
782   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
783   * The real mob value length takes 4 bytes.
784   * The remaining part is the mob file name.
785   * @param cell The mob ref cell.
786   * @return True if the cell has a valid value.
787   */
788  public static boolean hasValidMobRefCellValue(Cell cell) {
789    return cell.getValueLength() > Bytes.SIZEOF_INT;
790  }
791
792  /**
793   * Gets the mob value length from the mob ref cell.
794   * A mob ref cell has a mob reference tag.
795   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
796   * The real mob value length takes 4 bytes.
797   * The remaining part is the mob file name.
798   * @param cell The mob ref cell.
799   * @return The real mob value length.
800   */
801  public static int getMobValueLength(Cell cell) {
802    return PrivateCellUtil.getValueAsInt(cell);
803  }
804
805  /**
806   * Gets the mob file name from the mob ref cell.
807   * A mob ref cell has a mob reference tag.
808   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
809   * The real mob value length takes 4 bytes.
810   * The remaining part is the mob file name.
811   * @param cell The mob ref cell.
812   * @return The mob file name.
813   */
814  public static String getMobFileName(Cell cell) {
815    return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
816        cell.getValueLength() - Bytes.SIZEOF_INT);
817  }
818
819  /**
820   * Gets the table name used in the table lock.
821   * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
822   * @param tn The table name.
823   * @return The table name used in table lock.
824   */
825  public static TableName getTableLockName(TableName tn) {
826    byte[] tableName = tn.getName();
827    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
828  }
829
830  /**
831   * Performs the mob compaction.
832   * @param conf the Configuration
833   * @param fs the file system
834   * @param tableName the table the compact
835   * @param hcd the column descriptor
836   * @param pool the thread pool
837   * @param allFiles Whether add all mob files into the compaction.
838   */
839  public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
840                                     ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock)
841      throws IOException {
842    String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
843        PartitionedMobCompactor.class.getName());
844    // instantiate the mob compactor.
845    MobCompactor compactor = null;
846    try {
847      compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
848        Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class,
849        ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
850    } catch (Exception e) {
851      throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
852    }
853    // compact only for mob-enabled column.
854    // obtain a write table lock before performing compaction to avoid race condition
855    // with major compaction in mob-enabled column.
856    try {
857      lock.acquire();
858      LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " +
859          "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
860      compactor.compact(allFiles);
861    } catch (Exception e) {
862      LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
863          + " in the table " + tableName.getNameAsString(), e);
864    } finally {
865      LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " +
866          "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
867      lock.release();
868    }
869  }
870
871  /**
872   * Creates a thread pool.
873   * @param conf the Configuration
874   * @return A thread pool.
875   */
876  public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
877    int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
878        MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
879    if (maxThreads == 0) {
880      maxThreads = 1;
881    }
882    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
883    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
884      Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
885        @Override
886        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
887          try {
888            // waiting for a thread to pick up instead of throwing exceptions.
889            queue.put(r);
890          } catch (InterruptedException e) {
891            throw new RejectedExecutionException(e);
892          }
893        }
894      });
895    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
896    return pool;
897  }
898
899  /**
900   * Checks whether this table has mob-enabled columns.
901   * @param htd The current table descriptor.
902   * @return Whether this table has mob-enabled columns.
903   */
904  public static boolean hasMobColumns(TableDescriptor htd) {
905    ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies();
906    for (ColumnFamilyDescriptor hcd : hcds) {
907      if (hcd.isMobEnabled()) {
908        return true;
909      }
910    }
911    return false;
912  }
913
914  /**
915   * Indicates whether return null value when the mob file is missing or corrupt.
916   * The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
917   * @param scan The current scan.
918   * @return True if the readEmptyValueOnMobCellMiss is enabled.
919   */
920  public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
921    byte[] readEmptyValueOnMobCellMiss =
922      scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
923    try {
924      return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
925    } catch (IllegalArgumentException e) {
926      return false;
927    }
928  }
929
930  /**
931   * Creates a mob ref delete marker.
932   * @param cell The current delete marker.
933   * @return A delete marker with the ref tag.
934   */
935  public static Cell createMobRefDeleteMarker(Cell cell) {
936    return PrivateCellUtil.createCell(cell, TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell));
937  }
938
939  /**
940   * Checks if the mob file is expired.
941   * @param column The descriptor of the current column family.
942   * @param current The current time.
943   * @param fileDate The date string parsed from the mob file name.
944   * @return True if the mob file is expired.
945   */
946  public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current, String fileDate) {
947    if (column.getMinVersions() > 0) {
948      return false;
949    }
950    long timeToLive = column.getTimeToLive();
951    if (Integer.MAX_VALUE == timeToLive) {
952      return false;
953    }
954
955    Date expireDate = new Date(current - timeToLive * 1000);
956    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
957    try {
958      Date date = parseDate(fileDate);
959      if (date.getTime() < expireDate.getTime()) {
960        return true;
961      }
962    } catch (ParseException e) {
963      LOG.warn("Failed to parse the date " + fileDate, e);
964      return false;
965    }
966    return false;
967  }
968
969  /**
970   * fill out partition id based on compaction policy and date, threshold...
971   * @param id Partition id to be filled out
972   * @param firstDayOfCurrentMonth The first day in the current month
973   * @param firstDayOfCurrentWeek The first day in the current week
974   * @param dateStr Date string from the mob file
975   * @param policy Mob compaction policy
976   * @param calendar Calendar object
977   * @param threshold Mob compaciton threshold configured
978   * @return true if the file needs to be excluded from compaction
979   */
980  public static boolean fillPartitionId(final CompactionPartitionId id,
981      final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr,
982      final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) {
983
984    boolean skipCompcation = false;
985    id.setThreshold(threshold);
986    if (threshold <= 0) {
987      id.setDate(dateStr);
988      return skipCompcation;
989    }
990
991    long finalThreshold;
992    Date date;
993    try {
994      date = MobUtils.parseDate(dateStr);
995    } catch (ParseException e)  {
996      LOG.warn("Failed to parse date " + dateStr, e);
997      id.setDate(dateStr);
998      return true;
999    }
1000
1001    /* The algorithm works as follows:
1002     *    For monthly policy:
1003     *       1). If the file's date is in past months, apply 4 * 7 * threshold
1004     *       2). If the file's date is in past weeks, apply 7 * threshold
1005     *       3). If the file's date is in current week, exclude it from the compaction
1006     *    For weekly policy:
1007     *       1). If the file's date is in past weeks, apply 7 * threshold
1008     *       2). If the file's date in currently, apply threshold
1009     *    For daily policy:
1010     *       1). apply threshold
1011     */
1012    if (policy == MobCompactPartitionPolicy.MONTHLY) {
1013      if (date.before(firstDayOfCurrentMonth)) {
1014        // Check overflow
1015        if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) {
1016          finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold;
1017        } else {
1018          finalThreshold = Long.MAX_VALUE;
1019        }
1020        id.setThreshold(finalThreshold);
1021
1022        // set to the date for the first day of that month
1023        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date)));
1024        return skipCompcation;
1025      }
1026    }
1027
1028    if ((policy == MobCompactPartitionPolicy.MONTHLY) ||
1029        (policy == MobCompactPartitionPolicy.WEEKLY)) {
1030      // Check if it needs to apply weekly multiplier
1031      if (date.before(firstDayOfCurrentWeek)) {
1032        // Check overflow
1033        if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) {
1034          finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold;
1035        } else {
1036          finalThreshold = Long.MAX_VALUE;
1037        }
1038        id.setThreshold(finalThreshold);
1039
1040        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date)));
1041        return skipCompcation;
1042      } else if (policy == MobCompactPartitionPolicy.MONTHLY) {
1043        skipCompcation = true;
1044      }
1045    }
1046
1047    // Rest is daily
1048    id.setDate(dateStr);
1049    return skipCompcation;
1050  }
1051}