001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021
022import java.io.BufferedInputStream;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.List;
029import java.util.regex.Matcher;
030import org.apache.commons.io.IOUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.backup.HFileArchiver;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.io.HFileLink;
042import org.apache.hadoop.hbase.io.Reference;
043import org.apache.hadoop.hbase.io.compress.Compression;
044import org.apache.hadoop.hbase.io.crypto.Encryption;
045import org.apache.hadoop.hbase.io.hfile.CacheConfig;
046import org.apache.hadoop.hbase.io.hfile.HFile;
047import org.apache.hadoop.hbase.io.hfile.HFileContext;
048import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
049import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
050import org.apache.hadoop.hbase.regionserver.HStoreFile;
051import org.apache.hadoop.hbase.regionserver.StoreContext;
052import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
053import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
054import org.apache.hadoop.hbase.regionserver.StoreUtils;
055import org.apache.hadoop.hbase.security.access.AbstractReadOnlyController;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.HFileArchiveUtil;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
064
065/**
066 * Base class for all store file tracker.
067 * <p/>
068 * Mainly used to place the common logic to skip persistent for secondary replicas.
069 */
070@InterfaceAudience.Private
071abstract class StoreFileTrackerBase implements StoreFileTracker {
072
073  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
074
075  protected final Configuration conf;
076
077  protected final boolean isPrimaryReplica;
078
079  protected final StoreContext ctx;
080
081  private volatile boolean cacheOnWriteLogged;
082
083  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
084    this.conf = conf;
085    this.isPrimaryReplica = isPrimaryReplica;
086    this.ctx = ctx;
087  }
088
089  private boolean isReadOnlyEnabled() {
090    return conf.getBoolean(HConstants.HBASE_GLOBAL_READONLY_ENABLED_KEY,
091      HConstants.HBASE_GLOBAL_READONLY_ENABLED_DEFAULT);
092  }
093
094  private boolean isNonWritableTableWhenReadOnlyMode() {
095    return isReadOnlyEnabled()
096      && !AbstractReadOnlyController.isWritableInReadOnlyMode(ctx.getTableName());
097  }
098
099  @Override
100  public final List<StoreFileInfo> load() throws IOException {
101    return doLoadStoreFiles(!isPrimaryReplica || isNonWritableTableWhenReadOnlyMode());
102  }
103
104  @Override
105  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
106    if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) {
107      doAddNewStoreFiles(newFiles);
108    }
109  }
110
111  @Override
112  public final void replace(Collection<StoreFileInfo> compactedFiles,
113    Collection<StoreFileInfo> newFiles) throws IOException {
114    if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) {
115      doAddCompactionResults(compactedFiles, newFiles);
116    }
117  }
118
119  @Override
120  public final void set(List<StoreFileInfo> files) throws IOException {
121    if (isPrimaryReplica && !isNonWritableTableWhenReadOnlyMode()) {
122      doSetStoreFiles(files);
123    }
124  }
125
126  @Override
127  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
128    builder.setValue(TRACKER_IMPL, getTrackerName());
129    return builder;
130  }
131
132  protected final String getTrackerName() {
133    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
134  }
135
136  private HFileContext createFileContext(Compression.Algorithm compression,
137    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
138    if (compression == null) {
139      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
140    }
141    ColumnFamilyDescriptor family = ctx.getFamily();
142    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
143      .withIncludesTags(includesTag).withCompression(compression)
144      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
145      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
146      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
147      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
148      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
149      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
150      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
151    return hFileContext;
152  }
153
154  @Override
155  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
156    if (!isPrimaryReplica || isNonWritableTableWhenReadOnlyMode()) {
157      throw new IllegalStateException(
158        "Should not call create writer on secondary replicas or in read-only mode");
159    }
160    // creating new cache config for each new writer
161    final CacheConfig cacheConf = ctx.getCacheConf();
162    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
163    long totalCompactedFilesSize = params.totalCompactedFilesSize();
164    if (params.isCompaction()) {
165      // Don't cache data on write on compactions, unless specifically configured to do so
166      // Cache only when total file size remains lower than configured threshold
167      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
168      // if data blocks are to be cached on write
169      // during compaction, we should forcefully
170      // cache index and bloom blocks as well
171      if (
172        cacheCompactedBlocksOnWrite
173          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
174      ) {
175        writerCacheConf.enableCacheOnWrite();
176        if (!cacheOnWriteLogged) {
177          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
178            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
179          cacheOnWriteLogged = true;
180        }
181      } else {
182        writerCacheConf.setCacheDataOnWrite(false);
183        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
184          // checking condition once again for logging
185          LOG.debug(
186            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
187              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
188            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
189        }
190      }
191    } else {
192      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
193      if (shouldCacheDataOnWrite) {
194        writerCacheConf.enableCacheOnWrite();
195        if (!cacheOnWriteLogged) {
196          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
197            + "Index blocks and Bloom filter blocks", this);
198          cacheOnWriteLogged = true;
199        }
200      }
201    }
202    Encryption.Context encryptionContext = ctx.getEncryptionContext();
203    HFileContext hFileContext = createFileContext(params.compression(),
204      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
205    Path outputDir;
206    if (requireWritingToTmpDirFirst()) {
207      outputDir =
208        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
209    } else {
210      outputDir = ctx.getFamilyStoreDirectoryPath();
211    }
212    StoreFileWriter.Builder builder =
213      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
214        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
215        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
216        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
217        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
218        .withFileStoragePolicy(params.fileStoragePolicy())
219        .withWriterCreationTracker(params.writerCreationTracker())
220        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
221        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
222    return builder.build();
223  }
224
225  @Override
226  public Reference createReference(Reference reference, Path path) throws IOException {
227    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
228    try {
229      out.write(reference.toByteArray());
230    } finally {
231      out.close();
232    }
233    return reference;
234  }
235
236  @Override
237  public Reference createAndCommitReference(Reference reference, Path path) throws IOException {
238    return createReference(reference, path);
239  }
240
241  /**
242   * Returns true if the specified family has reference files
243   * @param familyName Column Family Name
244   * @return true if family contains reference files
245   */
246  public boolean hasReferences() throws IOException {
247    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
248    FileStatus[] files =
249      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
250    if (files != null) {
251      for (FileStatus stat : files) {
252        if (stat.isDirectory()) {
253          continue;
254        }
255        if (StoreFileInfo.isReference(stat.getPath())) {
256          LOG.trace("Reference {}", stat.getPath());
257          return true;
258        }
259      }
260    }
261    return false;
262  }
263
264  @Override
265  public Reference readReference(final Path p) throws IOException {
266    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
267    try {
268      // I need to be able to move back in the stream if this is not a pb serialization so I can
269      // do the Writable decoding instead.
270      in = in.markSupported() ? in : new BufferedInputStream(in);
271      int pblen = ProtobufUtil.lengthOfPBMagic();
272      in.mark(pblen);
273      byte[] pbuf = new byte[pblen];
274      IOUtils.readFully(in, pbuf, 0, pblen);
275      // WATCHOUT! Return in middle of function!!!
276      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
277        return Reference.convert(
278          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
279      }
280      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
281      // We won't bother rewriting thie Reference as a pb since Reference is transitory.
282      in.reset();
283      Reference r = new Reference();
284      DataInputStream dis = new DataInputStream(in);
285      // Set in = dis so it gets the close below in the finally on our way out.
286      in = dis;
287      r.readFields(dis);
288      return r;
289    } finally {
290      in.close();
291    }
292  }
293
294  @Override
295  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
296    throws IOException {
297    return getStoreFileInfo(null, initialPath, primaryReplica);
298  }
299
300  @Override
301  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
302    boolean primaryReplica) throws IOException {
303    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
304    assert fs != null;
305    assert initialPath != null;
306    assert conf != null;
307    Reference reference = null;
308    HFileLink link = null;
309    long createdTimestamp = 0;
310    long size = 0;
311    Path p = initialPath;
312    if (HFileLink.isHFileLink(p)) {
313      // HFileLink
314      reference = null;
315      link = HFileLink.buildFromHFileLinkPattern(conf, p);
316      LOG.trace("{} is a link", p);
317    } else if (StoreFileInfo.isReference(p)) {
318      reference = readReference(p);
319      Path referencePath = StoreFileInfo.getReferredToFile(p);
320      if (HFileLink.isHFileLink(referencePath)) {
321        // HFileLink Reference
322        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
323      } else {
324        // Reference
325        link = null;
326      }
327      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
328    } else
329      if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) {
330        // HFile
331        if (fileStatus != null) {
332          createdTimestamp = fileStatus.getModificationTime();
333          size = fileStatus.getLen();
334        } else {
335          FileStatus fStatus = fs.getFileStatus(initialPath);
336          createdTimestamp = fStatus.getModificationTime();
337          size = fStatus.getLen();
338        }
339      } else {
340        throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
341      }
342    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
343      isPrimaryReplica);
344  }
345
346  public HFileLink createAndCommitHFileLink(final TableName linkedTable, final String linkedRegion,
347    final String hfileName, final boolean createBackRef) throws IOException {
348    HFileLink hFileLink = createHFileLink(linkedTable, linkedRegion, hfileName, createBackRef);
349    Path path = new Path(ctx.getFamilyStoreDirectoryPath(),
350      HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
351    StoreFileInfo storeFileInfo =
352      new StoreFileInfo(conf, this.ctx.getRegionFileSystem().getFileSystem(), path, hFileLink);
353    add(Arrays.asList(storeFileInfo));
354    return hFileLink;
355  }
356
357  public HFileLink createHFileLink(final TableName linkedTable, final String linkedRegion,
358    final String hfileName, final boolean createBackRef) throws IOException {
359    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
360    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
361      ctx.getRegionInfo().getEncodedName());
362
363    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
364    // Make sure the destination directory exists
365    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());
366
367    // Make sure the FileLink reference directory exists
368    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
369      ctx.getFamily().getNameAsString());
370    Path backRefPath = null;
371    if (createBackRef) {
372      Path backRefssDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
373      fs.mkdirs(backRefssDir);
374
375      // Create the reference for the link
376      backRefPath = new Path(backRefssDir, refName);
377      fs.createNewFile(backRefPath);
378    }
379    try {
380      // Create the link
381      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
382        return new HFileLink(new Path(ctx.getFamilyStoreDirectoryPath(), name), backRefPath, null,
383          archiveStoreDir);
384      }
385    } catch (IOException e) {
386      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
387        e);
388      // Revert the reference if the link creation failed
389      if (createBackRef) {
390        fs.delete(backRefPath, false);
391      }
392      throw e;
393    }
394    throw new IOException("File link=" + name + " already exists under "
395      + ctx.getFamilyStoreDirectoryPath() + " folder.");
396
397  }
398
399  public HFileLink createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
400    throws IOException {
401    Matcher hfileLinkMatcher = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
402    if (hfileLinkMatcher.matches()) {
403      return createHFileLink(
404        TableName.valueOf(hfileLinkMatcher.group(1), hfileLinkMatcher.group(2)),
405        hfileLinkMatcher.group(3), hfileLinkMatcher.group(4), createBackRef);
406    }
407    if (StoreFileInfo.isMobFileLink(hfileLinkName)) {
408      Matcher mobLinkMatcher = HFileLink.REF_OR_HFILE_LINK_PATTERN.matcher(hfileLinkName);
409      if (mobLinkMatcher.matches()) {
410        return createHFileLink(TableName.valueOf(mobLinkMatcher.group(1), mobLinkMatcher.group(2)),
411          mobLinkMatcher.group(3), mobLinkMatcher.group(4), createBackRef);
412      }
413    }
414    throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
415  }
416
417  @Override
418  public StoreContext getStoreContext() {
419    return ctx;
420  }
421
422  public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
423    archiveStoreFiles(storeFiles);
424  }
425
426  protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException {
427    HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(),
428      ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(),
429      storeFiles);
430  }
431
432  /**
433   * For primary replica, we will call load once when opening a region, and the implementation could
434   * choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you
435   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
436   * {@code true}.
437   */
438  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;
439
440  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
441
442  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
443    Collection<StoreFileInfo> newFiles) throws IOException;
444
445  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;
446
447}