001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.text.ParseException;
024import java.text.SimpleDateFormat;
025import java.util.ArrayList;
026import java.util.Calendar;
027import java.util.Collection;
028import java.util.Date;
029import java.util.List;
030import java.util.Optional;
031import java.util.UUID;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.RejectedExecutionException;
034import java.util.concurrent.RejectedExecutionHandler;
035import java.util.concurrent.SynchronousQueue;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.TagType;
048import org.apache.hadoop.hbase.TagUtil;
049import org.apache.hadoop.hbase.backup.HFileArchiver;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
051import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.io.HFileLink;
057import org.apache.hadoop.hbase.io.compress.Compression;
058import org.apache.hadoop.hbase.io.crypto.Encryption;
059import org.apache.hadoop.hbase.io.hfile.CacheConfig;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.io.hfile.HFileContext;
062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
063import org.apache.hadoop.hbase.master.locking.LockManager;
064import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
065import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
066import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
067import org.apache.hadoop.hbase.regionserver.BloomType;
068import org.apache.hadoop.hbase.regionserver.HStore;
069import org.apache.hadoop.hbase.regionserver.HStoreFile;
070import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.ChecksumType;
073import org.apache.hadoop.hbase.util.CommonFSUtils;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.apache.hadoop.hbase.util.ReflectionUtils;
076import org.apache.hadoop.hbase.util.Threads;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081/**
082 * The mob utilities
083 */
084@InterfaceAudience.Private
085public final class MobUtils {
086
087  private static final Logger LOG = LoggerFactory.getLogger(MobUtils.class);
088  private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7;
089  private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER;
090
091  private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
092      new ThreadLocal<SimpleDateFormat>() {
093    @Override
094    protected SimpleDateFormat initialValue() {
095      return new SimpleDateFormat("yyyyMMdd");
096    }
097  };
098
099  private static final byte[] REF_DELETE_MARKER_TAG_BYTES;
100  static {
101    List<Tag> tags = new ArrayList<>();
102    tags.add(MobConstants.MOB_REF_TAG);
103    REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags);
104  }
105
106  /**
107   * Private constructor to keep this class from being instantiated.
108   */
109  private MobUtils() {
110  }
111
112  /**
113   * Formats a date to a string.
114   * @param date The date.
115   * @return The string format of the date, it's yyyymmdd.
116   */
117  public static String formatDate(Date date) {
118    return LOCAL_FORMAT.get().format(date);
119  }
120
121  /**
122   * Parses the string to a date.
123   * @param dateString The string format of a date, it's yyyymmdd.
124   * @return A date.
125   */
126  public static Date parseDate(String dateString) throws ParseException {
127    return LOCAL_FORMAT.get().parse(dateString);
128  }
129
130  /**
131   * Get the first day of the input date's month
132   * @param calendar Calendar object
133   * @param date The date to find out its first day of that month
134   * @return The first day in the month
135   */
136  public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) {
137
138    calendar.setTime(date);
139    calendar.set(Calendar.HOUR_OF_DAY, 0);
140    calendar.set(Calendar.MINUTE, 0);
141    calendar.set(Calendar.SECOND, 0);
142    calendar.set(Calendar.MILLISECOND, 0);
143    calendar.set(Calendar.DAY_OF_MONTH, 1);
144
145    Date firstDayInMonth = calendar.getTime();
146    return firstDayInMonth;
147  }
148
149  /**
150   * Get the first day of the input date's week
151   * @param calendar Calendar object
152   * @param date The date to find out its first day of that week
153   * @return The first day in the week
154   */
155  public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) {
156
157    calendar.setTime(date);
158    calendar.set(Calendar.HOUR_OF_DAY, 0);
159    calendar.set(Calendar.MINUTE, 0);
160    calendar.set(Calendar.SECOND, 0);
161    calendar.set(Calendar.MILLISECOND, 0);
162    calendar.setFirstDayOfWeek(Calendar.MONDAY);
163    calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
164
165    Date firstDayInWeek = calendar.getTime();
166    return firstDayInWeek;
167  }
168
169  /**
170   * Whether the current cell is a mob reference cell.
171   * @param cell The current cell.
172   * @return True if the cell has a mob reference tag, false if it doesn't.
173   */
174  public static boolean isMobReferenceCell(Cell cell) {
175    if (cell.getTagsLength() > 0) {
176      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE);
177      if (tag.isPresent()) {
178        return true;
179      }
180    }
181    return false;
182  }
183
184  /**
185   * Gets the table name tag.
186   * @param cell The current cell.
187   * @return The table name tag.
188   */
189  public static Tag getTableNameTag(Cell cell) {
190    if (cell.getTagsLength() > 0) {
191      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
192      if (tag.isPresent()) {
193        return tag.get();
194      }
195    }
196    return null;
197  }
198
199  /**
200   * Whether the tag list has a mob reference tag.
201   * @param tags The tag list.
202   * @return True if the list has a mob reference tag, false if it doesn't.
203   */
204  public static boolean hasMobReferenceTag(List<Tag> tags) {
205    if (!tags.isEmpty()) {
206      for (Tag tag : tags) {
207        if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
208          return true;
209        }
210      }
211    }
212    return false;
213  }
214
215  /**
216   * Indicates whether it's a raw scan.
217   * The information is set in the attribute "hbase.mob.scan.raw" of scan.
218   * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
219   * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
220   * the mob file.
221   * @param scan The current scan.
222   * @return True if it's a raw scan.
223   */
224  public static boolean isRawMobScan(Scan scan) {
225    byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
226    try {
227      return raw != null && Bytes.toBoolean(raw);
228    } catch (IllegalArgumentException e) {
229      return false;
230    }
231  }
232
233  /**
234   * Indicates whether it's a reference only scan.
235   * The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
236   * If it's a ref only scan, only the cells with ref tag are returned.
237   * @param scan The current scan.
238   * @return True if it's a ref only scan.
239   */
240  public static boolean isRefOnlyScan(Scan scan) {
241    byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
242    try {
243      return refOnly != null && Bytes.toBoolean(refOnly);
244    } catch (IllegalArgumentException e) {
245      return false;
246    }
247  }
248
249  /**
250   * Indicates whether the scan contains the information of caching blocks.
251   * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
252   * @param scan The current scan.
253   * @return True when the Scan attribute specifies to cache the MOB blocks.
254   */
255  public static boolean isCacheMobBlocks(Scan scan) {
256    byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
257    try {
258      return cache != null && Bytes.toBoolean(cache);
259    } catch (IllegalArgumentException e) {
260      return false;
261    }
262  }
263
264  /**
265   * Sets the attribute of caching blocks in the scan.
266   *
267   * @param scan
268   *          The current scan.
269   * @param cacheBlocks
270   *          True, set the attribute of caching blocks into the scan, the scanner with this scan
271   *          caches blocks.
272   *          False, the scanner doesn't cache blocks for this scan.
273   */
274  public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
275    scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
276  }
277
278  /**
279   * Cleans the expired mob files.
280   * Cleans the files whose creation date is older than (current - columnFamily.ttl), and
281   * the minVersions of that column family is 0.
282   * @param fs The current file system.
283   * @param conf The current configuration.
284   * @param tableName The current table name.
285   * @param columnDescriptor The descriptor of the current column family.
286   * @param cacheConfig The cacheConfig that disables the block cache.
287   * @param current The current time.
288   */
289  public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
290      ColumnFamilyDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
291      throws IOException {
292    long timeToLive = columnDescriptor.getTimeToLive();
293    if (Integer.MAX_VALUE == timeToLive) {
294      // no need to clean, because the TTL is not set.
295      return;
296    }
297
298    Calendar calendar = Calendar.getInstance();
299    calendar.setTimeInMillis(current - timeToLive * 1000);
300    calendar.set(Calendar.HOUR_OF_DAY, 0);
301    calendar.set(Calendar.MINUTE, 0);
302    calendar.set(Calendar.SECOND, 0);
303
304    Date expireDate = calendar.getTime();
305
306    LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
307
308    FileStatus[] stats = null;
309    Path mobTableDir = CommonFSUtils.getTableDir(getMobHome(conf), tableName);
310    Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
311    try {
312      stats = fs.listStatus(path);
313    } catch (FileNotFoundException e) {
314      LOG.warn("Failed to find the mob file " + path, e);
315    }
316    if (null == stats) {
317      // no file found
318      return;
319    }
320    List<HStoreFile> filesToClean = new ArrayList<>();
321    int deletedFileCount = 0;
322    for (FileStatus file : stats) {
323      String fileName = file.getPath().getName();
324      try {
325        if (HFileLink.isHFileLink(file.getPath())) {
326          HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
327          fileName = hfileLink.getOriginPath().getName();
328        }
329
330        Date fileDate = parseDate(MobFileName.getDateFromName(fileName));
331
332        if (LOG.isDebugEnabled()) {
333          LOG.debug("Checking file " + fileName);
334        }
335        if (fileDate.getTime() < expireDate.getTime()) {
336          if (LOG.isDebugEnabled()) {
337            LOG.debug(fileName + " is an expired file");
338          }
339          filesToClean
340              .add(new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
341        }
342      } catch (Exception e) {
343        LOG.error("Cannot parse the fileName " + fileName, e);
344      }
345    }
346    if (!filesToClean.isEmpty()) {
347      try {
348        removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
349            filesToClean);
350        deletedFileCount = filesToClean.size();
351      } catch (IOException e) {
352        LOG.error("Failed to delete the mob files " + filesToClean, e);
353      }
354    }
355    LOG.info(deletedFileCount + " expired mob files are deleted");
356  }
357
358  /**
359   * Gets the root dir of the mob files.
360   * It's {HBASE_DIR}/mobdir.
361   * @param conf The current configuration.
362   * @return the root dir of the mob file.
363   */
364  public static Path getMobHome(Configuration conf) {
365    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
366    return getMobHome(hbaseDir);
367  }
368
369  /**
370   * Gets the root dir of the mob files under the qualified HBase root dir.
371   * It's {rootDir}/mobdir.
372   * @param rootDir The qualified path of HBase root directory.
373   * @return The root dir of the mob file.
374   */
375  public static Path getMobHome(Path rootDir) {
376    return new Path(rootDir, MobConstants.MOB_DIR_NAME);
377  }
378
379  /**
380   * Gets the qualified root dir of the mob files.
381   * @param conf The current configuration.
382   * @return The qualified root dir.
383   */
384  public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
385    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
386    Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
387    FileSystem fs = mobRootDir.getFileSystem(conf);
388    return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
389  }
390
391  /**
392   * Gets the table dir of the mob files under the qualified HBase root dir.
393   * It's {rootDir}/mobdir/data/${namespace}/${tableName}
394   * @param rootDir The qualified path of HBase root directory.
395   * @param tableName The name of table.
396   * @return The table dir of the mob file.
397   */
398  public static Path getMobTableDir(Path rootDir, TableName tableName) {
399    return CommonFSUtils.getTableDir(getMobHome(rootDir), tableName);
400  }
401
402  /**
403   * Gets the region dir of the mob files.
404   * It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
405   * @param conf The current configuration.
406   * @param tableName The current table name.
407   * @return The region dir of the mob files.
408   */
409  public static Path getMobRegionPath(Configuration conf, TableName tableName) {
410    return getMobRegionPath(new Path(conf.get(HConstants.HBASE_DIR)), tableName);
411  }
412
413  /**
414   * Gets the region dir of the mob files under the specified root dir.
415   * It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
416   * @param rootDir The qualified path of HBase root directory.
417   * @param tableName The current table name.
418   * @return The region dir of the mob files.
419   */
420  public static Path getMobRegionPath(Path rootDir, TableName tableName) {
421    Path tablePath = CommonFSUtils.getTableDir(getMobHome(rootDir), tableName);
422    RegionInfo regionInfo = getMobRegionInfo(tableName);
423    return new Path(tablePath, regionInfo.getEncodedName());
424  }
425
426  /**
427   * Gets the family dir of the mob files.
428   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
429   * @param conf The current configuration.
430   * @param tableName The current table name.
431   * @param familyName The current family name.
432   * @return The family dir of the mob files.
433   */
434  public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
435    return new Path(getMobRegionPath(conf, tableName), familyName);
436  }
437
438  /**
439   * Gets the family dir of the mob files.
440   * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
441   * @param regionPath The path of mob region which is a dummy one.
442   * @param familyName The current family name.
443   * @return The family dir of the mob files.
444   */
445  public static Path getMobFamilyPath(Path regionPath, String familyName) {
446    return new Path(regionPath, familyName);
447  }
448
449  /**
450   * Gets the RegionInfo of the mob files.
451   * This is a dummy region. The mob files are not saved in a region in HBase.
452   * This is only used in mob snapshot. It's internally used only.
453   * @param tableName
454   * @return A dummy mob region info.
455   */
456  public static RegionInfo getMobRegionInfo(TableName tableName) {
457    return RegionInfoBuilder.newBuilder(tableName)
458        .setStartKey(MobConstants.MOB_REGION_NAME_BYTES)
459        .setEndKey(HConstants.EMPTY_END_ROW)
460        .setSplit(false)
461        .setRegionId(0)
462        .build();
463  }
464
465  /**
466   * Gets whether the current RegionInfo is a mob one.
467   * @param regionInfo The current RegionInfo.
468   * @return If true, the current RegionInfo is a mob one.
469   */
470  public static boolean isMobRegionInfo(RegionInfo regionInfo) {
471    return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
472        .equals(regionInfo.getEncodedName());
473  }
474
475  /**
476   * Gets whether the current region name follows the pattern of a mob region name.
477   * @param tableName The current table name.
478   * @param regionName The current region name.
479   * @return True if the current region name follows the pattern of a mob region name.
480   */
481  public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
482    return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
483  }
484
485  /**
486   * Gets the working directory of the mob compaction.
487   * @param root The root directory of the mob compaction.
488   * @param jobName The current job name.
489   * @return The directory of the mob compaction for the current job.
490   */
491  public static Path getCompactionWorkingPath(Path root, String jobName) {
492    return new Path(root, jobName);
493  }
494
495  /**
496   * Archives the mob files.
497   * @param conf The current configuration.
498   * @param fs The current file system.
499   * @param tableName The table name.
500   * @param tableDir The table directory.
501   * @param family The name of the column family.
502   * @param storeFiles The files to be deleted.
503   */
504  public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
505      Path tableDir, byte[] family, Collection<HStoreFile> storeFiles) throws IOException {
506    HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
507        storeFiles);
508  }
509
510  /**
511   * Creates a mob reference KeyValue.
512   * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
513   * @param cell The original Cell.
514   * @param fileName The mob file name where the mob reference KeyValue is written.
515   * @param tableNameTag The tag of the current table name. It's very important in
516   *                        cloning the snapshot.
517   * @return The mob reference KeyValue.
518   */
519  public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) {
520    // Append the tags to the KeyValue.
521    // The key is same, the value is the filename of the mob file
522    List<Tag> tags = new ArrayList<>();
523    // Add the ref tag as the 1st one.
524    tags.add(MobConstants.MOB_REF_TAG);
525    // Add the tag of the source table name, this table is where this mob file is flushed
526    // from.
527    // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
528    // find the original mob files by this table name. For details please see cloning
529    // snapshot for mob files.
530    tags.add(tableNameTag);
531    return createMobRefCell(cell, fileName, TagUtil.fromList(tags));
532  }
533
534  public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) {
535    byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName);
536    return PrivateCellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell));
537  }
538
539  /**
540   * Creates a writer for the mob file in temp directory.
541   * @param conf The current configuration.
542   * @param fs The current file system.
543   * @param family The descriptor of the current column family.
544   * @param date The date string, its format is yyyymmmdd.
545   * @param basePath The basic path for a temp directory.
546   * @param maxKeyCount The key count.
547   * @param compression The compression algorithm.
548   * @param startKey The hex string of the start key.
549   * @param cacheConfig The current cache config.
550   * @param cryptoContext The encryption context.
551   * @param isCompaction If the writer is used in compaction.
552   * @return The writer for the mob file.
553   */
554  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
555      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
556      Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
557      Encryption.Context cryptoContext, boolean isCompaction)
558      throws IOException {
559    MobFileName mobFileName = MobFileName.create(startKey, date,
560        UUID.randomUUID().toString().replaceAll("-", ""));
561    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
562        cacheConfig, cryptoContext, isCompaction);
563  }
564
565  /**
566   * Creates a writer for the ref file in temp directory.
567   * @param conf The current configuration.
568   * @param fs The current file system.
569   * @param family The descriptor of the current column family.
570   * @param basePath The basic path for a temp directory.
571   * @param maxKeyCount The key count.
572   * @param cacheConfig The current cache config.
573   * @param cryptoContext The encryption context.
574   * @param isCompaction If the writer is used in compaction.
575   * @return The writer for the mob file.
576   */
577  public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs,
578    ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
579    Encryption.Context cryptoContext, boolean isCompaction)
580    throws IOException {
581    return createWriter(conf, fs, family,
582      new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount,
583      family.getCompactionCompressionType(), cacheConfig, cryptoContext,
584      HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(),
585      family.getBloomFilterType(), isCompaction);
586  }
587
588  /**
589   * Creates a writer for the mob file in temp directory.
590   * @param conf The current configuration.
591   * @param fs The current file system.
592   * @param family The descriptor of the current column family.
593   * @param date The date string, its format is yyyymmmdd.
594   * @param basePath The basic path for a temp directory.
595   * @param maxKeyCount The key count.
596   * @param compression The compression algorithm.
597   * @param startKey The start key.
598   * @param cacheConfig The current cache config.
599   * @param cryptoContext The encryption context.
600   * @param isCompaction If the writer is used in compaction.
601   * @return The writer for the mob file.
602   */
603  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
604      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
605      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
606      Encryption.Context cryptoContext, boolean isCompaction)
607      throws IOException {
608    MobFileName mobFileName = MobFileName.create(startKey, date,
609        UUID.randomUUID().toString().replaceAll("-", ""));
610    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
611      cacheConfig, cryptoContext, isCompaction);
612  }
613
614  /**
615   * Creates a writer for the del file in temp directory.
616   * @param conf The current configuration.
617   * @param fs The current file system.
618   * @param family The descriptor of the current column family.
619   * @param date The date string, its format is yyyymmmdd.
620   * @param basePath The basic path for a temp directory.
621   * @param maxKeyCount The key count.
622   * @param compression The compression algorithm.
623   * @param startKey The start key.
624   * @param cacheConfig The current cache config.
625   * @param cryptoContext The encryption context.
626   * @return The writer for the del file.
627   */
628  public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs,
629      ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
630      Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
631      Encryption.Context cryptoContext)
632      throws IOException {
633    String suffix = UUID
634      .randomUUID().toString().replaceAll("-", "") + "_del";
635    MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
636    return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
637      cacheConfig, cryptoContext, true);
638  }
639
640  /**
641   * Creates a writer for the mob file in temp directory.
642   * @param conf The current configuration.
643   * @param fs The current file system.
644   * @param family The descriptor of the current column family.
645   * @param mobFileName The mob file name.
646   * @param basePath The basic path for a temp directory.
647   * @param maxKeyCount The key count.
648   * @param compression The compression algorithm.
649   * @param cacheConfig The current cache config.
650   * @param cryptoContext The encryption context.
651   * @param isCompaction If the writer is used in compaction.
652   * @return The writer for the mob file.
653   */
654  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
655      ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
656      Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
657      boolean isCompaction)
658      throws IOException {
659    return createWriter(conf, fs, family,
660      new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig,
661      cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
662      family.getBlocksize(), BloomType.NONE, isCompaction);
663  }
664
665  /**
666   * Creates a writer for the mob file in temp directory.
667   * @param conf The current configuration.
668   * @param fs The current file system.
669   * @param family The descriptor of the current column family.
670   * @param path The path for a temp directory.
671   * @param maxKeyCount The key count.
672   * @param compression The compression algorithm.
673   * @param cacheConfig The current cache config.
674   * @param cryptoContext The encryption context.
675   * @param checksumType The checksum type.
676   * @param bytesPerChecksum The bytes per checksum.
677   * @param blocksize The HFile block size.
678   * @param bloomType The bloom filter type.
679   * @param isCompaction If the writer is used in compaction.
680   * @return The writer for the mob file.
681   */
682  public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
683      ColumnFamilyDescriptor family, Path path, long maxKeyCount,
684      Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
685      ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType,
686      boolean isCompaction)
687      throws IOException {
688    if (compression == null) {
689      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
690    }
691    final CacheConfig writerCacheConf;
692    if (isCompaction) {
693      writerCacheConf = new CacheConfig(cacheConfig);
694      writerCacheConf.setCacheDataOnWrite(false);
695    } else {
696      writerCacheConf = cacheConfig;
697    }
698    HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
699        .withIncludesMvcc(true).withIncludesTags(true)
700        .withCompressTags(family.isCompressTags())
701        .withChecksumType(checksumType)
702        .withBytesPerCheckSum(bytesPerChecksum)
703        .withBlockSize(blocksize)
704        .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
705        .withEncryptionContext(cryptoContext)
706        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
707
708    StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs)
709        .withFilePath(path).withBloomType(bloomType)
710        .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
711    return w;
712  }
713
714  /**
715   * Commits the mob file.
716   * @param conf The current configuration.
717   * @param fs The current file system.
718   * @param sourceFile The path where the mob file is saved.
719   * @param targetPath The directory path where the source file is renamed to.
720   * @param cacheConfig The current cache config.
721   * @return The target file path the source file is renamed to.
722   */
723  public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
724      Path targetPath, CacheConfig cacheConfig) throws IOException {
725    if (sourceFile == null) {
726      return null;
727    }
728    Path dstPath = new Path(targetPath, sourceFile.getName());
729    validateMobFile(conf, fs, sourceFile, cacheConfig, true);
730    String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
731    LOG.info(msg);
732    Path parent = dstPath.getParent();
733    if (!fs.exists(parent)) {
734      fs.mkdirs(parent);
735    }
736    if (!fs.rename(sourceFile, dstPath)) {
737      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
738    }
739    return dstPath;
740  }
741
742  /**
743   * Validates a mob file by opening and closing it.
744   * @param conf The current configuration.
745   * @param fs The current file system.
746   * @param path The path where the mob file is saved.
747   * @param cacheConfig The current cache config.
748   */
749  private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
750      CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
751    HStoreFile storeFile = null;
752    try {
753      storeFile = new HStoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
754      storeFile.initReader();
755    } catch (IOException e) {
756      LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
757      throw e;
758    } finally {
759      if (storeFile != null) {
760        storeFile.closeStoreFile(false);
761      }
762    }
763  }
764
765  /**
766   * Indicates whether the current mob ref cell has a valid value.
767   * A mob ref cell has a mob reference tag.
768   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
769   * The real mob value length takes 4 bytes.
770   * The remaining part is the mob file name.
771   * @param cell The mob ref cell.
772   * @return True if the cell has a valid value.
773   */
774  public static boolean hasValidMobRefCellValue(Cell cell) {
775    return cell.getValueLength() > Bytes.SIZEOF_INT;
776  }
777
778  /**
779   * Gets the mob value length from the mob ref cell.
780   * A mob ref cell has a mob reference tag.
781   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
782   * The real mob value length takes 4 bytes.
783   * The remaining part is the mob file name.
784   * @param cell The mob ref cell.
785   * @return The real mob value length.
786   */
787  public static int getMobValueLength(Cell cell) {
788    return PrivateCellUtil.getValueAsInt(cell);
789  }
790
791  /**
792   * Gets the mob file name from the mob ref cell.
793   * A mob ref cell has a mob reference tag.
794   * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
795   * The real mob value length takes 4 bytes.
796   * The remaining part is the mob file name.
797   * @param cell The mob ref cell.
798   * @return The mob file name.
799   */
800  public static String getMobFileName(Cell cell) {
801    return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
802        cell.getValueLength() - Bytes.SIZEOF_INT);
803  }
804
805  /**
806   * Gets the table name used in the table lock.
807   * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
808   * @param tn The table name.
809   * @return The table name used in table lock.
810   */
811  public static TableName getTableLockName(TableName tn) {
812    byte[] tableName = tn.getName();
813    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
814  }
815
816  /**
817   * Performs the mob compaction.
818   * @param conf the Configuration
819   * @param fs the file system
820   * @param tableName the table the compact
821   * @param hcd the column descriptor
822   * @param pool the thread pool
823   * @param allFiles Whether add all mob files into the compaction.
824   */
825  public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
826        ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles,
827        LockManager.MasterLock lock)
828      throws IOException {
829    String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
830        PartitionedMobCompactor.class.getName());
831    // instantiate the mob compactor.
832    MobCompactor compactor = null;
833    try {
834      compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
835        Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class,
836        ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
837    } catch (Exception e) {
838      throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
839    }
840    // compact only for mob-enabled column.
841    // obtain a write table lock before performing compaction to avoid race condition
842    // with major compaction in mob-enabled column.
843    try {
844      lock.acquire();
845      LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " +
846          "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
847      compactor.compact(allFiles);
848    } catch (Exception e) {
849      LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
850          + " in the table " + tableName.getNameAsString(), e);
851    } finally {
852      LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " +
853          "compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
854      lock.release();
855    }
856  }
857
858  /**
859   * Creates a thread pool.
860   * @param conf the Configuration
861   * @return A thread pool.
862   */
863  public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
864    int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
865        MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
866    if (maxThreads == 0) {
867      maxThreads = 1;
868    }
869    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
870    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
871      Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
872        @Override
873        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
874          try {
875            // waiting for a thread to pick up instead of throwing exceptions.
876            queue.put(r);
877          } catch (InterruptedException e) {
878            throw new RejectedExecutionException(e);
879          }
880        }
881      });
882    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
883    return pool;
884  }
885
886  /**
887   * Checks whether this table has mob-enabled columns.
888   * @param htd The current table descriptor.
889   * @return Whether this table has mob-enabled columns.
890   */
891  public static boolean hasMobColumns(TableDescriptor htd) {
892    ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies();
893    for (ColumnFamilyDescriptor hcd : hcds) {
894      if (hcd.isMobEnabled()) {
895        return true;
896      }
897    }
898    return false;
899  }
900
901  /**
902   * Indicates whether return null value when the mob file is missing or corrupt.
903   * The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
904   * @param scan The current scan.
905   * @return True if the readEmptyValueOnMobCellMiss is enabled.
906   */
907  public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
908    byte[] readEmptyValueOnMobCellMiss =
909      scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
910    try {
911      return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
912    } catch (IllegalArgumentException e) {
913      return false;
914    }
915  }
916
917  /**
918   * Creates a mob ref delete marker.
919   * @param cell The current delete marker.
920   * @return A delete marker with the ref tag.
921   */
922  public static Cell createMobRefDeleteMarker(Cell cell) {
923    return PrivateCellUtil.createCell(cell, TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell));
924  }
925
926  /**
927   * Checks if the mob file is expired.
928   * @param column The descriptor of the current column family.
929   * @param current The current time.
930   * @param fileDate The date string parsed from the mob file name.
931   * @return True if the mob file is expired.
932   */
933  public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current,
934      String fileDate) {
935    if (column.getMinVersions() > 0) {
936      return false;
937    }
938    long timeToLive = column.getTimeToLive();
939    if (Integer.MAX_VALUE == timeToLive) {
940      return false;
941    }
942
943    Date expireDate = new Date(current - timeToLive * 1000);
944    expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
945    try {
946      Date date = parseDate(fileDate);
947      if (date.getTime() < expireDate.getTime()) {
948        return true;
949      }
950    } catch (ParseException e) {
951      LOG.warn("Failed to parse the date " + fileDate, e);
952      return false;
953    }
954    return false;
955  }
956
957  /**
958   * fill out partition id based on compaction policy and date, threshold...
959   * @param id Partition id to be filled out
960   * @param firstDayOfCurrentMonth The first day in the current month
961   * @param firstDayOfCurrentWeek The first day in the current week
962   * @param dateStr Date string from the mob file
963   * @param policy Mob compaction policy
964   * @param calendar Calendar object
965   * @param threshold Mob compaciton threshold configured
966   * @return true if the file needs to be excluded from compaction
967   */
968  public static boolean fillPartitionId(final CompactionPartitionId id,
969      final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr,
970      final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) {
971
972    boolean skipCompcation = false;
973    id.setThreshold(threshold);
974    if (threshold <= 0) {
975      id.setDate(dateStr);
976      return skipCompcation;
977    }
978
979    long finalThreshold;
980    Date date;
981    try {
982      date = MobUtils.parseDate(dateStr);
983    } catch (ParseException e)  {
984      LOG.warn("Failed to parse date " + dateStr, e);
985      id.setDate(dateStr);
986      return true;
987    }
988
989    /* The algorithm works as follows:
990     *    For monthly policy:
991     *       1). If the file's date is in past months, apply 4 * 7 * threshold
992     *       2). If the file's date is in past weeks, apply 7 * threshold
993     *       3). If the file's date is in current week, exclude it from the compaction
994     *    For weekly policy:
995     *       1). If the file's date is in past weeks, apply 7 * threshold
996     *       2). If the file's date in currently, apply threshold
997     *    For daily policy:
998     *       1). apply threshold
999     */
1000    if (policy == MobCompactPartitionPolicy.MONTHLY) {
1001      if (date.before(firstDayOfCurrentMonth)) {
1002        // Check overflow
1003        if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) {
1004          finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold;
1005        } else {
1006          finalThreshold = Long.MAX_VALUE;
1007        }
1008        id.setThreshold(finalThreshold);
1009
1010        // set to the date for the first day of that month
1011        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date)));
1012        return skipCompcation;
1013      }
1014    }
1015
1016    if ((policy == MobCompactPartitionPolicy.MONTHLY) ||
1017        (policy == MobCompactPartitionPolicy.WEEKLY)) {
1018      // Check if it needs to apply weekly multiplier
1019      if (date.before(firstDayOfCurrentWeek)) {
1020        // Check overflow
1021        if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) {
1022          finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold;
1023        } else {
1024          finalThreshold = Long.MAX_VALUE;
1025        }
1026        id.setThreshold(finalThreshold);
1027
1028        id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date)));
1029        return skipCompcation;
1030      } else if (policy == MobCompactPartitionPolicy.MONTHLY) {
1031        skipCompcation = true;
1032      }
1033    }
1034
1035    // Rest is daily
1036    id.setDate(dateStr);
1037    return skipCompcation;
1038  }
1039}