001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableSet;
027import java.util.Optional;
028import java.util.UUID;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.function.Consumer;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.CellBuilderType;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.ExtendedCell;
040import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Tag;
044import org.apache.hadoop.hbase.TagType;
045import org.apache.hadoop.hbase.TagUtil;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.FilterList;
050import org.apache.hadoop.hbase.io.compress.Compression;
051import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
052import org.apache.hadoop.hbase.mob.MobCell;
053import org.apache.hadoop.hbase.mob.MobConstants;
054import org.apache.hadoop.hbase.mob.MobFile;
055import org.apache.hadoop.hbase.mob.MobFileCache;
056import org.apache.hadoop.hbase.mob.MobFileName;
057import org.apache.hadoop.hbase.mob.MobStoreEngine;
058import org.apache.hadoop.hbase.mob.MobUtils;
059import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
060import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
061import org.apache.hadoop.hbase.util.HFileArchiveUtil;
062import org.apache.hadoop.hbase.util.IdLock;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
/**
 * The store implementation that saves MOBs (medium objects); it extends HStore. When the
 * descriptor of a column family has the value "IS_MOB", that column family is a mob one, and when
 * an HRegion instantiates a store for such a column family, an HMobStore is created. HMobStore is
 * almost the same as HStore except that it uses different types of scanners: createScanner
 * returns a MobStoreScanner or a ReversedMobStoreScanner. In these scanners, additional seeks in
 * the mob files are performed after the seek in HBase is done.
 */
