001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableSet;
027import java.util.Optional;
028import java.util.UUID;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.function.Consumer;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellBuilderType;
038import org.apache.hadoop.hbase.CellComparator;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Tag;
044import org.apache.hadoop.hbase.TagType;
045import org.apache.hadoop.hbase.TagUtil;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.FilterList;
050import org.apache.hadoop.hbase.io.compress.Compression;
051import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
052import org.apache.hadoop.hbase.mob.MobCell;
053import org.apache.hadoop.hbase.mob.MobConstants;
054import org.apache.hadoop.hbase.mob.MobFile;
055import org.apache.hadoop.hbase.mob.MobFileCache;
056import org.apache.hadoop.hbase.mob.MobFileName;
057import org.apache.hadoop.hbase.mob.MobStoreEngine;
058import org.apache.hadoop.hbase.mob.MobUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.IdLock;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * The store implementation to save MOBs (medium objects), it extends the HStore. When a descriptor
067 * of a column family has the value "IS_MOB", it means this column family is a mob one. When a
068 * HRegion instantiate a store for this column family, the HMobStore is created. HMobStore is almost
069 * the same with the HStore except using different types of scanners. In the method of getScanner,
070 * the MobStoreScanner and MobReversedStoreScanner are returned. In these scanners, a additional
071 * seeks in the mob files should be performed after the seek to HBase is done. The store implements
072 * how we save MOBs by extending HStore. When a descriptor of a column family has the value
073 * "IS_MOB", it means this column family is a mob one. When a HRegion instantiate a store for this
074 * column family, the HMobStore is created. HMobStore is almost the same with the HStore except
075 * using different types of scanners. In the method of getScanner, the MobStoreScanner and
076 * MobReversedStoreScanner are returned. In these scanners, a additional seeks in the mob files
077 * should be performed after the seek in HBase is done.
078 */
079@InterfaceAudience.Private
080public class HMobStore extends HStore {
081  private static final Logger LOG = LoggerFactory.getLogger(HMobStore.class);
082  private MobFileCache mobFileCache;
083  private Path homePath;
084  private Path mobFamilyPath;
085  private AtomicLong cellsCountCompactedToMob = new AtomicLong();
086  private AtomicLong cellsCountCompactedFromMob = new AtomicLong();
087  private AtomicLong cellsSizeCompactedToMob = new AtomicLong();
088  private AtomicLong cellsSizeCompactedFromMob = new AtomicLong();
089  private AtomicLong mobFlushCount = new AtomicLong();
090  private AtomicLong mobFlushedCellsCount = new AtomicLong();
091  private AtomicLong mobFlushedCellsSize = new AtomicLong();
092  private AtomicLong mobScanCellsCount = new AtomicLong();
093  private AtomicLong mobScanCellsSize = new AtomicLong();
094  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
095  private final IdLock keyLock = new IdLock();
096  // When we add a MOB reference cell to the HFile, we will add 2 tags along with it
097  // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
098  // original one but a ref to another MOB Cell.
099  // 2. Table name tag. It's very useful in cloning the snapshot. When reading from the cloning
100  // table, we need to find the original mob files by this table name. For details please see
101  // cloning snapshot for mob files.
102  private final byte[] refCellTags;
103
104  public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
105    final Configuration confParam, boolean warmup) throws IOException {
106    super(region, family, confParam, warmup);
107    this.mobFileCache = region.getMobFileCache();
108    this.homePath = MobUtils.getMobHome(conf);
109    this.mobFamilyPath =
110      MobUtils.getMobFamilyPath(conf, this.getTableName(), family.getNameAsString());
111    List<Path> locations = new ArrayList<>(2);
112    locations.add(mobFamilyPath);
113    TableName tn = region.getTableDescriptor().getTableName();
114    locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
115      MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
116    map.put(tn, locations);
117    List<Tag> tags = new ArrayList<>(2);
118    tags.add(MobConstants.MOB_REF_TAG);
119    Tag tableNameTag =
120      new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, getTableName().getName());
121    tags.add(tableNameTag);
122    this.refCellTags = TagUtil.fromList(tags);
123  }
124
125  /**
126   * Gets current config.
127   */
128  public Configuration getConfiguration() {
129    return this.conf;
130  }
131
132  /**
133   * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in
134   * the mob files should be performed after the seek in HBase is done.
135   */
136  @Override
137  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
138    NavigableSet<byte[]> targetCols, long readPt) throws IOException {
139    if (MobUtils.isRefOnlyScan(scan)) {
140      Filter refOnlyFilter = new MobReferenceOnlyFilter();
141      Filter filter = scan.getFilter();
142      if (filter != null) {
143        scan.setFilter(new FilterList(filter, refOnlyFilter));
144      } else {
145        scan.setFilter(refOnlyFilter);
146      }
147    }
148    return scan.isReversed()
149      ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt)
150      : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt);
151  }
152
153  /**
154   * Creates the mob store engine.
155   */
156  @Override
157  protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
158    CellComparator cellComparator) throws IOException {
159    MobStoreEngine engine = new MobStoreEngine();
160    engine.createComponentsOnce(conf, store, cellComparator);
161    return engine;
162  }
163
164  /**
165   * Gets the temp directory.
166   * @return The temp directory.
167   */
168  private Path getTempDir() {
169    return new Path(homePath, MobConstants.TEMP_DIR_NAME);
170  }
171
172  /**
173   * Creates the writer for the mob file in temp directory.
174   * @param date         The latest date of written cells.
175   * @param maxKeyCount  The key count.
176   * @param compression  The compression algorithm.
177   * @param startKey     The start key.
178   * @param isCompaction If the writer is used in compaction.
179   * @return The writer for the mob file. n
180   */
181  public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount,
182    Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException {
183    if (startKey == null) {
184      startKey = HConstants.EMPTY_START_ROW;
185    }
186    Path path = getTempDir();
187    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
188      isCompaction, null);
189  }
190
191  /**
192   * Creates the writer for the mob file in the mob family directory.
193   * @param date         The latest date of written cells.
194   * @param maxKeyCount  The key count.
195   * @param compression  The compression algorithm.
196   * @param startKey     The start key.
197   * @param isCompaction If the writer is used in compaction.
198   * @return The writer for the mob file. n
199   */
200  public StoreFileWriter createWriter(Date date, long maxKeyCount,
201    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
202    Consumer<Path> writerCreationTracker) throws IOException {
203    if (startKey == null) {
204      startKey = HConstants.EMPTY_START_ROW;
205    }
206    Path path = getPath();
207    return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
208      isCompaction, writerCreationTracker);
209  }
210
211  /**
212   * Creates the writer for the mob file in temp directory.
213   * @param date         The date string, its format is yyyymmmdd.
214   * @param basePath     The basic path for a temp directory.
215   * @param maxKeyCount  The key count.
216   * @param compression  The compression algorithm.
217   * @param startKey     The start key.
218   * @param isCompaction If the writer is used in compaction.
219   * @return The writer for the mob file. n
220   */
221  public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
222    Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
223    Consumer<Path> writerCreationTracker) throws IOException {
224    MobFileName mobFileName =
225      MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", ""),
226        getHRegion().getRegionInfo().getEncodedName());
227    return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction,
228      writerCreationTracker);
229  }
230
231  /**
232   * Creates the writer for the mob file in temp directory.
233   * @param mobFileName  The mob file name.
234   * @param basePath     The basic path for a temp directory.
235   * @param maxKeyCount  The key count.
236   * @param compression  The compression algorithm.
237   * @param isCompaction If the writer is used in compaction.
238   * @return The writer for the mob file. n
239   */
240
241  public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
242    Compression.Algorithm compression, boolean isCompaction, Consumer<Path> writerCreationTracker)
243    throws IOException {
244    return MobUtils.createWriter(conf, getFileSystem(), getColumnFamilyDescriptor(),
245      new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, getCacheConfig(),
246      getStoreContext().getEncryptionContext(), StoreUtils.getChecksumType(conf),
247      StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(), BloomType.NONE,
248      isCompaction, writerCreationTracker);
249  }
250
251  /**
252   * Commits the mob file.
253   * @param sourceFile The source file.
254   * @param targetPath The directory path where the source file is renamed to. n
255   */
256  public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
257    if (sourceFile == null) {
258      return;
259    }
260    Path dstPath = new Path(targetPath, sourceFile.getName());
261    validateMobFile(sourceFile);
262    if (sourceFile.equals(targetPath)) {
263      LOG.info("File is already in the destination dir: {}", sourceFile);
264      return;
265    }
266    LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile, dstPath);
267    Path parent = dstPath.getParent();
268    if (!getFileSystem().exists(parent)) {
269      getFileSystem().mkdirs(parent);
270    }
271    if (!getFileSystem().rename(sourceFile, dstPath)) {
272      throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
273    }
274  }
275
276  /**
277   * Validates a mob file by opening and closing it.
278   * @param path the path to the mob file
279   */
280  private void validateMobFile(Path path) throws IOException {
281    HStoreFile storeFile = null;
282    try {
283      storeFile = new HStoreFile(getFileSystem(), path, conf, getCacheConfig(), BloomType.NONE,
284        isPrimaryReplicaStore());
285      storeFile.initReader();
286    } catch (IOException e) {
287      LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
288      throw e;
289    } finally {
290      if (storeFile != null) {
291        storeFile.closeStoreFile(false);
292      }
293    }
294  }
295
296  /**
297   * Reads the cell from the mob file, and the read point does not count. This is used for
298   * DefaultMobStoreCompactor where we can read empty value for the missing cell.
299   * @param reference   The cell found in the HBase, its value is a path to a mob file.
300   * @param cacheBlocks Whether the scanner should cache blocks.
301   * @return The cell found in the mob file. n
302   */
303  public MobCell resolve(Cell reference, boolean cacheBlocks) throws IOException {
304    return resolve(reference, cacheBlocks, -1, true);
305  }
306
307  /**
308   * Reads the cell from the mob file with readEmptyValueOnMobCellMiss
309   * @param reference                   The cell found in the HBase, its value is a path to a mob
310   *                                    file.
311   * @param cacheBlocks                 Whether the scanner should cache blocks.
312   * @param readEmptyValueOnMobCellMiss should return empty mob cell if reference can not be
313   *                                    resolved.
314   * @return The cell found in the mob file. n
315   */
316  public MobCell resolve(Cell reference, boolean cacheBlocks, boolean readEmptyValueOnMobCellMiss)
317    throws IOException {
318    return resolve(reference, cacheBlocks, -1, readEmptyValueOnMobCellMiss);
319  }
320
321  /**
322   * Reads the cell from the mob file.
323   * @param reference                   The cell found in the HBase, its value is a path to a mob
324   *                                    file.
325   * @param cacheBlocks                 Whether the scanner should cache blocks.
326   * @param readPt                      the read point.
327   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
328   *                                    corrupt.
329   * @return The cell found in the mob file. n
330   */
331  public MobCell resolve(Cell reference, boolean cacheBlocks, long readPt,
332    boolean readEmptyValueOnMobCellMiss) throws IOException {
333    MobCell mobCell = null;
334    if (MobUtils.hasValidMobRefCellValue(reference)) {
335      String fileName = MobUtils.getMobFileName(reference);
336      Optional<TableName> tableName = MobUtils.getTableName(reference);
337      if (tableName.isPresent()) {
338        List<Path> locations = getLocations(tableName.get());
339        mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
340          readEmptyValueOnMobCellMiss);
341      }
342    }
343    if (mobCell == null) {
344      LOG.warn("The Cell result is null, assemble a new Cell with the same row,family,"
345        + "qualifier,timestamp,type and tags but with an empty value to return.");
346      Cell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
347        .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
348        .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
349          reference.getFamilyLength())
350        .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(),
351          reference.getQualifierLength())
352        .setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte())
353        .setValue(HConstants.EMPTY_BYTE_ARRAY)
354        .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength())
355        .build();
356      mobCell = new MobCell(cell);
357    }
358    return mobCell;
359  }
360
361  /**
362   * @param tableName to look up locations for, can not be null
363   * @return a list of location in order of working dir, archive dir. will not be null.
364   */
365  public List<Path> getLocations(TableName tableName) throws IOException {
366    List<Path> locations = map.get(tableName);
367    if (locations == null) {
368      IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
369      try {
370        locations = map.get(tableName);
371        if (locations == null) {
372          locations = new ArrayList<>(2);
373          locations.add(MobUtils.getMobFamilyPath(conf, tableName,
374            getColumnFamilyDescriptor().getNameAsString()));
375          locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tableName,
376            MobUtils.getMobRegionInfo(tableName).getEncodedName(),
377            getColumnFamilyDescriptor().getNameAsString()));
378          map.put(tableName, locations);
379        }
380      } finally {
381        keyLock.releaseLockEntry(lockEntry);
382      }
383    }
384    return locations;
385  }
386
387  /**
388   * Reads the cell from a mob file. The mob file might be located in different directories. 1. The
389   * working directory. 2. The archive directory. Reads the cell from the files located in both of
390   * the above directories.
391   * @param locations                   The possible locations where the mob files are saved.
392   * @param fileName                    The file to be read.
393   * @param search                      The cell to be searched.
394   * @param cacheMobBlocks              Whether the scanner should cache blocks.
395   * @param readPt                      the read point.
396   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
397   *                                    corrupt.
398   * @return The found cell. Null if there's no such a cell. n
399   */
400  private MobCell readCell(List<Path> locations, String fileName, Cell search,
401    boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
402    FileSystem fs = getFileSystem();
403    Throwable throwable = null;
404    for (Path location : locations) {
405      MobFile file = null;
406      Path path = new Path(location, fileName);
407      try {
408        file = mobFileCache.openFile(fs, path, getCacheConfig());
409        return readPt != -1
410          ? file.readCell(search, cacheMobBlocks, readPt)
411          : file.readCell(search, cacheMobBlocks);
412      } catch (IOException e) {
413        mobFileCache.evictFile(fileName);
414        throwable = e;
415        if (
416          (e instanceof FileNotFoundException) || (e.getCause() instanceof FileNotFoundException)
417        ) {
418          LOG.debug("Fail to read the cell, the mob file " + path + " doesn't exist", e);
419        } else if (e instanceof CorruptHFileException) {
420          LOG.error("The mob file " + path + " is corrupt", e);
421          break;
422        } else {
423          throw e;
424        }
425      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
426        mobFileCache.evictFile(fileName);
427        LOG.debug("Fail to read the cell", e);
428        throwable = e;
429      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
430        mobFileCache.evictFile(fileName);
431        LOG.debug("Fail to read the cell", e);
432        throwable = e;
433      } finally {
434        if (file != null) {
435          mobFileCache.closeFile(file);
436        }
437      }
438    }
439    LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
440      + " or it is corrupt");
441    if (readEmptyValueOnMobCellMiss) {
442      return null;
443    } else if (
444      (throwable instanceof FileNotFoundException)
445        || (throwable.getCause() instanceof FileNotFoundException)
446    ) {
447      // The region is re-opened when FileNotFoundException is thrown.
448      // This is not necessary when MOB files cannot be found, because the store files
449      // in a region only contain the references to MOB files and a re-open on a region
450      // doesn't help fix the lost MOB files.
451      throw new DoNotRetryIOException(throwable);
452    } else if (throwable instanceof IOException) {
453      throw (IOException) throwable;
454    } else {
455      throw new IOException(throwable);
456    }
457  }
458
459  /**
460   * Gets the mob file path.
461   * @return The mob file path.
462   */
463  public Path getPath() {
464    return mobFamilyPath;
465  }
466
467  public void updateCellsCountCompactedToMob(long count) {
468    cellsCountCompactedToMob.addAndGet(count);
469  }
470
471  public long getCellsCountCompactedToMob() {
472    return cellsCountCompactedToMob.get();
473  }
474
475  public void updateCellsCountCompactedFromMob(long count) {
476    cellsCountCompactedFromMob.addAndGet(count);
477  }
478
479  public long getCellsCountCompactedFromMob() {
480    return cellsCountCompactedFromMob.get();
481  }
482
483  public void updateCellsSizeCompactedToMob(long size) {
484    cellsSizeCompactedToMob.addAndGet(size);
485  }
486
487  public long getCellsSizeCompactedToMob() {
488    return cellsSizeCompactedToMob.get();
489  }
490
491  public void updateCellsSizeCompactedFromMob(long size) {
492    cellsSizeCompactedFromMob.addAndGet(size);
493  }
494
495  public long getCellsSizeCompactedFromMob() {
496    return cellsSizeCompactedFromMob.get();
497  }
498
499  public void updateMobFlushCount() {
500    mobFlushCount.incrementAndGet();
501  }
502
503  public long getMobFlushCount() {
504    return mobFlushCount.get();
505  }
506
507  public void updateMobFlushedCellsCount(long count) {
508    mobFlushedCellsCount.addAndGet(count);
509  }
510
511  public long getMobFlushedCellsCount() {
512    return mobFlushedCellsCount.get();
513  }
514
515  public void updateMobFlushedCellsSize(long size) {
516    mobFlushedCellsSize.addAndGet(size);
517  }
518
519  public long getMobFlushedCellsSize() {
520    return mobFlushedCellsSize.get();
521  }
522
523  public void updateMobScanCellsCount(long count) {
524    mobScanCellsCount.addAndGet(count);
525  }
526
527  public long getMobScanCellsCount() {
528    return mobScanCellsCount.get();
529  }
530
531  public void updateMobScanCellsSize(long size) {
532    mobScanCellsSize.addAndGet(size);
533  }
534
535  public long getMobScanCellsSize() {
536    return mobScanCellsSize.get();
537  }
538
539  public byte[] getRefCellTags() {
540    return this.refCellTags;
541  }
542
543}