/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation for the FileSystem object that hbase uses to access
 * data. This class allows the flexibility of using
 * separate filesystem objects for reading and writing hfiles and wals.
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs;   // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
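
  /*
   * Minimal usage sketch (illustrative only, not part of this class's API contract):
   * read-side callers typically obtain the no-checksum filesystem, while everything else
   * goes through the checksummed one.
   *
   *   HFileSystem hfs = new HFileSystem(conf, true); // HBase verifies hfile checksums itself
   *   FileSystem readFs = hfs.getNoChecksumFs();     // used for hfile block reads
   *   FileSystem writeFs = hfs.getBackingFs();       // used for all other operations
   */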

  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, then use
   *        checksum verification in hbase, otherwise
   *        delegate checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
      throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    this.fs = FileSystem.get(conf);
    this.useHBaseChecksum = useHBaseChecksum;

    fs.initialize(getDefaultUri(conf), conf);

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, instead do it
    // inside of hbase.
    // If this is the local file system hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in that incorrect data is read and HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within a HFileSystem. The noChecksumFs and
   * writeFs are both set to the same specified fs.
   * Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially setup for
   * doing reads from storage. This object avoids doing
   * checksum verifications for reads.
   * @return The FileSystem object that can be used to read data
   *         from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem.
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
   * @param path The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
   *   See hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    FSUtils.setStoragePolicy(this.fs, path, policyName);
  }
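
  /*
   * A small sketch of how setStoragePolicy() above and getStoragePolicyName() below can be
   * combined; the directory path is hypothetical and deployment-specific.
   *
   *   HFileSystem hfs = new HFileSystem(conf, true);
   *   Path walDir = new Path("/hbase/WALs/server1,16020,1");
   *   hfs.setStoragePolicy(walDir, "ONE_SSD");
   *   String policy = hfs.getStoragePolicyName(walDir); // "ONE_SSD" on HDFS, null otherwise
   */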

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if we are not on a {@link DistributedFileSystem}
   *         or an exception was thrown when trying to get the policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
          ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // May fail because we are on an old HDFS version; try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0, there was no getStoragePolicy method on the FileSystem interface, so we
   * need to stay compatible with those versions. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later versions BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums,
   *         otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }
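
  /*
   * The helper below returns an uncached FileSystem instance. On current Hadoop versions the
   * same effect can be had directly, as the javadoc notes; a rough sketch (not what this class
   * does on old Hadoop):
   *
   *   FileSystem fresh = FileSystem.newInstance(conf);
   *   // or, per scheme, set "fs.<scheme>.impl.disable.cache" to true before FileSystem.get()
   */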

  /**
   * Returns a brand new instance of the FileSystem. It does not use
   * the FileSystem.Cache. In newer versions of HDFS, we can directly
   * invoke FileSystem.newInstance(Configuration).
   *
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns an instance of Filesystem wrapped into the class specified in the
   * hbase.fs.wrapper property, if one is set in the configuration; otherwise returns the
   * unmodified FS instance passed in as an argument.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
            .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }
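
  /*
   * A hedged sketch of what a wrapper configured via "hbase.fs.wrapper" could look like. The
   * class name and its contents are hypothetical; the only requirement maybeWrapFileSystem()
   * relies on is a public (FileSystem, Configuration) constructor:
   *
   *   public class MetricsFileSystem extends FilterFileSystem {
   *     public MetricsFileSystem(FileSystem base, Configuration conf) {
   *       super(base);
   *     }
   *     // override read/write methods to add instrumentation
   *   }
   *
   *   conf.setClass("hbase.fs.wrapper", MetricsFileSystem.class, FileSystem.class);
   */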

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
   * linked to this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   *
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {  // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. " +
          "Skipping on block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
          "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
            " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so can do block reordering" +
          " using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class, Closeable.class}, new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
              if ((args == null || args.length == 0) && "close".equals(method.getName())) {
                RPC.stopProxy(cp);
                return null;
              } else {
                Object res = method.invoke(cp, args);
                if (res != null && args != null && args.length == 3
                    && "getBlockLocations".equals(method.getName())
                    && res instanceof LocatedBlocks
                    && args[0] instanceof String
                    && args[0] != null) {
                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                }
                return res;
              }
            } catch (InvocationTargetException ite) {
              // We will get this for all exceptions, checked or not, thrown
              // by any layer, including the functional exception
              Throwable cause = ite.getCause();
              if (cause == null) {
                throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
              }
              if (cause instanceof UndeclaredThrowableException) {
                Throwable causeCause = cause.getCause();
                if (causeCause == null) {
                  throw new RuntimeException("UndeclaredThrowableException had null cause!");
                }
                cause = causeCause;
              }
              throw cause;
            }
          }
        });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * Reorder the located blocks of the given file, in place.
     * @param conf - the conf to use
     * @param lbs - the LocatedBlocks to reorder
     * @param src - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }
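
  /*
   * ReorderWALBlocks below is the implementation used in production (see the one-argument
   * overload above); per the javadoc, other implementations only make sense for testing. A
   * hedged sketch of what such a test-only implementation could look like (class name and
   * logic are hypothetical):
   *
   *   static class ReverseBlockLocations implements ReorderBlocks {
   *     @Override
   *     public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) {
   *       for (LocatedBlock lb : lbs.getLocatedBlocks()) {
   *         DatanodeInfo[] locs = lb.getLocations();
   *         for (int i = 0, j = locs.length - 1; i < j; i++, j--) {
   *           DatanodeInfo tmp = locs[i];
   *           locs[i] = locs[j];
   *           locs[j] = tmp;
   *         }
   *       }
   *     }
   *   }
   *
   *   // wired in for a test via: addLocationsOrderInterceptor(conf, new ReverseBlockLocations());
   */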

  /**
   * We put at lowest priority the WAL file blocks that are on the same datanode
   * as the original regionserver which created these files, because we fear that the
   * datanode is actually dead; if we used it, the read would time out.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // Ok, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src +
            " is a WAL file, so reordering blocks, last hostname will be:" + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get().
   * This returns a filesystem object that avoids checksum
   * verification in the filesystem for hfileblock-reads.
   * For these blocks, checksum verification is done by HBase.
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within a HFileSystem.
   */
  public static FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support
   * createNonRecursive. This is a hadoop bug and when it is fixed in Hadoop,
   * this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
        blockSize, progress);
  }
}