001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicReference; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; 030import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 031import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 032import org.apache.hadoop.hbase.util.CancelableProgressable; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 035import org.apache.hadoop.hbase.wal.WAL.Reader; 036import org.apache.hadoop.hbase.wal.WALProvider.Writer; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the 043 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the 044 * configuration setting "hbase.wal.provider". Available implementations: 045 * <ul> 046 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 047 * "asyncfs"</li> 048 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 049 * FileSystem interface via an asynchronous client.</li> 050 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 051 * FileSystem interface via HDFS's synchronous DFSClient.</li> 052 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 053 * server.</li> 054 * </ul> 055 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 056 */ 057@InterfaceAudience.Private 058public class WALFactory { 059 060 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 061 062 /** 063 * Maps between configuration names for providers and implementation classes. 064 */ 065 enum Providers { 066 defaultProvider(AsyncFSWALProvider.class), 067 filesystem(FSHLogProvider.class), 068 multiwal(RegionGroupingProvider.class), 069 asyncfs(AsyncFSWALProvider.class); 070 071 final Class<? extends WALProvider> clazz; 072 073 Providers(Class<? extends WALProvider> clazz) { 074 this.clazz = clazz; 075 } 076 } 077 078 public static final String WAL_PROVIDER = "hbase.wal.provider"; 079 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 080 081 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 082 083 public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; 084 085 final String factoryId; 086 final Abortable abortable; 087 private final WALProvider provider; 088 // The meta updates are written to a different wal. If this 089 // regionserver holds meta regions, then this ref will be non-null. 090 // lazily intialized; most RegionServers don't deal with META 091 private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>(); 092 093 /** 094 * Configuration-specified WAL Reader used when a custom reader is requested 095 */ 096 private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass; 097 098 /** 099 * How long to attempt opening in-recovery wals 100 */ 101 private final int timeoutMillis; 102 103 private final Configuration conf; 104 105 private final ExcludeDatanodeManager excludeDatanodeManager; 106 107 // Used for the singleton WALFactory, see below. 108 private WALFactory(Configuration conf) { 109 // this code is duplicated here so we can keep our members final. 110 // until we've moved reader/writer construction down into providers, this initialization must 111 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 112 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 113 /* TODO Both of these are probably specific to the fs wal provider */ 114 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 115 AbstractFSWALProvider.Reader.class); 116 this.conf = conf; 117 // end required early initialization 118 119 // this instance can't create wals, just reader/writers. 120 provider = null; 121 factoryId = SINGLETON_ID; 122 this.abortable = null; 123 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 124 } 125 126 Providers getDefaultProvider() { 127 return Providers.defaultProvider; 128 } 129 130 public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 131 try { 132 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 133 134 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 135 // the default and we can't use it, we want to fall back to FSHLog which we know works on 136 // all versions. 137 if ( 138 provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 139 && !AsyncFSWALProvider.load() 140 ) { 141 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 142 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 143 // easily broken when upgrading hadoop. 144 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 145 return FSHLogProvider.class; 146 } 147 148 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 149 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 150 // not fall back to FSHLogProvider. 151 return provider.clazz; 152 } catch (IllegalArgumentException exception) { 153 // Fall back to them specifying a class name 154 // Note that the passed default class shouldn't actually be used, since the above only fails 155 // when there is a config value present. 156 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 157 } 158 } 159 160 static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException { 161 LOG.info("Instantiating WALProvider of type {}", clazz); 162 try { 163 return clazz.getDeclaredConstructor().newInstance(); 164 } catch (Exception e) { 165 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 166 LOG.debug("Exception details for failure to load WALProvider.", e); 167 throw new IOException("couldn't set up WALProvider", e); 168 } 169 } 170 171 /** 172 * @param conf must not be null, will keep a reference to read params in later reader/writer 173 * instances. 174 * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations 175 * to make a directory 176 */ 177 public WALFactory(Configuration conf, String factoryId) throws IOException { 178 // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider 179 // for HMaster or HRegionServer which take system table only. See HBASE-19999 180 this(conf, factoryId, null, true); 181 } 182 183 /** 184 * @param conf must not be null, will keep a reference to read params 185 * in later reader/writer instances. 186 * @param factoryId a unique identifier for this factory. used i.e. by 187 * filesystem implementations to make a directory 188 * @param abortable the server associated with this WAL file 189 * @param enableSyncReplicationWALProvider whether wrap the wal provider to a 190 * {@link SyncReplicationWALProvider} 191 */ 192 public WALFactory(Configuration conf, String factoryId, Abortable abortable, 193 boolean enableSyncReplicationWALProvider) throws IOException { 194 // until we've moved reader/writer construction down into providers, this initialization must 195 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 196 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 197 /* TODO Both of these are probably specific to the fs wal provider */ 198 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 199 AbstractFSWALProvider.Reader.class); 200 this.conf = conf; 201 this.factoryId = factoryId; 202 this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); 203 this.abortable = abortable; 204 // end required early initialization 205 if (conf.getBoolean(WAL_ENABLED, true)) { 206 WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 207 if (enableSyncReplicationWALProvider) { 208 provider = new SyncReplicationWALProvider(provider); 209 } 210 provider.init(this, conf, null, this.abortable); 211 provider.addWALActionsListener(new MetricsWAL()); 212 this.provider = provider; 213 } else { 214 // special handling of existing configuration behavior. 215 LOG.warn("Running with WAL disabled."); 216 provider = new DisabledWALProvider(); 217 provider.init(this, conf, factoryId, null); 218 } 219 } 220 221 /** 222 * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to 223 * replay and edits that have gone to any wals from this factory. 224 */ 225 public void close() throws IOException { 226 final WALProvider metaProvider = this.metaProvider.get(); 227 if (null != metaProvider) { 228 metaProvider.close(); 229 } 230 // close is called on a WALFactory with null provider in the case of contention handling 231 // within the getInstance method. 232 if (null != provider) { 233 provider.close(); 234 } 235 } 236 237 /** 238 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you 239 * are not ending cleanly and will need to replay edits from this factory's wals, use this method 240 * if you can as it will try to leave things as tidy as possible. 241 */ 242 public void shutdown() throws IOException { 243 IOException exception = null; 244 final WALProvider metaProvider = this.metaProvider.get(); 245 if (null != metaProvider) { 246 try { 247 metaProvider.shutdown(); 248 } catch (IOException ioe) { 249 exception = ioe; 250 } 251 } 252 provider.shutdown(); 253 if (null != exception) { 254 throw exception; 255 } 256 } 257 258 public List<WAL> getWALs() { 259 return provider.getWALs(); 260 } 261 262 /** 263 * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of 264 * creating the first hbase:meta WAL so we can register a listener. 265 * @see #getMetaWALProvider() 266 */ 267 public WALProvider getMetaProvider() throws IOException { 268 for (;;) { 269 WALProvider provider = this.metaProvider.get(); 270 if (provider != null) { 271 return provider; 272 } 273 Class<? extends WALProvider> clz = null; 274 if (conf.get(META_WAL_PROVIDER) == null) { 275 try { 276 clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class); 277 } catch (Throwable t) { 278 // the WAL provider should be an enum. Proceed 279 } 280 } 281 if (clz == null) { 282 clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 283 } 284 provider = createProvider(clz); 285 provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable); 286 provider.addWALActionsListener(new MetricsWAL()); 287 if (metaProvider.compareAndSet(null, provider)) { 288 return provider; 289 } else { 290 // someone is ahead of us, close and try again. 291 provider.close(); 292 } 293 } 294 } 295 296 /** 297 * @param region the region which we want to get a WAL for. Could be null. 298 */ 299 public WAL getWAL(RegionInfo region) throws IOException { 300 // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up. 301 if ( 302 region != null && region.isMetaRegion() 303 && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID 304 ) { 305 return getMetaProvider().getWAL(region); 306 } else { 307 return provider.getWAL(region); 308 } 309 } 310 311 public Reader createReader(final FileSystem fs, final Path path) throws IOException { 312 return createReader(fs, path, (CancelableProgressable) null); 313 } 314 315 /** 316 * Create a reader for the WAL. If you are reading from a file that's being written to and need to 317 * reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method then just seek 318 * back to the last known good position. 319 * @return A WAL reader. Close when done with it. 320 */ 321 public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter) 322 throws IOException { 323 return createReader(fs, path, reporter, true); 324 } 325 326 public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, 327 boolean allowCustom) throws IOException { 328 Class<? extends AbstractFSWALProvider.Reader> lrClass = 329 allowCustom ? logReaderClass : ProtobufLogReader.class; 330 try { 331 // A wal file could be under recovery, so it may take several 332 // tries to get it open. Instead of claiming it is corrupted, retry 333 // to open it up to 5 minutes by default. 334 long startWaiting = EnvironmentEdgeManager.currentTime(); 335 long openTimeout = timeoutMillis + startWaiting; 336 int nbAttempt = 0; 337 AbstractFSWALProvider.Reader reader = null; 338 while (true) { 339 try { 340 reader = lrClass.getDeclaredConstructor().newInstance(); 341 reader.init(fs, path, conf, null); 342 return reader; 343 } catch (Exception e) { 344 // catch Exception so that we close reader for all exceptions. If we don't 345 // close the reader, we leak a socket. 346 if (reader != null) { 347 try { 348 reader.close(); 349 } catch (IOException exception) { 350 LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); 351 LOG.debug("exception details", exception); 352 } 353 } 354 355 // Only inspect the Exception to consider retry when it's an IOException 356 if (e instanceof IOException) { 357 String msg = e.getMessage(); 358 if ( 359 msg != null && (msg.contains("Cannot obtain block length") 360 || msg.contains("Could not obtain the last block") 361 || msg.matches("Blocklist for [^ ]* has changed.*")) 362 ) { 363 if (++nbAttempt == 1) { 364 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 365 } 366 if (reporter != null && !reporter.progress()) { 367 throw new InterruptedIOException("Operation is cancelled"); 368 } 369 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 370 LOG.error("Can't open after " + nbAttempt + " attempts and " 371 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 372 } else { 373 try { 374 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 375 continue; // retry 376 } catch (InterruptedException ie) { 377 InterruptedIOException iioe = new InterruptedIOException(); 378 iioe.initCause(ie); 379 throw iioe; 380 } 381 } 382 throw new LeaseNotRecoveredException(e); 383 } 384 } 385 386 // Rethrow the original exception if we are not retrying due to HDFS-isms. 387 throw e; 388 } 389 } 390 } catch (IOException ie) { 391 throw ie; 392 } catch (Exception e) { 393 throw new IOException("Cannot get log reader", e); 394 } 395 } 396 397 /** 398 * Create a writer for the WAL. Uses defaults. 399 * <p> 400 * Should be package-private. public only for tests and 401 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 402 * @return A WAL writer. Close when done with it. 403 */ 404 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 405 return FSHLogProvider.createWriter(conf, fs, path, false); 406 } 407 408 /** 409 * Should be package-private, visible for recovery testing. Uses defaults. 410 * @return an overwritable writer for recovered edits. caller should close. 411 */ 412 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 413 throws IOException { 414 return FSHLogProvider.createWriter(conf, fs, path, true); 415 } 416 417 // These static methods are currently used where it's impractical to 418 // untangle the reliance on state in the filesystem. They rely on singleton 419 // WALFactory that just provides Reader / Writers. 420 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 421 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 422 private static final String SINGLETON_ID = WALFactory.class.getName(); 423 424 // Public only for FSHLog 425 public static WALFactory getInstance(Configuration configuration) { 426 WALFactory factory = singleton.get(); 427 if (null == factory) { 428 WALFactory temp = new WALFactory(configuration); 429 if (singleton.compareAndSet(null, temp)) { 430 factory = temp; 431 } else { 432 // someone else beat us to initializing 433 try { 434 temp.close(); 435 } catch (IOException exception) { 436 LOG.debug("failed to close temporary singleton. ignoring.", exception); 437 } 438 factory = singleton.get(); 439 } 440 } 441 return factory; 442 } 443 444 /** 445 * Create a reader for the given path, accept custom reader classes from conf. If you already have 446 * a WALFactory, you should favor the instance method. 447 * @return a WAL Reader, caller must close. 448 */ 449 public static Reader createReader(final FileSystem fs, final Path path, 450 final Configuration configuration) throws IOException { 451 return getInstance(configuration).createReader(fs, path); 452 } 453 454 /** 455 * Create a reader for the given path, accept custom reader classes from conf. If you already have 456 * a WALFactory, you should favor the instance method. 457 * @return a WAL Reader, caller must close. 458 */ 459 static Reader createReader(final FileSystem fs, final Path path, 460 final Configuration configuration, final CancelableProgressable reporter) throws IOException { 461 return getInstance(configuration).createReader(fs, path, reporter); 462 } 463 464 /** 465 * Create a reader for the given path, ignore custom reader classes from conf. If you already have 466 * a WALFactory, you should favor the instance method. only public pending move of 467 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 468 * @return a WAL Reader, caller must close. 469 */ 470 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, 471 final Configuration configuration) throws IOException { 472 return getInstance(configuration).createReader(fs, path, null, false); 473 } 474 475 /** 476 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 477 * @return a Writer that will overwrite files. Caller must close. 478 */ 479 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 480 final Configuration configuration) throws IOException { 481 return FSHLogProvider.createWriter(configuration, fs, path, true); 482 } 483 484 /** 485 * If you already have a WALFactory, you should favor the instance method. Uses defaults. 486 * @return a writer that won't overwrite files. Caller must close. 487 */ 488 public static Writer createWALWriter(final FileSystem fs, final Path path, 489 final Configuration configuration) throws IOException { 490 return FSHLogProvider.createWriter(configuration, fs, path, false); 491 } 492 493 public final WALProvider getWALProvider() { 494 return this.provider; 495 } 496 497 /** 498 * @return Current metaProvider... may be null if not yet initialized. 499 * @see #getMetaProvider() 500 */ 501 public final WALProvider getMetaWALProvider() { 502 return this.metaProvider.get(); 503 } 504 505 public ExcludeDatanodeManager getExcludeDatanodeManager() { 506 return excludeDatanodeManager; 507 } 508}