/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation for the FileSystem object that hbase uses to access data. This class allows the
 * flexibility of using separate filesystem objects for reading and writing hfiles and wals.
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

  /**
   * Create a FileSystem object for HBase regionservers.
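   * <p>
   * A minimal usage sketch (illustrative only; assumes the default filesystem in {@code conf}
   * points at the HBase root directory):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * // Verify checksums inside HBase rather than in the filesystem client.
   * HFileSystem hfs = new HFileSystem(conf, true);
   * FileSystem readFs = hfs.getNoChecksumFs(); // use for hfile block reads
   * }</pre>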
   * @param conf The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, then use checksum verification in hbase, otherwise delegate
   *          checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
      throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    // This FS#get(URI, conf) clearly indicates in the javadoc that if the FS is
    // not created it will initialize the FS and return that created FS. If it is
    // already created it will just return the FS that was already created.
    // We take pains to funnel all of our FileSystem instantiation through this call to ensure
    // we never need to call FS.initialize ourselves, so that we do not have to track any state
    // to avoid calling initialize more than once.
    this.fs = FileSystem.get(getDefaultUri(conf), conf);
    this.useHBaseChecksum = useHBaseChecksum;

    // disable checksum verification for the local filesystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, instead do it
    // inside of hbase.
    // If this is the local file system, hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called,
    // which manifests itself in incorrect data being read and HFileBlocks not being able to
    // read their header magic numbers. See HBASE-5885.
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and writeFs are both set to
   * the same specified fs. Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for doing reads from storage. This object
   * avoids doing checksum verifications for reads.
   * @return The FileSystem object that can be used to read data from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem.
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
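   * <p>
   * A hedged example, given an {@code HFileSystem} instance {@code hfs} (the column family path
   * below is hypothetical, and the policy must exist in the underlying HDFS deployment):
   *
   * <pre>{@code
   * hfs.setStoragePolicy(new Path("/hbase/data/default/mytable/cf"), "ONE_SSD");
   * }</pre>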
   * @param path The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See hadoop 2.6+
   *          org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g. 'COLD',
   *          'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    CommonFSUtils.setStoragePolicy(this.fs, path, policyName);
  }

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or an
   *         exception was thrown when trying to get the policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
        ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // May fail because of an old HDFS version; try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0, the FileSystem interface has no getStoragePolicy method, and we need to
   * keep compatible with it. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later versions BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums, otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object.
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use the FileSystem.Cache. In
   * newer versions of HDFS, we can directly invoke FileSystem.newInstance(Configuration).
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
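      // Instantiate the configured implementation reflectively and initialize it ourselves,
      // so this instance never goes through (or into) the FileSystem cache.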
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns an instance of the filesystem wrapped into the class specified in the
   * hbase.fs.wrapper property, if one is set in the configuration; otherwise returns the
   * unmodified FS instance passed in as an argument.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
          .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to namenode#getBlockLocations from the DFSClient linked to
   * this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. " +
        "Skipping block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
        "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      // Strip the final modifier from DFSClient#namenode so the field can be replaced.
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
          " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }
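
      // Swap in a dynamic proxy for the namenode reference: the proxy delegates every call and
      // reorders the block locations returned by getBlockLocations before the DFSClient sees them.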
      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block" +
        " reordering using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException | IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
      new Class[] { ClientProtocol.class, Closeable.class }, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          try {
            if ((args == null || args.length == 0) && "close".equals(method.getName())) {
              RPC.stopProxy(cp);
              return null;
            } else {
              Object res = method.invoke(cp, args);
              if (res != null && args != null && args.length == 3
                  && "getBlockLocations".equals(method.getName())
                  && res instanceof LocatedBlocks
                  && args[0] instanceof String) {
                lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
              }
              return res;
            }
          } catch (InvocationTargetException ite) {
            // We will get this for all the exceptions, checked or not, sent
            // by any layer, including the functional exception
            Throwable cause = ite.getCause();
            if (cause == null) {
              throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
            }
            if (cause instanceof UndeclaredThrowableException) {
              Throwable causeCause = cause.getCause();
              if (causeCause == null) {
                throw new RuntimeException("UndeclaredThrowableException had null cause!");
              }
              cause = causeCause;
            }
            throw cause;
          }
        }
      });
  }

  /**
   * Interface to implement to add specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * @param conf - the conf to use
     * @param lbs - the LocatedBlocks to reorder
     * @param src - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * We give lowest priority to the WAL file blocks that are on the same datanode as the original
   * regionserver which created these files. This is because we fear that the datanode is actually
   * dead, so if we use it, it will time out.
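   * <p>
   * For example (hostnames are hypothetical): if a WAL written by the regionserver on host rs1
   * has a block replicated on {rs1, dn2, dn3}, the locations are reordered to {dn2, dn3, rs1},
   * so rs1 is tried last.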
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // OK, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src +
          " is a WAL file, so reordering blocks, last hostname will be:" + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get(). This returns a filesystem
   * object that avoids checksum verification in the filesystem for hfileblock-reads. For these
   * blocks, checksum verification is done by HBase.
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within an HFileSystem.
   */
  public static FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support createNonRecursive. This is a
   * Hadoop bug and when it is fixed in Hadoop, this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
      blockSize, progress);
  }
}