/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation for the FileSystem object that hbase uses to access data. This class allows the
 * flexibility of using separate filesystem objects for reading and writing hfiles and wals.
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

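  // Illustrative usage sketch only, not part of the class contract: reads of hfile data go
  // through getNoChecksumFs() (HBase verifies checksums itself), while everything else uses the
  // wrapped filesystem. HBaseConfiguration.create() is assumed to supply the usual site config.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HFileSystem hfs = new HFileSystem(conf, true);
  //   FileSystem readFs = hfs.getNoChecksumFs(); // hfile block reads; checksums done by HBase
  //   FileSystem writeFs = hfs.getBackingFs();   // normal, checksum-verifying filesystem
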
  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf             The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, then use checksum verification in hbase, otherwise delegate
   *                         checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum) throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    // This FS#get(URI, conf) clearly indicates in the javadoc that if the FS is
    // not created it will initialize the FS and return that created FS. If it is
    // already created it will just return the FS that was already created.
    // We take pains to funnel all of our FileSystem instantiation through this call to ensure
    // we never need to call FS.initialize ourselves so that we do not have to track any state to
    // avoid calling initialize more than once.
    this.fs = FileSystem.get(getDefaultUri(conf), conf);
    this.useHBaseChecksum = useHBaseChecksum;

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, instead doing it
    // inside of hbase.
    // If this is the local file system hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in that incorrect data is read and HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within a HFileSystem. The noChecksumFs and writeFs are both set to be
   * the same specified fs. Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for doing reads from storage. This object
   * avoids doing checksum verifications for reads.
   * @return The FileSystem object that can be used to read data from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

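  // Illustrative only: the storage policy helpers below delegate to HDFS. Given an HFileSystem
  // "hfs", both the path and the policy name here are made-up examples, not HBase defaults.
  //
  //   hfs.setStoragePolicy(new Path("/hbase/example"), "ONE_SSD");
  //   String policy = hfs.getStoragePolicyName(new Path("/hbase/example")); // e.g. "ONE_SSD"
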
  /**
   * Set the source path (directory/file) to the specified storage policy.
   * @param path       The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See hadoop 2.6+
   *                   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *                   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    CommonFSUtils.setStoragePolicy(this.fs, path, policyName);
  }

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or an
   *         exception was thrown when trying to get the policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
        ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0, there was no getStoragePolicy method in the FileSystem interface, so we
   * need to stay compatible with older versions. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later versions BlockStoragePolicySuite#ID_UNSPECIFIED was moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums, otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use the FileSystem.Cache. In newer
   * versions of HDFS, we can directly invoke FileSystem.newInstance(Configuration).
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
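      // Instantiating the configured implementation class directly via reflection (rather than
      // going through FileSystem.get) bypasses the FileSystem cache, so the caller always gets a
      // fresh instance.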
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns an instance of the filesystem wrapped into the class specified by the
   * hbase.fs.wrapper property, if one is set in the configuration; returns the unmodified FS
   * instance passed in as an argument otherwise.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
          .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient linked to
   * this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. "
        + "Skipping block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location "
        + "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

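    // DFSClient#namenode is a private final field, so we use reflection to clear its FINAL
    // modifier and swap in a dynamic proxy that reorders the block locations returned by
    // namenode#getBlockLocations.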
    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block"
          + " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block"
        + " reordering using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
    final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
      new Class[] { ClientProtocol.class, Closeable.class }, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          try {
            if ((args == null || args.length == 0) && "close".equals(method.getName())) {
              RPC.stopProxy(cp);
              return null;
            } else {
              Object res = method.invoke(cp, args);
              if (
                res != null && args != null && args.length == 3
                  && "getBlockLocations".equals(method.getName()) && res instanceof LocatedBlocks
                  && args[0] instanceof String && args[0] != null
              ) {
                lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
              }
              return res;
            }
          } catch (InvocationTargetException ite) {
            // We will have this for all exceptions, checked or not, sent
            // by any layer, including the functional exception
            Throwable cause = ite.getCause();
            if (cause == null) {
              throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
            }
            if (cause instanceof UndeclaredThrowableException) {
              Throwable causeCause = cause.getCause();
              if (causeCause == null) {
                throw new RuntimeException("UndeclaredThrowableException had null cause!");
              }
              cause = causeCause;
            }
            throw cause;
          }
        }
      });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * @param conf - the conf to use
     * @param lbs  - the LocatedBlocks to reorder
     * @param src  - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

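  // Illustrative only: ReorderBlocks has a single abstract method, so a custom implementation
  // (for example, in a test within this package) could be supplied as a lambda. The no-op below
  // is hypothetical; production code always uses ReorderWALBlocks.
  //
  //   ReorderBlocks noop = (conf, lbs, src) -> { /* keep the order HDFS returned */ };
  //   HFileSystem.addLocationsOrderInterceptor(conf, noop);
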
  /**
   * We put at lowest priority the WAL file blocks that are on the same datanode as the original
   * regionserver which created these files. This is because we fear that the datanode is actually
   * dead, so using it would make reads time out.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
      throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // Ok, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src + " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one in last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get(). This returns a filesystem object
   * that avoids checksum verification in the filesystem for hfileblock-reads. For these blocks,
   * checksum verification is done by HBase.
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within a HFileSystem.
   */
  public static FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support createNonRecursive. This is a
   * Hadoop bug and when it is fixed in Hadoop, this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress);
  }
}