081@InterfaceAudience.Private
082public class HMobStore extends HStore {
083  private static final Logger LOG = LoggerFactory.getLogger(HMobStore.class);
084  private MobFileCache mobFileCache;
085  private Path homePath;
086  private Path mobFamilyPath;
087  private AtomicLong cellsCountCompactedToMob = new AtomicLong();
088  private AtomicLong cellsCountCompactedFromMob = new AtomicLong();
089  private AtomicLong cellsSizeCompactedToMob = new AtomicLong();
090  private AtomicLong cellsSizeCompactedFromMob = new AtomicLong();
091  private AtomicLong mobFlushCount = new AtomicLong();
092  private AtomicLong mobFlushedCellsCount = new AtomicLong();
093  private AtomicLong mobFlushedCellsSize = new AtomicLong();
094  private AtomicLong mobScanCellsCount = new AtomicLong();
095  private AtomicLong mobScanCellsSize = new AtomicLong();
096  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
097  private final IdLock keyLock = new IdLock();
098  // When we add a MOB reference cell to the HFile, we will add 2 tags along with it
099  // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
100  // original one but a ref to another MOB Cell.
101  // 2. Table name tag. It's very useful in cloning the snapshot. When reading from the cloning
102  // table, we need to find the original mob files by this table name. For details please see
103  // cloning snapshot for mob files.
104  private final byte[] refCellTags;
105  private StoreFileTracker mobStoreSFT = null;
106
107  public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
108    final Configuration confParam, boolean warmup) throws IOException {
109    super(region, family, confParam, warmup);
110    this.mobFileCache = region.getMobFileCache();
111    this.homePath = MobUtils.getMobHome(conf);
112    this.mobFamilyPath =
113      MobUtils.getMobFamilyPath(conf, this.getTableName(), getColumnFamilyName());
114    List<Path> locations = new ArrayList<>(2);
115    locations.add(mobFamilyPath);
116    TableName tn = region.getTableDescriptor().getTableName();
117    locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
118      MobUtils.getMobRegionInfo(tn).getEncodedName(), getColumnFamilyName()));
119    map.put(tn, locations);
120    List<Tag> tags = new ArrayList<>(2);
121    tags.add(MobConstants.MOB_REF_TAG);
122    Tag tableNameTag =
123      new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, getTableName().getName());
124    tags.add(tableNameTag);
125    this.refCellTags = TagUtil.fromList(tags);
126  }
127
128  /**
129   * Gets current config.
130   */
131  public Configuration getConfiguration() {
132    return this.conf;
133  }
134
135  /**
136   * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in
137   * the mob files should be performed after the seek in HBase is done.
138   */
139  @Override
140  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
141    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
142    if (MobUtils.isRefOnlyScan(scan)) {
143      Filter refOnlyFilter = new MobReferenceOnlyFilter();
144      Filter filter = scan.getFilter();
145      if (filter != null) {
146        scan.setFilter(new FilterList(filter, refOnlyFilter));
147      } else {
148        scan.setFilter(refOnlyFilter);
149      }
150    }
151    return scan.isReversed()
152      ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt)
153      : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt);
154  }
155
156  /**
157   * Creates the mob store engine.
158   */
159  @Override
160  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
161    CellComparator cellComparator) throws IOException {
162    MobStoreEngine engine = new MobStoreEngine();
163    engine.createComponentsOnce(conf, store, cellComparator);
164    return engine;
165  }
166
167  /**
168   * Gets the temp directory.
169   * @return The temp directory.
170   */
171  private Path getTempDir() {
172    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
173  }
174
175  /**
176   * Creates the writer for the mob file in temp directory.
177   * @param date         The latest date of written cells.
178   * @param maxKeyCount  The key count.
179   * @param compression  The compression algorithm.
180   * @param startKey     The start key.
181   * @param isCompaction If the writer is used in compaction.
182   * @return The writer for the mob file.
183   */
184  public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount,
185    Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException {
186    if (startKey == null) {
187      startKey = HConstants.EMPTY_START_ROW;
188    }
189    Path path = getTempDir();
190    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
191      isCompaction, null);
192  }
193
194  /**
195   * Creates the writer for the mob file in the mob family directory.
196   * @param date         The latest date of written cells.
197   * @param maxKeyCount  The key count.
198   * @param compression  The compression algorithm.
199   * @param startKey     The start key.
200   * @param isCompaction If the writer is used in compaction.
201   * @return The writer for the mob file.
202   */
203  public StoreFileWriter createWriter(Date date, long maxKeyCount,
204    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
205    Consumer<Path> writerCreationTracker) throws IOException {
206    if (startKey == null) {
207      startKey = HConstants.EMPTY_START_ROW;
208    }
209    Path path = getPath();
210    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
211      isCompaction, writerCreationTracker);
212  }
213
214  /**
215   * Creates the writer for the mob file in temp directory.
216   * @param date         The date string, its format is yyyymmmdd.
217   * @param basePath     The basic path for a temp directory.
218   * @param maxKeyCount  The key count.
219   * @param compression  The compression algorithm.
220   * @param startKey     The start key.
221   * @param isCompaction If the writer is used in compaction.
222   * @return The writer for the mob file.
223   */
224  public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
225    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
226    Consumer<Path> writerCreationTracker) throws IOException {
227    MobFileName mobFileName =
228      MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
229        getHRegion().getRegionInfo().getEncodedName());
230    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction,
231      writerCreationTracker);
232  }
233
234  /**
235   * Creates the writer for the mob file in temp directory.
236   * @param mobFileName  The mob file name.
237   * @param basePath     The basic path for a temp directory.
238   * @param maxKeyCount  The key count.
239   * @param compression  The compression algorithm.
240   * @param isCompaction If the writer is used in compaction.
241   * @return The writer for the mob file.
242   */
243
244  public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
245    Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker)
246    throws IOException {
247    return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
248      new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
249      getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
250      StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
251      isCompaction, writerCreationTracker);
252  }
253
254  /**
255   * Commits the mob file.
256   * @param sourceFile The source file.
257   * @param targetPath The directory path where the source file is renamed to.
258   */
259  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
260    if (sourceFile == null) {
261      return;
262    }
263    Path dstPath = new Path(targetPath, sourceFile.getName());
264    validateMobFile(sourceFile);
265    if (sourceFile.equals(targetPath)) {
266      LOG.info("File is already in the destination dir: {}", sourceFile);
267      return;
268    }
269    LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
270    Path parent = dstPath.getParent();
271    if (!getFileSystem().exists(parent)) {
272      getFileSystem().mkdirs(parent);
273    }
274    if (!getFileSystem().rename(sourceFile, dstPath)) {
275      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
276    }
277  }
278
279  /**
280   * Validates a mob file by opening and closing it.
281   * @param path the path to the mob file
282   */
283  private void validateMobFile(Path path) throws IOException {
284    HStoreFile storeFile = null;
285    try {
286      StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, getStoreContext());
287      storeFile = new HStoreFile(getFileSystem(), path, conf, getCacheConfig(), BloomType.NONE,
288        isPrimaryReplicaStore(), sft);
289      storeFile.initReader();
290    } catch (IOException e) {
291      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
292      throw e;
293    } finally {
294      if (storeFile != null) {
295        storeFile.closeStoreFile(false);
296      }
297    }
298  }
299
300  /**
301   * Reads the cell from the mob file, and the read point does not count. This is used for
302   * DefaultMobStoreCompactor where we can read empty value for the missing cell.
303   * @param reference   The cell found in the HBase, its value is a path to a mob file.
304   * @param cacheBlocks Whether the scanner should cache blocks.
305   * @return The cell found in the mob file.
306   */
307  public MobCell resolve(ExtendedCell reference, boolean cacheBlocks) throws IOException {
308    return resolve(reference, cacheBlocks, -1, true);
309  }
310
311  /**
312   * Reads the cell from the mob file with readEmptyValueOnMobCellMiss
313   * @param reference                   The cell found in the HBase, its value is a path to a mob
314   *                                    file.
315   * @param cacheBlocks                 Whether the scanner should cache blocks.
316   * @param readEmptyValueOnMobCellMiss should return empty mob cell if reference can not be
317   *                                    resolved.
318   * @return The cell found in the mob file.
319   */
320  public MobCell resolve(ExtendedCell reference, boolean cacheBlocks,
321    boolean readEmptyValueOnMobCellMiss) throws IOException {
322    return resolve(reference, cacheBlocks, -1, readEmptyValueOnMobCellMiss);
323  }
324
325  /**
326   * Reads the cell from the mob file.
327   * @param reference                   The cell found in the HBase, its value is a path to a mob
328   *                                    file.
329   * @param cacheBlocks                 Whether the scanner should cache blocks.
330   * @param readPt                      the read point.
331   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
332   *                                    corrupt.
333   * @return The cell found in the mob file.
334   */
335  public MobCell resolve(ExtendedCell reference, boolean cacheBlocks, long readPt,
336    boolean readEmptyValueOnMobCellMiss) throws IOException {
337    MobCell mobCell = null;
338    if (MobUtils.hasValidMobRefCellValue(reference)) {
339      String fileName = MobUtils.getMobFileName(reference);
340      Optional<TableName> tableName = MobUtils.getTableName(reference);
341      if (tableName.isPresent()) {
342        List<Path> locations = getLocations(tableName.get());
343        mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
344          readEmptyValueOnMobCellMiss);
345      }
346    }
347    if (mobCell == null) {
348      LOG.warn("The Cell result is null, assemble a new Cell with the same row,family,"
349        + "qualifier,timestamp,type and tags but with an empty value to return.");
350      ExtendedCell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
351        .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
352        .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
353          reference.getFamilyLength())
354        .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(),
355          reference.getQualifierLength())
356        .setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte())
357        .setValue(HConstants.EMPTY_BYTE_ARRAY)
358        .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength())
359        .build();
360      mobCell = new MobCell(cell);
361    }
362    return mobCell;
363  }
364
365  /**
366   * @param tableName to look up locations for, can not be null
367   * @return a list of location in order of working dir, archive dir. will not be null.
368   */
369  public List<Path> getLocations(TableName tableName) throws IOException {
370    List<Path> locations = map.get(tableName);
371    if (locations == null) {
372      IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
373      try {
374        locations = map.get(tableName);
375        if (locations == null) {
376          locations = new ArrayList<>(2);
377          locations.add(MobUtils.getMobFamilyPath(conf, tableName,
378            getColumnFamilyDescriptor().getNameAsString()));
379          locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tableName,
380            MobUtils.getMobRegionInfo(tableName).getEncodedName(),
381            getColumnFamilyDescriptor().getNameAsString()));
382          map.put(tableName, locations);
383        }
384      } finally {
385        keyLock.releaseLockEntry(lockEntry);
386      }
387    }
388    return locations;
389  }
390
391  /**
392   * Reads the cell from a mob file. The mob file might be located in different directories. 1. The
393   * working directory. 2. The archive directory. Reads the cell from the files located in both of
394   * the above directories.
395   * @param locations                   The possible locations where the mob files are saved.
396   * @param fileName                    The file to be read.
397   * @param search                      The cell to be searched.
398   * @param cacheMobBlocks              Whether the scanner should cache blocks.
399   * @param readPt                      the read point.
400   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
401   *                                    corrupt.
402   * @return The found cell. Null if there's no such a cell.
403   */
404  private MobCell readCell(List<Path> locations, String fileName, ExtendedCell search,
405    boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
406    FileSystem fs = getFileSystem();
407    IOException ioe = null;
408    for (Path location : locations) {
409      MobFile file = null;
410      Path path = new Path(location, fileName);
411      try {
412        file = mobFileCache.openFile(fs, path, getCacheConfig(), this.getStoreContext());
413        return readPt != -1
414          ? file.readCell(search, cacheMobBlocks, readPt)
415          : file.readCell(search, cacheMobBlocks);
416      } catch (IOException e) {
417        mobFileCache.evictFile(fileName);
418        ioe = e;
419        if (
420          (e instanceof FileNotFoundException) || (e.getCause() instanceof FileNotFoundException)
421        ) {
422          LOG.debug("Fail to read the cell, the mob file " + path + " doesn't exist", e);
423        } else if (e instanceof CorruptHFileException) {
424          LOG.error("The mob file " + path + " is corrupt", e);
425          break;
426        } else {
427          throw e;
428        }
429      } finally {
430        if (file != null) {
431          mobFileCache.closeFile(file);
432        }
433      }
434    }
435    LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
436      + " or it is corrupt");
437    if (readEmptyValueOnMobCellMiss) {
438      return null;
439    } else if (
440      (ioe instanceof FileNotFoundException) || (ioe.getCause() instanceof FileNotFoundException)
441    ) {
442      // The region is re-opened when FileNotFoundException is thrown.
443      // This is not necessary when MOB files cannot be found, because the store files
444      // in a region only contain the references to MOB files and a re-open on a region
445      // doesn't help fix the lost MOB files.
446      throw new DoNotRetryIOException(ioe);
447    } else {
448      throw ioe;
449    }
450  }
451
452  /**
453   * Gets the mob file path.
454   * @return The mob file path.
455   */
456  public Path getPath() {
457    return mobFamilyPath;
458  }
459
460  public void updateCellsCountCompactedToMob(long count) {
461    cellsCountCompactedToMob.addAndGet(count);
462  }
463
464  public long getCellsCountCompactedToMob() {
465    return cellsCountCompactedToMob.get();
466  }
467
468  public void updateCellsCountCompactedFromMob(long count) {
469    cellsCountCompactedFromMob.addAndGet(count);
470  }
471
472  public long getCellsCountCompactedFromMob() {
473    return cellsCountCompactedFromMob.get();
474  }
475
476  public void updateCellsSizeCompactedToMob(long size) {
477    cellsSizeCompactedToMob.addAndGet(size);
478  }
479
480  public long getCellsSizeCompactedToMob() {
481    return cellsSizeCompactedToMob.get();
482  }
483
484  public void updateCellsSizeCompactedFromMob(long size) {
485    cellsSizeCompactedFromMob.addAndGet(size);
486  }
487
488  public long getCellsSizeCompactedFromMob() {
489    return cellsSizeCompactedFromMob.get();
490  }
491
492  public void updateMobFlushCount() {
493    mobFlushCount.incrementAndGet();
494  }
495
496  public long getMobFlushCount() {
497    return mobFlushCount.get();
498  }
499
500  public void updateMobFlushedCellsCount(long count) {
501    mobFlushedCellsCount.addAndGet(count);
502  }
503
504  public long getMobFlushedCellsCount() {
505    return mobFlushedCellsCount.get();
506  }
507
508  public void updateMobFlushedCellsSize(long size) {
509    mobFlushedCellsSize.addAndGet(size);
510  }
511
512  public long getMobFlushedCellsSize() {
513    return mobFlushedCellsSize.get();
514  }
515
516  public void updateMobScanCellsCount(long count) {
517    mobScanCellsCount.addAndGet(count);
518  }
519
520  public long getMobScanCellsCount() {
521    return mobScanCellsCount.get();
522  }
523
524  public void updateMobScanCellsSize(long size) {
525    mobScanCellsSize.addAndGet(size);
526  }
527
528  public long getMobScanCellsSize() {
529    return mobScanCellsSize.get();
530  }
531
532  public byte[] getRefCellTags() {
533    return this.refCellTags;
534  }
535
536}