/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to place the common logic to skip persistence for secondary replicas.
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

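  /**
   * Builds the {@link HFileContext} for a new store file writer, pulling compression, block
   * encoding, checksum and encryption settings from the store configuration and the column family
   * descriptor.
   */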
  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so
      // Cache only when total file size remains lower than configured threshold
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write
      // during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }

  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    try (FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false)) {
      out.write(reference.toByteArray());
    }
    return reference;
  }

  /**
   * Returns true if this store's column family has reference files.
   * @return true if the family contains reference files
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // I need to be able to move back in the stream if this is not a pb serialization so I can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else if (
      StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)
    ) {
      // HFile
      if (fileStatus != null) {
        createdTimestamp = fileStatus.getModificationTime();
        size = fileStatus.getLen();
      } else {
        FileStatus fStatus = fs.getFileStatus(initialPath);
        createdTimestamp = fStatus.getModificationTime();
        size = fStatus.getLen();
      }
    } else {
      throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
    }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }

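  /**
   * Creates an HFileLink in this store's family directory that points to the given hfile of
   * another table/region, optionally creating a back reference file for it under the archive
   * directory.
   * @param linkedTable   the table that owns the referenced hfile
   * @param linkedRegion  the encoded name of the region that owns the referenced hfile
   * @param hfileName     the name of the referenced hfile
   * @param createBackRef whether to also create a back reference file
   * @return the name of the created link file
   */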
  public String createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);

      // Create the back reference for the link
      backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return name;
      }
    } catch (IOException e) {
      LOG.error("couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the back reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

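  /**
   * Creates an HFileLink from an existing HFileLink name by parsing out the referenced table,
   * region and hfile and delegating to {@link #createHFileLink}.
   * @param hfileLinkName the HFileLink name to parse
   * @param createBackRef whether to also create a back reference file
   * @return the name of the created link file
   */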
  public String createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher m = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (!m.matches()) {
      throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
    }
    return createHFileLink(TableName.valueOf(m.group(1), m.group(2)), m.group(3), m.group(4),
      createBackRef);
  }

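  /**
   * Removes the given store files from the store. The default implementation archives them via
   * {@link #archiveStoreFiles(List)} instead of deleting them outright.
   */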
  public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    archiveStoreFiles(storeFiles);
  }

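  /**
   * Moves the given store files to the archive directory using {@link HFileArchiver}.
   */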
  protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(),
      ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(),
      storeFiles);
  }

  /**
   * For the primary replica, we will call load once when opening a region, and the implementation
   * could choose to do some cleanup work. So here we use {@code readOnly} to indicate whether the
   * implementation is allowed to do the cleanup. For secondary replicas, {@code readOnly} will be
   * set to {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

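  /**
   * Record the given new store files in the underlying tracker storage. The base class only calls
   * this on the primary replica.
   */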
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

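  /**
   * Replace the given compacted files with the compaction output files in the underlying tracker
   * storage. The base class only calls this on the primary replica.
   */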
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

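  /**
   * Overwrite the tracked store file list with the given files. The base class only calls this on
   * the primary replica.
   */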
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}