001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicReference; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Abortable; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; 031import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 032import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader; 033import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader; 034import org.apache.hadoop.hbase.util.CancelableProgressable; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 037import org.apache.hadoop.hbase.wal.WALProvider.Writer; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 043 044/** 045 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the 046 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the 047 * configuration setting "hbase.wal.provider". Available implementations: 048 * <ul> 049 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 050 * "asyncfs"</li> 051 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 052 * FileSystem interface via an asynchronous client.</li> 053 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 054 * FileSystem interface via HDFS's synchronous DFSClient.</li> 055 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 056 * server.</li> 057 * </ul> 058 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 059 */ 060@InterfaceAudience.Private 061public class WALFactory { 062 063 /** 064 * Used in tests for injecting customized stream reader implementation, for example, inject fault 065 * when reading, etc. 066 * <p/> 067 * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we 068 * will also determine whether the WAL file is encrypted and we should use 069 * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the 070 * header of the WAL file, so we do not need to specify a specical reader to read the WAL file 071 * either. 072 * <p/> 073 * So typically you should not use this config in production. 074 */ 075 public static final String WAL_STREAM_READER_CLASS_IMPL = 076 "hbase.regionserver.wal.stream.reader.impl"; 077 078 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 079 080 /** 081 * Maps between configuration names for providers and implementation classes. 082 */ 083 enum Providers { 084 defaultProvider(AsyncFSWALProvider.class), 085 filesystem(FSHLogProvider.class), 086 multiwal(RegionGroupingProvider.class), 087 asyncfs(AsyncFSWALProvider.class); 088 089 final Class<? extends WALProvider> clazz; 090 091 Providers(Class<? extends WALProvider> clazz) { 092 this.clazz = clazz; 093 } 094 } 095 096 public static final String WAL_PROVIDER = "hbase.wal.provider"; 097 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 098 099 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 100 101 public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; 102 103 final String factoryId; 104 final Abortable abortable; 105 private final WALProvider provider; 106 // The meta updates are written to a different wal. If this 107 // regionserver holds meta regions, then this ref will be non-null. 108 // lazily intialized; most RegionServers don't deal with META 109 private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>(); 110 111 /** 112 * Configuration-specified WAL Reader used when a custom reader is requested 113 */ 114 private final Class<? extends WALStreamReader> walStreamReaderClass; 115 116 /** 117 * How long to attempt opening in-recovery wals 118 */ 119 private final int timeoutMillis; 120 121 private final Configuration conf; 122 123 private final ExcludeDatanodeManager excludeDatanodeManager; 124 125 // Used for the singleton WALFactory, see below. 126 private WALFactory(Configuration conf) { 127 // this code is duplicated here so we can keep our members final. 128 // until we've moved reader/writer construction down into providers, this initialization must 129 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 130 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 131 /* TODO Both of these are probably specific to the fs wal provider */ 132 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 133 ProtobufWALStreamReader.class, WALStreamReader.class); 134 Preconditions.checkArgument( 135 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 136 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 137 AbstractFSWALProvider.Initializer.class.getName()); 138 this.conf = conf; 139 // end required early initialization 140 141 // this instance can't create wals, just reader/writers. 142 provider = null; 143 factoryId = SINGLETON_ID; 144 this.abortable = null; 145 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 146 } 147 148 Providers getDefaultProvider() { 149 return Providers.defaultProvider; 150 } 151 152 public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 153 try { 154 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 155 156 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 157 // the default and we can't use it, we want to fall back to FSHLog which we know works on 158 // all versions. 159 if ( 160 provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 161 && !AsyncFSWALProvider.load() 162 ) { 163 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 164 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 165 // easily broken when upgrading hadoop. 166 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 167 return FSHLogProvider.class; 168 } 169 170 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 171 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 172 // not fall back to FSHLogProvider. 173 return provider.clazz; 174 } catch (IllegalArgumentException exception) { 175 // Fall back to them specifying a class name 176 // Note that the passed default class shouldn't actually be used, since the above only fails 177 // when there is a config value present. 178 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 179 } 180 } 181 182 static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException { 183 LOG.info("Instantiating WALProvider of type {}", clazz); 184 try { 185 return clazz.getDeclaredConstructor().newInstance(); 186 } catch (Exception e) { 187 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 188 LOG.debug("Exception details for failure to load WALProvider.", e); 189 throw new IOException("couldn't set up WALProvider", e); 190 } 191 } 192 193 /** 194 * @param conf must not be null, will keep a reference to read params in later reader/writer 195 * instances. 196 * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations 197 * to make a directory 198 */ 199 public WALFactory(Configuration conf, String factoryId) throws IOException { 200 // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider 201 // for HMaster or HRegionServer which take system table only. See HBASE-19999 202 this(conf, factoryId, null, true); 203 } 204 205 /** 206 * @param conf must not be null, will keep a reference to read params 207 * in later reader/writer instances. 208 * @param factoryId a unique identifier for this factory. used i.e. by 209 * filesystem implementations to make a directory 210 * @param abortable the server associated with this WAL file 211 * @param enableSyncReplicationWALProvider whether wrap the wal provider to a 212 * {@link SyncReplicationWALProvider} 213 */ 214 public WALFactory(Configuration conf, String factoryId, Abortable abortable, 215 boolean enableSyncReplicationWALProvider) throws IOException { 216 // until we've moved reader/writer construction down into providers, this initialization must 217 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 218 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 219 /* TODO Both of these are probably specific to the fs wal provider */ 220 walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL, 221 ProtobufWALStreamReader.class, WALStreamReader.class); 222 Preconditions.checkArgument( 223 AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass), 224 "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(), 225 AbstractFSWALProvider.Initializer.class.getName()); 226 this.conf = conf; 227 this.factoryId = factoryId; 228 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 229 this.abortable = abortable; 230 // end required early initialization 231 if (conf.getBoolean(WAL_ENABLED, true)) { 232 WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 233 if (enableSyncReplicationWALProvider) { 234 provider = new SyncReplicationWALProvider(provider); 235 } 236 provider.init(this, conf, null, this.abortable); 237 provider.addWALActionsListener(new MetricsWAL()); 238 this.provider = provider; 239 } else { 240 // special handling of existing configuration behavior. 241 LOG.warn("Running with WAL disabled."); 242 provider = new DisabledWALProvider(); 243 provider.init(this, conf, factoryId, null); 244 } 245 } 246 247 /** 248 * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to 249 * replay and edits that have gone to any wals from this factory. 250 */ 251 public void close() throws IOException { 252 final WALProvider metaProvider = this.metaProvider.get(); 253 if (null != metaProvider) { 254 metaProvider.close(); 255 } 256 // close is called on a WALFactory with null provider in the case of contention handling 257 // within the getInstance method. 258 if (null != provider) { 259 provider.close(); 260 } 261 } 262 263 /** 264 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you 265 * are not ending cleanly and will need to replay edits from this factory's wals, use this method 266 * if you can as it will try to leave things as tidy as possible. 267 */ 268 public void shutdown() throws IOException { 269 IOException exception = null; 270 final WALProvider metaProvider = this.metaProvider.get(); 271 if (null != metaProvider) { 272 try { 273 metaProvider.shutdown(); 274 } catch (IOException ioe) { 275 exception = ioe; 276 } 277 } 278 provider.shutdown(); 279 if (null != exception) { 280 throw exception; 281 } 282 } 283 284 public List<WAL> getWALs() { 285 return provider.getWALs(); 286 } 287 288 /** 289 * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of 290 * creating the first hbase:meta WAL so we can register a listener. 291 * @see #getMetaWALProvider() 292 */ 293 public WALProvider getMetaProvider() throws IOException { 294 for (;;) { 295 WALProvider provider = this.metaProvider.get(); 296 if (provider != null) { 297 return provider; 298 } 299 Class<? extends WALProvider> clz = null; 300 if (conf.get(META_WAL_PROVIDER) == null) { 301 try { 302 clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class); 303 } catch (Throwable t) { 304 // the WAL provider should be an enum. Proceed 305 } 306 } 307 if (clz == null) { 308 clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 309 } 310 provider = createProvider(clz); 311 provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable); 312 provider.addWALActionsListener(new MetricsWAL()); 313 if (metaProvider.compareAndSet(null, provider)) { 314 return provider; 315 } else { 316 // someone is ahead of us, close and try again. 317 provider.close(); 318 } 319 } 320 } 321 322 /** 323 * @param region the region which we want to get a WAL for. Could be null. 324 */ 325 public WAL getWAL(RegionInfo region) throws IOException { 326 // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up. 327 if ( 328 region != null && region.isMetaRegion() 329 && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID 330 ) { 331 return getMetaProvider().getWAL(region); 332 } else { 333 return provider.getWAL(region); 334 } 335 } 336 337 public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException { 338 return createStreamReader(fs, path, (CancelableProgressable) null); 339 } 340 341 /** 342 * Create a one-way stream reader for the WAL. 343 * @return A WAL reader. Close when done with it. 344 */ 345 public WALStreamReader createStreamReader(FileSystem fs, Path path, 346 CancelableProgressable reporter) throws IOException { 347 return createStreamReader(fs, path, reporter, -1); 348 } 349 350 /** 351 * Create a one-way stream reader for the WAL, and start reading from the given 352 * {@code startPosition}. 353 * @return A WAL reader. Close when done with it. 354 */ 355 public WALStreamReader createStreamReader(FileSystem fs, Path path, 356 CancelableProgressable reporter, long startPosition) throws IOException { 357 try { 358 // A wal file could be under recovery, so it may take several 359 // tries to get it open. Instead of claiming it is corrupted, retry 360 // to open it up to 5 minutes by default. 361 long startWaiting = EnvironmentEdgeManager.currentTime(); 362 long openTimeout = timeoutMillis + startWaiting; 363 int nbAttempt = 0; 364 WALStreamReader reader = null; 365 while (true) { 366 try { 367 reader = walStreamReaderClass.getDeclaredConstructor().newInstance(); 368 ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition); 369 return reader; 370 } catch (Exception e) { 371 // catch Exception so that we close reader for all exceptions. If we don't 372 // close the reader, we leak a socket. 373 if (reader != null) { 374 reader.close(); 375 } 376 377 // Only inspect the Exception to consider retry when it's an IOException 378 if (e instanceof IOException) { 379 String msg = e.getMessage(); 380 if ( 381 msg != null && (msg.contains("Cannot obtain block length") 382 || msg.contains("Could not obtain the last block") 383 || msg.matches("Blocklist for [^ ]* has changed.*")) 384 ) { 385 if (++nbAttempt == 1) { 386 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 387 } 388 if (reporter != null && !reporter.progress()) { 389 throw new InterruptedIOException("Operation is cancelled"); 390 } 391 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 392 LOG.error("Can't open after " + nbAttempt + " attempts and " 393 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 394 } else { 395 try { 396 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 397 continue; // retry 398 } catch (InterruptedException ie) { 399 InterruptedIOException iioe = new InterruptedIOException(); 400 iioe.initCause(ie); 401 throw iioe; 402 } 403 } 404 throw new LeaseNotRecoveredException(e); 405 } 406 } 407 408 // Rethrow the original exception if we are not retrying due to HDFS-isms. 409 throw e; 410 } 411 } 412 } catch (IOException ie) { 413 throw ie; 414 } catch (Exception e) { 415 throw new IOException("Cannot get log reader", e); 416 } 417 } 418 419 /** 420 * Create a writer for the WAL. Uses defaults. 421 * <p> 422 * Should be package-private. public only for tests and 423 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 424 * @return A WAL writer. Close when done with it. 425 */ 426 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 427 return FSHLogProvider.createWriter(conf, fs, path, false); 428 } 429 430 /** 431 * Should be package-private, visible for recovery testing. Uses defaults. 432 * @return an overwritable writer for recovered edits. caller should close. 433 */ 434 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 435 throws IOException { 436 return FSHLogProvider.createWriter(conf, fs, path, true); 437 } 438 439 // These static methods are currently used where it's impractical to 440 // untangle the reliance on state in the filesystem. They rely on singleton 441 // WALFactory that just provides Reader / Writers. 442 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 443 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 444 private static final String SINGLETON_ID = WALFactory.class.getName(); 445 446 // Public only for FSHLog 447 public static WALFactory getInstance(Configuration configuration) { 448 WALFactory factory = singleton.get(); 449 if (null == factory) { 450 WALFactory temp = new WALFactory(configuration); 451 if (singleton.compareAndSet(null, temp)) { 452 factory = temp; 453 } else { 454 // someone else beat us to initializing 455 try { 456 temp.close(); 457 } catch (IOException exception) { 458 LOG.debug("failed to close temporary singleton. ignoring.", exception); 459 } 460 factory = singleton.get(); 461 } 462 } 463 return factory; 464 } 465 466 /** 467 * Create a tailing reader for the given path. Mainly used in replication. 468 */ 469 public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf, 470 long startPosition) throws IOException { 471 ProtobufWALTailingReader reader = new ProtobufWALTailingReader(); 472 reader.init(fs, path, conf, startPosition); 473 return reader; 474 } 475 476 /** 477 * Create a one-way stream reader for a given path. 478 */ 479 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf) 480 throws IOException { 481 return createStreamReader(fs, path, conf, -1); 482 } 483 484 /** 485 * Create a one-way stream reader for a given path. 486 */ 487 public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf, 488 long startPosition) throws IOException { 489 return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null, 490 startPosition); 491 } 492 493 /** 494 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 495 * @return a Writer that will overwrite files. Caller must close. 496 */ 497 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 498 final Configuration configuration) throws IOException { 499 return FSHLogProvider.createWriter(configuration, fs, path, true); 500 } 501 502 /** 503 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 504 * @return a writer that won't overwrite files. Caller must close. 505 */ 506 public static Writer createWALWriter(final FileSystem fs, final Path path, 507 final Configuration configuration) throws IOException { 508 return FSHLogProvider.createWriter(configuration, fs, path, false); 509 } 510 511 public final WALProvider getWALProvider() { 512 return this.provider; 513 } 514 515 /** 516 * @return Current metaProvider... may be null if not yet initialized. 517 * @see #getMetaProvider() 518 */ 519 public final WALProvider getMetaWALProvider() { 520 return this.metaProvider.get(); 521 } 522 523 public ExcludeDatanodeManager getExcludeDatanodeManager() { 524 return excludeDatanodeManager; 525 } 526 527 @RestrictedApi(explanation = "Should only be called in tests", link = "", 528 allowedOnPath = ".*/src/test/.*") 529 public String getFactoryId() { 530 return this.factoryId; 531 } 532}