/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * Base class for all store file trackers.
 * <p/>
 * Mainly used to hold the common logic for skipping persistence on secondary replicas.
 */
@InterfaceAudience.Private
abstract class StoreFileTrackerBase implements StoreFileTracker {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);

  protected final Configuration conf;

  protected final boolean isPrimaryReplica;

  protected final StoreContext ctx;

  private volatile boolean cacheOnWriteLogged;

  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    this.conf = conf;
    this.isPrimaryReplica = isPrimaryReplica;
    this.ctx = ctx;
  }

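  // Tracker state is only ever persisted by the primary replica: load() runs in read-only mode on
  // secondary replicas, and the mutating operations below (add, replace, set) are silently
  // skipped there.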
  @Override
  public final List<StoreFileInfo> load() throws IOException {
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  @Override
  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddNewStoreFiles(newFiles);
    }
  }

  @Override
  public final void replace(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    if (isPrimaryReplica) {
      doAddCompactionResults(compactedFiles, newFiles);
    }
  }

  @Override
  public final void set(List<StoreFileInfo> files) throws IOException {
    if (isPrimaryReplica) {
      doSetStoreFiles(files);
    }
  }

  @Override
  public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
    builder.setValue(TRACKER_IMPL, getTrackerName());
    return builder;
  }

  protected final String getTrackerName() {
    return StoreFileTrackerFactory.getStoreFileTrackerName(getClass());
  }

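  /**
   * Builds the {@link HFileContext} for a new store file writer from the column family descriptor
   * and the store context, falling back to the HFile default compression when none is given.
   */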
  private HFileContext createFileContext(Compression.Algorithm compression,
    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    ColumnFamilyDescriptor family = ctx.getFamily();
    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
      .withIncludesTags(includesTag).withCompression(compression)
      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
    return hFileContext;
  }

  @Override
  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
    if (!isPrimaryReplica) {
      throw new IllegalStateException("Should not call create writer on secondary replicas");
    }
    // creating new cache config for each new writer
    final CacheConfig cacheConf = ctx.getCacheConf();
    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
    long totalCompactedFilesSize = params.totalCompactedFilesSize();
    if (params.isCompaction()) {
      // Don't cache data on write on compactions, unless specifically configured to do so:
      // cache only when the total compacted file size remains below the configured threshold.
      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
      // if data blocks are to be cached on write during compaction, we should forcefully
      // cache index and bloom blocks as well
      if (
        cacheCompactedBlocksOnWrite
          && totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()
      ) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheCompactedBlocksOnWrite is true, hence enabled "
            + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      } else {
        writerCacheConf.setCacheDataOnWrite(false);
        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
          // checking condition once again for logging
          LOG.debug(
            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
        }
      }
    } else {
      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
      if (shouldCacheDataOnWrite) {
        writerCacheConf.enableCacheOnWrite();
        if (!cacheOnWriteLogged) {
          LOG.info("For {}, cacheDataOnWrite is true, hence enabled cacheOnWrite for "
            + "Index blocks and Bloom filter blocks", this);
          cacheOnWriteLogged = true;
        }
      }
    }
    Encryption.Context encryptionContext = ctx.getEncryptionContext();
    HFileContext hFileContext = createFileContext(params.compression(),
      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
    Path outputDir;
    if (requireWritingToTmpDirFirst()) {
      outputDir =
        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
    } else {
      outputDir = ctx.getFamilyStoreDirectoryPath();
    }
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
        .withFileStoragePolicy(params.fileStoragePolicy())
        .withWriterCreationTracker(params.writerCreationTracker())
        .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior())
        .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction());
    return builder.build();
  }

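  /**
   * Persists a {@link Reference} (the marker pointing at the top or bottom half of a parent store
   * file, typically created when a region is split) to the given path. The write fails if the
   * file already exists, since {@code create} is called with overwrite disabled.
   */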
  @Override
  public Reference createReference(Reference reference, Path path) throws IOException {
    try (FSDataOutputStream out = ctx.getRegionFileSystem().getFileSystem().create(path, false)) {
      out.write(reference.toByteArray());
    }
    return reference;
  }

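  /**
   * In this base implementation, writing the reference file is the commit. Trackers that stage
   * files before making them visible can override this to add a separate commit step.
   */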
  @Override
  public Reference createAndCommitReference(Reference reference, Path path) throws IOException {
    return createReference(reference, path);
  }

  /**
   * Returns true if this store's column family has reference files.
   * @return true if the family contains reference files
   */
  public boolean hasReferences() throws IOException {
    Path storeDir = ctx.getRegionFileSystem().getStoreDir(ctx.getFamily().getNameAsString());
    FileStatus[] files =
      CommonFSUtils.listStatus(ctx.getRegionFileSystem().getFileSystem(), storeDir);
    if (files != null) {
      for (FileStatus stat : files) {
        if (stat.isDirectory()) {
          continue;
        }
        if (StoreFileInfo.isReference(stat.getPath())) {
          LOG.trace("Reference {}", stat.getPath());
          return true;
        }
      }
    }
    return false;
  }

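  /**
   * Reads a {@link Reference} from the given path, handling both the current protobuf
   * serialization and the legacy Writable format.
   */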
  @Override
  public Reference readReference(final Path p) throws IOException {
    InputStream in = ctx.getRegionFileSystem().getFileSystem().open(p);
    try {
      // I need to be able to move back in the stream if this is not a pb serialization so I can
      // do the Writable decoding instead.
      in = in.markSupported() ? in : new BufferedInputStream(in);
      int pblen = ProtobufUtil.lengthOfPBMagic();
      in.mark(pblen);
      byte[] pbuf = new byte[pblen];
      IOUtils.readFully(in, pbuf, 0, pblen);
      // WATCHOUT! Return in middle of function!!!
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        return Reference.convert(
          org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.Reference.parseFrom(in));
      }
      // Else presume Writables. Need to reset the stream since it didn't start w/ pb.
      // We won't bother rewriting the Reference as a pb since Reference is transitory.
      in.reset();
      Reference r = new Reference();
      DataInputStream dis = new DataInputStream(in);
      // Set in = dis so it gets the close below in the finally on our way out.
      in = dis;
      r.readFields(dis);
      return r;
    } finally {
      in.close();
    }
  }

  @Override
  public StoreFileInfo getStoreFileInfo(Path initialPath, boolean primaryReplica)
    throws IOException {
    return getStoreFileInfo(null, initialPath, primaryReplica);
  }

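  /**
   * Classifies {@code initialPath} as an HFileLink, a Reference (possibly a reference to a link),
   * or a plain HFile/MOB file, and builds the matching {@link StoreFileInfo}. Paths that match
   * none of these patterns are rejected with an {@link IOException}.
   */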
  @Override
  public StoreFileInfo getStoreFileInfo(FileStatus fileStatus, Path initialPath,
    boolean primaryReplica) throws IOException {
    FileSystem fs = this.ctx.getRegionFileSystem().getFileSystem();
    assert fs != null;
    assert initialPath != null;
    assert conf != null;
    Reference reference = null;
    HFileLink link = null;
    long createdTimestamp = 0;
    long size = 0;
    Path p = initialPath;
    if (HFileLink.isHFileLink(p)) {
      // HFileLink
      reference = null;
      link = HFileLink.buildFromHFileLinkPattern(conf, p);
      LOG.trace("{} is a link", p);
    } else if (StoreFileInfo.isReference(p)) {
      reference = readReference(p);
      Path referencePath = StoreFileInfo.getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference
        link = HFileLink.buildFromHFileLinkPattern(conf, referencePath);
      } else {
        // Reference
        link = null;
      }
      LOG.trace("{} is a {} reference to {}", p, reference.getFileRegion(), referencePath);
    } else if (
      StoreFileInfo.isHFile(p) || StoreFileInfo.isMobFile(p) || StoreFileInfo.isMobRefFile(p)
    ) {
      // HFile
      if (fileStatus != null) {
        createdTimestamp = fileStatus.getModificationTime();
        size = fileStatus.getLen();
      } else {
        FileStatus fStatus = fs.getFileStatus(initialPath);
        createdTimestamp = fStatus.getModificationTime();
        size = fStatus.getLen();
      }
    } else {
      throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
    }
    return new StoreFileInfo(conf, fs, createdTimestamp, initialPath, size, reference, link,
      isPrimaryReplica);
  }

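  /**
   * Creates an HFileLink in this store's directory and registers the resulting file with the
   * tracker in a single call, so the link immediately becomes part of the tracked store files.
   */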
  public HFileLink createAndCommitHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    HFileLink hFileLink = createHFileLink(linkedTable, linkedRegion, hfileName, createBackRef);
    Path path = new Path(ctx.getFamilyStoreDirectoryPath(),
      HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
    StoreFileInfo storeFileInfo =
      new StoreFileInfo(conf, this.ctx.getRegionFileSystem().getFileSystem(), path, hFileLink);
    add(Arrays.asList(storeFileInfo));
    return hFileLink;
  }

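  /**
   * Creates a link file in this store's directory pointing at an hfile of another region or
   * table, optionally with a back reference under the archive directory so the linked file is
   * not cleaned up while links to it still exist.
   */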
  public HFileLink createHFileLink(final TableName linkedTable, final String linkedRegion,
    final String hfileName, final boolean createBackRef) throws IOException {
    String name = HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName);
    String refName = HFileLink.createBackReferenceName(ctx.getTableName().toString(),
      ctx.getRegionInfo().getEncodedName());

    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    // Make sure the destination directory exists
    fs.mkdirs(ctx.getFamilyStoreDirectoryPath());

    // Make sure the FileLink reference directory exists
    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf, linkedTable, linkedRegion,
      ctx.getFamily().getNameAsString());
    Path backRefPath = null;
    if (createBackRef) {
      Path backRefsDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
      fs.mkdirs(backRefsDir);

      // Create the reference for the link
      backRefPath = new Path(backRefsDir, refName);
      fs.createNewFile(backRefPath);
    }
    try {
      // Create the link
      if (fs.createNewFile(new Path(ctx.getFamilyStoreDirectoryPath(), name))) {
        return new HFileLink(new Path(ctx.getFamilyStoreDirectoryPath(), name), backRefPath, null,
          archiveStoreDir);
      }
    } catch (IOException e) {
      LOG.error("Couldn't create the link=" + name + " for " + ctx.getFamilyStoreDirectoryPath(),
        e);
      // Revert the back reference if the link creation failed
      if (createBackRef) {
        fs.delete(backRefPath, false);
      }
      throw e;
    }
    throw new IOException("File link=" + name + " already exists under "
      + ctx.getFamilyStoreDirectoryPath() + " folder.");
  }

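  /**
   * Recreates a link from an existing HFileLink file name (including the MOB variant), parsing
   * the table, region and hfile components out of the name.
   */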
  public HFileLink createFromHFileLink(final String hfileLinkName, final boolean createBackRef)
    throws IOException {
    Matcher hfileLinkMatcher = HFileLink.LINK_NAME_PATTERN.matcher(hfileLinkName);
    if (hfileLinkMatcher.matches()) {
      return createHFileLink(
        TableName.valueOf(hfileLinkMatcher.group(1), hfileLinkMatcher.group(2)),
        hfileLinkMatcher.group(3), hfileLinkMatcher.group(4), createBackRef);
    }
    if (StoreFileInfo.isMobFileLink(hfileLinkName)) {
      Matcher mobLinkMatcher = HFileLink.REF_OR_HFILE_LINK_PATTERN.matcher(hfileLinkName);
      if (mobLinkMatcher.matches()) {
        return createHFileLink(TableName.valueOf(mobLinkMatcher.group(1), mobLinkMatcher.group(2)),
          mobLinkMatcher.group(3), mobLinkMatcher.group(4), createBackRef);
      }
    }
    throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
  }

  @Override
  public StoreContext getStoreContext() {
    return ctx;
  }

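  /**
   * Removes the given store files from the store. The base implementation archives them through
   * {@link HFileArchiver}, moving them into the archive directory rather than deleting them
   * outright, so that snapshots and links that still refer to them stay valid.
   */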
  public void removeStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    archiveStoreFiles(storeFiles);
  }

  protected void archiveStoreFiles(List<HStoreFile> storeFiles) throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, ctx.getRegionFileSystem().getFileSystem(),
      ctx.getRegionInfo(), ctx.getRegionFileSystem().getTableDir(), ctx.getFamily().getName(),
      storeFiles);
  }

  /**
   * For the primary replica, load is called once when opening a region, and the implementation
   * may choose to do some cleanup work at that point. The {@code readOnly} flag indicates whether
   * that cleanup is allowed. For secondary replicas, {@code readOnly} is always {@code true}.
   */
  protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

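  /**
   * Records newly written store files in the tracker's backing storage.
   */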
  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

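  /**
   * Replaces the compacted input files with the compaction output files in the tracker's backing
   * storage.
   */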
  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException;

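  /**
   * Replaces the tracker's entire record of store files with the given list.
   */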
  protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}