001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FSDataInputStream; 024import org.apache.hadoop.hbase.HDFSBlocksDistribution; 025import org.apache.hadoop.hbase.io.FileLink; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.util.FSUtils; 028import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Computes the HDFSBlockDistribution for a file based on the underlying located blocks for an 035 * HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves 036 * allocating an array of numBlocks size per call. It may also involve calling the namenode, if the 037 * DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure, we 038 * cache the computed distribution for a configurable period of time. 039 * <p> 040 * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e. 041 * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the value 042 * returned by {@link HStoreFile#getHDFSBlockDistribution()}. 043 * <p> 044 * Once the backing FSDataInputStream is closed, we should not expect the distribution result to 045 * change anymore. This is ok becuase the initialReader's InputStream is only closed when the 046 * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution 047 * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will be 048 * created for the new initialReader. 049 */ 050@InterfaceAudience.Private 051public class InputStreamBlockDistribution { 052 private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class); 053 054 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = 055 "hbase.locality.inputstream.derive.enabled"; 056 private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false; 057 058 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 059 "hbase.locality.inputstream.derive.cache.period"; 060 private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000; 061 062 private final FSDataInputStream stream; 063 private final StoreFileInfo fileInfo; 064 private final int cachePeriodMs; 065 066 private HDFSBlocksDistribution hdfsBlocksDistribution; 067 private long lastCachedAt; 068 private boolean streamUnsupported; 069 070 /** 071 * This should only be called for the first FSDataInputStream of a StoreFile, in 072 * {@link HStoreFile#open()}. 073 * @see InputStreamBlockDistribution 074 * @param stream the input stream to derive locality from 075 * @param fileInfo the StoreFileInfo for the related store file 076 */ 077 public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) { 078 this.stream = stream; 079 this.fileInfo = fileInfo; 080 this.cachePeriodMs = fileInfo.getConf().getInt(HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD, 081 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD); 082 this.lastCachedAt = EnvironmentEdgeManager.currentTime(); 083 this.streamUnsupported = false; 084 this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution(); 085 } 086 087 /** 088 * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream 089 */ 090 public static boolean isEnabled(Configuration conf) { 091 return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED, 092 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED); 093 } 094 095 /** 096 * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache 097 * is expired. 098 */ 099 public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() { 100 if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) { 101 try { 102 LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo); 103 computeBlockDistribution(); 104 } catch (IOException e) { 105 LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.", 106 fileInfo, e); 107 } 108 } 109 return hdfsBlocksDistribution; 110 } 111 112 private void computeBlockDistribution() throws IOException { 113 lastCachedAt = EnvironmentEdgeManager.currentTime(); 114 115 FSDataInputStream stream; 116 if (fileInfo.isLink()) { 117 stream = FileLink.getUnderlyingFileLinkInputStream(this.stream); 118 } else { 119 stream = this.stream; 120 } 121 122 if (!(stream instanceof HdfsDataInputStream)) { 123 if (!streamUnsupported) { 124 LOG.warn( 125 "{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be " 126 + "used to derive locality. Falling back on cached value.", 127 stream, fileInfo, fileInfo.isLink()); 128 streamUnsupported = true; 129 } 130 return; 131 } 132 133 streamUnsupported = false; 134 hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream); 135 } 136 137 /** 138 * For tests only, sets lastCachedAt so we can force a refresh 139 */ 140 @RestrictedApi(explanation = "Should only be called in tests", link = "", 141 allowedOnPath = ".*/src/test/.*") 142 synchronized void setLastCachedAt(long timestamp) { 143 lastCachedAt = timestamp; 144 } 145 146 /** 147 * For tests only, returns the configured cache period 148 */ 149 @RestrictedApi(explanation = "Should only be called in tests", link = "", 150 allowedOnPath = ".*/src/test/.*") 151 long getCachePeriodMs() { 152 return cachePeriodMs; 153 } 154 155 /** 156 * For tests only, returns whether the passed stream is supported 157 */ 158 @RestrictedApi(explanation = "Should only be called in tests", link = "", 159 allowedOnPath = ".*/src/test/.*") 160 boolean isStreamUnsupported() { 161 return streamUnsupported; 162 } 163}