001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FSDataInputStream;
024import org.apache.hadoop.hbase.HDFSBlocksDistribution;
025import org.apache.hadoop.hbase.io.FileLink;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.FSUtils;
028import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Computes the HDFSBlockDistribution for a file based on the underlying located blocks for an
035 * HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
036 * allocating an array of numBlocks size per call. It may also involve calling the namenode, if the
037 * DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure, we
038 * cache the computed distribution for a configurable period of time.
039 * <p>
040 * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
041 * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the value
042 * returned by {@link HStoreFile#getHDFSBlockDistribution()}.
043 * <p>
044 * Once the backing FSDataInputStream is closed, we should not expect the distribution result to
045 * change anymore. This is ok becuase the initialReader's InputStream is only closed when the
046 * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
047 * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will be
048 * created for the new initialReader.
049 */
050@InterfaceAudience.Private
051public class InputStreamBlockDistribution {
052  private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
053
054  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
055    "hbase.locality.inputstream.derive.enabled";
056  private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
057
058  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
059    "hbase.locality.inputstream.derive.cache.period";
060  private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
061
062  private final FSDataInputStream stream;
063  private final StoreFileInfo fileInfo;
064  private final int cachePeriodMs;
065
066  private HDFSBlocksDistribution hdfsBlocksDistribution;
067  private long lastCachedAt;
068  private boolean streamUnsupported;
069
070  /**
071   * This should only be called for the first FSDataInputStream of a StoreFile, in
072   * {@link HStoreFile#open()}.
073   * @see InputStreamBlockDistribution
074   * @param stream   the input stream to derive locality from
075   * @param fileInfo the StoreFileInfo for the related store file
076   */
077  public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
078    this.stream = stream;
079    this.fileInfo = fileInfo;
080    this.cachePeriodMs = fileInfo.getConf().getInt(HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
081      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
082    this.lastCachedAt = EnvironmentEdgeManager.currentTime();
083    this.streamUnsupported = false;
084    this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
085  }
086
087  /**
088   * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
089   */
090  public static boolean isEnabled(Configuration conf) {
091    return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
092      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
093  }
094
095  /**
096   * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
097   * is expired.
098   */
099  public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
100    if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
101      try {
102        LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
103        computeBlockDistribution();
104      } catch (IOException e) {
105        LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
106          fileInfo, e);
107      }
108    }
109    return hdfsBlocksDistribution;
110  }
111
112  private void computeBlockDistribution() throws IOException {
113    lastCachedAt = EnvironmentEdgeManager.currentTime();
114
115    FSDataInputStream stream;
116    if (fileInfo.isLink()) {
117      stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
118    } else {
119      stream = this.stream;
120    }
121
122    if (!(stream instanceof HdfsDataInputStream)) {
123      if (!streamUnsupported) {
124        LOG.warn(
125          "{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
126            + "used to derive locality. Falling back on cached value.",
127          stream, fileInfo, fileInfo.isLink());
128        streamUnsupported = true;
129      }
130      return;
131    }
132
133    streamUnsupported = false;
134    hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
135  }
136
137  /**
138   * For tests only, sets lastCachedAt so we can force a refresh
139   */
140  @RestrictedApi(explanation = "Should only be called in tests", link = "",
141      allowedOnPath = ".*/src/test/.*")
142  synchronized void setLastCachedAt(long timestamp) {
143    lastCachedAt = timestamp;
144  }
145
146  /**
147   * For tests only, returns the configured cache period
148   */
149  @RestrictedApi(explanation = "Should only be called in tests", link = "",
150      allowedOnPath = ".*/src/test/.*")
151  long getCachePeriodMs() {
152    return cachePeriodMs;
153  }
154
155  /**
156   * For tests only, returns whether the passed stream is supported
157   */
158  @RestrictedApi(explanation = "Should only be called in tests", link = "",
159      allowedOnPath = ".*/src/test/.*")
160  boolean isStreamUnsupported() {
161    return streamUnsupported;
162  }
163}