001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.storefiletracker;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021
022import java.io.BufferedInputStream;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.Collection;
027import java.util.List;
028import org.apache.commons.io.IOUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FSDataOutputStream;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.io.HFileLink;
037import org.apache.hadoop.hbase.io.Reference;
038import org.apache.hadoop.hbase.io.compress.Compression;
039import org.apache.hadoop.hbase.io.crypto.Encryption;
040import org.apache.hadoop.hbase.io.hfile.CacheConfig;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.io.hfile.HFileContext;
043import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
044import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
045import org.apache.hadoop.hbase.regionserver.StoreContext;
046import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
047import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
048import org.apache.hadoop.hbase.regionserver.StoreUtils;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
056
057/**
058 * Base class for all store file tracker.
059 * <p/>
060 * Mainly used to place the common logic to skip persistent for secondary replicas.
061 */
062@InterfaceAudience.Private
063abstract class StoreFileTrackerBase implements StoreFileTracker {
064
065  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
066
067  protected final Configuration conf;
068
069  protected final boolean isPrimaryReplica;
070
071  protected final StoreContext ctx;
072
073  private volatile boolean cacheOnWriteLogged;
074
075  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
076    this.conf = conf;
077    this.isPrimaryReplica = isPrimaryReplica;
078    this.ctx = ctx;
079  }
080
081  @Override
082  public final List<StoreFileInfo> load() throws IOException {
083    return doLoadStoreFiles(!isPrimaryReplica);
084  }
085
086  @Override
087  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
088    if (isPrimaryReplica) {
089      doAddNewStoreFiles(newFiles);
090    }
091  }
092
093  @Override
094  public final void replace(Collection<StoreFileInfo> compactedFiles,
095    Collection<StoreFileInfo> newFiles) throws IOException {
096    if (isPrimaryReplica) {
097      doAddCompactionResults(compactedFiles, newFiles);
098    }
099  }
100
101  @Override
102  public final void set(List<StoreFileInfo> files) throws IOException {
103    if (isPrimaryReplica) {
104      doSetStoreFiles(files);
105    }
106  }
107
108  @Override
109  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
110    builder.setValue(TRACKER_IMPL, getTrackerName());
111    return builder;
112  }
113
114  protected final String getTrackerName() {
115    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
116  }
117
118  private HFileContext createFileContext(Compression.Algorithm compression,
119    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
120    if (compression == null) {
121      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
122    }
123    ColumnFamilyDescriptor family = ctx.getFamily();
124    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
125      .withIncludesTags(includesTag).withCompression(compression)
126      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
127      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
128      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
129      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
130      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
131      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
132      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
133    return hFileContext;
134  }
135
136  @Override
137  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
138    if (!isPrimaryReplica) {
139      throw new IllegalStateException("Should not call create writer on secondary replicas");
140    }
141    // creating new cache config for each new writer
142    final CacheConfig cacheConf = ctx.getCacheConf();
143    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
144    long totalCompactedFilesSize = params.totalCompactedFilesSize();
145    if (params.isCompaction()) {
146      // Don't cache data on write on compactions, unless specifically configured to do so
147      // Cache only when total file size remains lower than configured threshold
148      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
149      // if data blocks are to be cached on write
150      // during compaction, we should forcefully
151      // cache index and bloom blocks as well
152      if (
153        cacheCompactedBlocksOnWrite
154          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
155      ) {
156        writerCacheConf.enableCacheOnWrite();
157        if (!cacheOnWriteLogged) {
158          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
159            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
160          cacheOnWriteLogged = true;
161        }
162      } else {
163        writerCacheConf.setCacheDataOnWrite(false);
164        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
165          // checking condition once again for logging
166          LOG.debug(
167            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
168              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
169            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
170        }
171      }
172    } else {
173      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
174      if (shouldCacheDataOnWrite) {
175        writerCacheConf.enableCacheOnWrite();
176        if (!cacheOnWriteLogged) {
177          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
178            + "Index blocks and Bloom filter blocks", this);
179          cacheOnWriteLogged = true;
180        }
181      }
182    }
183    Encryption.Context encryptionContext = ctx.getEncryptionContext();
184    HFileContext hFileContext = createFileContext(params.compression(),
185      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
186    Path outputDir;
187    if (requireWritingToTmpDirFirst()) {
188      outputDir =
189        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
190    } else {
191      outputDir = ctx.getFamilyStoreDirectoryPath();
192    }
193    StoreFileWriter.Builder builder =
194      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
195        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
196        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
197        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
198        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
199        .withFileStoragePolicy(params.fileStoragePolicy())
200        .withWriterCreationTracker(params.writerCreationTracker())
201        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
202        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
203    return builder.build();
204  }
205
206  @Override
207  public Reference createReference(Reference reference, Path path) throws IOException {
208    FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false);
209    try {
210      out.write(reference.toByteArray());
211    } finally {
212      out.close();
213    }
214    return reference;
215  }
216
217  /**
218   * Returns true if the specified family has reference files
219   * @param familyName Column Family Name
220   * @return true if family contains reference files
221   */
222  public boolean hasReferences() throws IOException {
223    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
224    FileStatus[] files =
225      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
226    if (files != null) {
227      for (FileStatus stat : files) {
228        if (stat.isDirectory()) {
229          continue;
230        }
231        if (StoreFileInfo.isReference(stat.getPath())) {
232          LOG.trace("Reference {}", stat.getPath());
233          return true;
234        }
235      }
236    }
237    return false;
238  }
239
240  @Override
241  public Reference readReference(final Path p) throws IOException {
242    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
243    try {
244      // I need to be able to move back in the stream if this is not a pb serialization so I can
245      // do the Writable decoding instead.
246      in = in.markSupported() ? in : new BufferedInputStream(in);
247      int pblen = ProtobufUtil.lengthOfPBMagic();
248      in.mark(pblen);
249      byte[] pbuf = new byte[pblen];
250      IOUtils.readFully(in, pbuf, 0, pblen);
251      // WATCHOUT! Return in middle of function!!!
252      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
253        return Reference.convert(
254          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
255      }
256      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
257      // We won't bother rewriting thie Reference as a pb since Reference is transitory.
258      in.reset();
259      Reference r = new Reference();
260      DataInputStream dis = new DataInputStream(in);
261      // Set in = dis so it gets the close below in the finally on our way out.
262      in = dis;
263      r.readFields(dis);
264      return r;
265    } finally {
266      in.close();
267    }
268  }
269
270  @Override
271  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
272    throws IOException {
273    return getStoreFileInfo(null, initialPath, primaryReplica);
274  }
275
276  @Override
277  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
278    boolean primaryReplica) throws IOException {
279    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
280    assert fs != null;
281    assert initialPath != null;
282    assert conf != null;
283    Reference reference = null;
284    HFileLink link = null;
285    long createdTimestamp = 0;
286    long size = 0;
287    Path p = initialPath;
288    if (HFileLink.isHFileLink(p)) {
289      // HFileLink
290      reference = null;
291      link = HFileLink.buildFromHFileLinkPattern(conf, p);
292      LOG.trace("{} is a link", p);
293    } else if (StoreFileInfo.isReference(p)) {
294      reference = readReference(p);
295      Path referencePath = StoreFileInfo.getReferredToFile(p);
296      if (HFileLink.isHFileLink(referencePath)) {
297        // HFileLink Reference
298        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
299      } else {
300        // Reference
301        link = null;
302      }
303      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
304    } else
305      if (StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)) {
306        // HFile
307        if (fileStatus != null) {
308          createdTimestamp = fileStatus.getModificationTime();
309          size = fileStatus.getLen();
310        } else {
311          FileStatus fStatus = fs.getFileStatus(initialPath);
312          createdTimestamp = fStatus.getModificationTime();
313          size = fStatus.getLen();
314        }
315      } else {
316        throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
317      }
318    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
319      isPrimaryReplica);
320  }
321
322  /**
323   * For primary replica, we will call load once when opening a region, and the implementation could
324   * choose to do some cleanup work. So here we use {@code readOnly} to indicate that whether you
325   * are allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
326   * {@code true}.
327   */
328  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;
329
330  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
331
332  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
333    Collection<StoreFileInfo> newFiles) throws IOException;
334
335  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;
336
337}