001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.Path;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Implementation of {@link HFile.Reader} to deal with pread.
029 */
030@InterfaceAudience.Private
031public class HFilePreadReader extends HFileReaderImpl {
032  private static final Logger LOG = LoggerFactory.getLogger(HFileReaderImpl.class);
033
034  public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
035    Configuration conf) throws IOException {
036    super(context, fileInfo, cacheConf, conf);
037    // Prefetch file blocks upon open if requested
038    if (cacheConf.shouldPrefetchOnOpen()) {
039      PrefetchExecutor.request(path, new Runnable() {
040        @Override
041        public void run() {
042          long offset = 0;
043          long end = 0;
044          try {
045            end = getTrailer().getLoadOnOpenDataOffset();
046            if (LOG.isTraceEnabled()) {
047              LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
048            }
049            // Don't use BlockIterator here, because it's designed to read load-on-open section.
050            long onDiskSizeOfNextBlock = -1;
051            while (offset < end) {
052              if (Thread.interrupted()) {
053                break;
054              }
055              // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
056              // the internal-to-hfileblock thread local which holds the overread that gets the
057              // next header, will not have happened...so, pass in the onDiskSize gotten from the
058              // cached block. This 'optimization' triggers extremely rarely I'd say.
059              HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
060                /* pread= */true, false, false, null, null);
061              try {
062                onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
063                offset += block.getOnDiskSizeWithHeader();
064              } finally {
065                // Ideally here the readBlock won't find the block in cache. We call this
066                // readBlock so that block data is read from FS and cached in BC. we must call
067                // returnBlock here to decrease the reference count of block.
068                block.release();
069              }
070            }
071          } catch (IOException e) {
072            // IOExceptions are probably due to region closes (relocation, etc.)
073            if (LOG.isTraceEnabled()) {
074              LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
075            }
076          } catch (NullPointerException e) {
077            LOG.warn(
078              "Stream moved/closed or prefetch cancelled?" + getPathOffsetEndStr(path, offset, end),
079              e);
080          } catch (Exception e) {
081            // Other exceptions are interesting
082            LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
083          } finally {
084            PrefetchExecutor.complete(path);
085          }
086        }
087      });
088    }
089  }
090
091  private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
092    return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
093  }
094
095  public void close(boolean evictOnClose) throws IOException {
096    PrefetchExecutor.cancel(path);
097    // Deallocate blocks in load-on-open section
098    this.fileInfo.close();
099    // Deallocate data blocks
100    cacheConf.getBlockCache().ifPresent(cache -> {
101      if (evictOnClose) {
102        int numEvicted = cache.evictBlocksByHfileName(name);
103        if (LOG.isTraceEnabled()) {
104          LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
105        }
106      }
107    });
108    fsBlockReader.closeStreams();
109  }
110}