/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
 * configuration setting "hbase.wal.provider". Available implementations:
 * <ul>
 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
 * "asyncfs"</li>
 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
 * FileSystem interface via an asynchronous client.</li>
 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
 * FileSystem interface via HDFS's synchronous DFSClient.</li>
 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
 * server.</li>
 * </ul>
 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
 */
@InterfaceAudience.Private
public class WALFactory {

  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);

  /**
   * Maps between configuration names for providers and implementation classes.
   */
  enum Providers {
    defaultProvider(AsyncFSWALProvider.class),
    filesystem(FSHLogProvider.class),
    multiwal(RegionGroupingProvider.class),
    asyncfs(AsyncFSWALProvider.class);

    final Class<? extends WALProvider> clazz;

    Providers(Class<? extends WALProvider> clazz) {
      this.clazz = clazz;
    }
  }

  public static final String WAL_PROVIDER = "hbase.wal.provider";
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";

  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";

  final String factoryId;
  final Abortable abortable;
  // Null only for the reader/writer-only singleton instance (see the private constructor).
  private final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // lazily initialized; most RegionServers don't deal with META
  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested
   */
  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;

  /**
   * How long to attempt opening in-recovery wals
   */
  private final int timeoutMillis;

  private final Configuration conf;

  private final ExcludeDatanodeManager excludeDatanodeManager;

  // Used for the singleton WALFactory, see below.
  private WALFactory(Configuration conf) {
    // this code is duplicated here so we can keep our members final.
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    // end required early initialization

    // this instance can't create wals, just reader/writers.
    provider = null;
    factoryId = SINGLETON_ID;
    this.abortable = null;
    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
  }

  /**
   * @return the {@link Providers} entry treated as "the default" when deciding whether the
   *         AsyncFSWALProvider-to-FSHLogProvider fallback in {@link #getProviderClass} applies.
   */
  Providers getDefaultProvider() {
    return Providers.defaultProvider;
  }

  /**
   * Resolve the WALProvider implementation class configured under {@code key}.
   * <p>
   * The configured value is first interpreted as a {@link Providers} name; if it is not one, it is
   * interpreted as a class name.
   * @param key          configuration key to read
   * @param defaultValue {@link Providers} name to use when {@code key} is not set
   * @return the provider class to instantiate
   */
  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
    try {
      Providers provider = Providers.valueOf(conf.get(key, defaultValue));

      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
      // the default and we can't use it, we want to fall back to FSHLog which we know works on
      // all versions.
      if (
        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
          && !AsyncFSWALProvider.load()
      ) {
        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
        // easily broken when upgrading hadoop.
        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
        return FSHLogProvider.class;
      }

      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
      // not fall back to FSHLogProvider.
      return provider.clazz;
    } catch (IllegalArgumentException exception) {
      // Fall back to them specifying a class name
      // Note that the passed default class shouldn't actually be used, since the above only fails
      // when there is a config value present.
      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
    }
  }

  /**
   * Reflectively instantiate {@code clazz} via its no-arg constructor and initialize it with this
   * factory, its configuration, {@code providerId} and this factory's {@link Abortable}.
   * @throws IOException wrapping any failure during instantiation or initialization
   */
  WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
    throws IOException {
    LOG.info("Instantiating WALProvider of type {}", clazz);
    try {
      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
      result.init(this, conf, providerId, this.abortable);
      return result;
    } catch (Exception e) {
      LOG.error("couldn't set up WALProvider, the configured class is {}", clazz);
      LOG.debug("Exception details for failure to load WALProvider.", e);
      throw new IOException("couldn't set up WALProvider", e);
    }
  }

  /**
   * instantiate a provider from a config property. requires conf to have already been set (as well
   * as anything the provider might need to read).
   */
  WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
    WALProvider provider = createProvider(clazz, providerId);
    provider.addWALActionsListener(new MetricsWAL());
    return provider;
  }

  /**
   * Same as {@link #WALFactory(Configuration, String, Abortable)} with a null {@link Abortable}.
   */
  public WALFactory(Configuration conf, String factoryId) throws IOException {
    this(conf, factoryId, null);
  }

  /**
   * @param conf      must not be null, will keep a reference to read params in later reader/writer
   *                  instances.
   * @param abortable the server to abort
   */
  public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    this.factoryId = factoryId;
    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
    this.abortable = abortable;
    // end required early initialization
    if (conf.getBoolean(WAL_ENABLED, true)) {
      provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
    } else {
      // special handling of existing configuration behavior.
      LOG.warn("Running with WAL disabled.");
      provider = new DisabledWALProvider();
      provider.init(this, conf, factoryId, null);
    }
  }

  /**
   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
   * replay any edits that have gone to any wals from this factory.
   */
  public void close() throws IOException {
    final WALProvider metaProvider = this.metaProvider.get();
    if (null != metaProvider) {
      metaProvider.close();
    }
    // close is called on a WALFactory with null provider in the case of contention handling
    // within the getInstance method.
    if (null != provider) {
      provider.close();
    }
  }

  /**
   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
   * if you can as it will try to leave things as tidy as possible.
   * <p>
   * The meta provider (if one was created) is shut down first; a failure there does not prevent
   * the main provider from being shut down.
   * @throws IOException the main provider's failure if it failed (with the meta provider's failure
   *                     attached as suppressed), otherwise the meta provider's failure
   */
  public void shutdown() throws IOException {
    IOException metaFailure = null;
    final WALProvider metaProvider = this.metaProvider.get();
    if (null != metaProvider) {
      try {
        metaProvider.shutdown();
      } catch (IOException ioe) {
        metaFailure = ioe;
      }
    }
    // provider is null on the reader/writer-only singleton instance; mirror the guard in close().
    if (null != provider) {
      try {
        provider.shutdown();
      } catch (IOException ioe) {
        // Don't lose the earlier meta provider failure when both shutdowns fail.
        if (null != metaFailure) {
          ioe.addSuppressed(metaFailure);
        }
        throw ioe;
      }
    }
    if (null != metaFailure) {
      throw metaFailure;
    }
  }

  /**
   * @return the WALs reported by the main (non-meta) provider.
   */
  public List<WAL> getWALs() {
    return provider.getWALs();
  }

  /**
   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
   * creating the first hbase:meta WAL so we can register a listener.
   * @see #getMetaWALProvider()
   */
  public WALProvider getMetaProvider() throws IOException {
    for (;;) {
      WALProvider provider = this.metaProvider.get();
      if (provider != null) {
        return provider;
      }
      Class<? extends WALProvider> clz = null;
      if (conf.get(META_WAL_PROVIDER) == null) {
        try {
          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
        } catch (Throwable ignored) {
          // the WAL provider should be an enum. Proceed; clz stays null and is resolved through
          // getProviderClass below.
        }
      }
      if (clz == null) {
        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
      }
      provider = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      if (metaProvider.compareAndSet(null, provider)) {
        return provider;
      } else {
        // someone is ahead of us, close and try again.
        provider.close();
      }
    }
  }

  /**
   * @param region the region which we want to get a WAL for. Could be null.
   */
  public WAL getWAL(RegionInfo region) throws IOException {
    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
    if (
      region != null && region.isMetaRegion()
        && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
    ) {
      return getMetaProvider().getWAL(region);
    } else {
      return provider.getWAL(region);
    }
  }

  /**
   * Create a reader for the WAL at {@code path} with no progress reporter, allowing the
   * configured custom reader class.
   * @return A WAL reader. Close when done with it.
   */
  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
    return createReader(fs, path, (CancelableProgressable) null);
  }

  /**
   * Create a reader for the WAL. If you are reading from a file that's being written to and need to
   * reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method then just seek
   * back to the last known good position.
   * @return A WAL reader. Close when done with it.
   */
  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)
    throws IOException {
    return createReader(fs, path, reporter, true);
  }

  /**
   * Create a reader for the WAL at {@code path}, retrying while the failure looks like an HDFS
   * lease/block-recovery condition.
   * @param reporter    if non-null, consulted between retries; a {@code false} return from
   *                    {@code progress()} aborts with an {@link InterruptedIOException}
   * @param allowCustom if true use the configuration-specified reader class, otherwise always use
   *                    {@link ProtobufLogReader}
   * @return A WAL reader. Close when done with it.
   * @throws LeaseNotRecoveredException if the file still can't be opened after more than two
   *                                    attempts and the open timeout has elapsed
   * @throws IOException                on any other failure to instantiate or initialize the reader
   */
  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
    boolean allowCustom) throws IOException {
    Class<? extends AbstractFSWALProvider.Reader> lrClass =
      allowCustom ? logReaderClass : ProtobufLogReader.class;
    try {
      // A wal file could be under recovery, so it may take several
      // tries to get it open. Instead of claiming it is corrupted, retry
      // to open it up to 5 minutes by default.
      long startWaiting = EnvironmentEdgeManager.currentTime();
      long openTimeout = timeoutMillis + startWaiting;
      int nbAttempt = 0;
      while (true) {
        // Scoped per attempt so a failed retry never re-closes the previous attempt's reader.
        AbstractFSWALProvider.Reader reader = null;
        try {
          reader = lrClass.getDeclaredConstructor().newInstance();
          reader.init(fs, path, conf, null);
          return reader;
        } catch (Exception e) {
          // catch Exception so that we close reader for all exceptions. If we don't
          // close the reader, we leak a socket.
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException exception) {
              LOG.warn("Could not close FSDataInputStream: {}", exception.getMessage());
              LOG.debug("exception details", exception);
            }
          }

          // Only inspect the Exception to consider retry when it's an IOException
          if (e instanceof IOException && isLeaseRecoveryMessage(e.getMessage())) {
            if (++nbAttempt == 1) {
              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
            }
            if (reporter != null && !reporter.progress()) {
              throw new InterruptedIOException("Operation is cancelled");
            }
            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
              LOG.error("Can't open after " + nbAttempt + " attempts and "
                + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
            } else {
              try {
                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
                continue; // retry
              } catch (InterruptedException ie) {
                InterruptedIOException iioe = new InterruptedIOException();
                iioe.initCause(ie);
                throw iioe;
              }
            }
            throw new LeaseNotRecoveredException(e);
          }

          // Rethrow the original exception if we are not retrying due to HDFS-isms.
          throw e;
        }
      }
    } catch (IOException ie) {
      throw ie;
    } catch (Exception e) {
      throw new IOException("Cannot get log reader", e);
    }
  }

  /**
   * @return true if {@code msg} is non-null and matches one of the exception messages that
   *         {@link #createReader(FileSystem, Path, CancelableProgressable, boolean)} treats as
   *         worth retrying.
   */
  private static boolean isLeaseRecoveryMessage(String msg) {
    return msg != null && (msg.contains("Cannot obtain block length")
      || msg.contains("Could not obtain the last block")
      || msg.matches("Blocklist for [^ ]* has changed.*"));
  }

  /**
   * Create a writer for the WAL. Uses defaults.
   * <p>
   * Should be package-private. public only for tests and
   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return A WAL writer. Close when done with it.
   */
  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, false);
  }

  /**
   * Should be package-private, visible for recovery testing. Uses defaults.
   * @return an overwritable writer for recovered edits. caller should close.
   */
  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
    throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, true);
  }

  // These static methods are currently used where it's impractical to
  // untangle the reliance on state in the filesystem. They rely on singleton
  // WALFactory that just provides Reader / Writers.
  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
  private static final String SINGLETON_ID = WALFactory.class.getName();

  /**
   * Get the shared reader/writer-only factory, creating it from {@code configuration} if it does
   * not exist yet. If another caller installs an instance first, that instance is returned and the
   * one built here is closed and discarded.
   */
  // Public only for FSHLog
  public static WALFactory getInstance(Configuration configuration) {
    WALFactory factory = singleton.get();
    if (null == factory) {
      WALFactory temp = new WALFactory(configuration);
      if (singleton.compareAndSet(null, temp)) {
        factory = temp;
      } else {
        // someone else beat us to initializing
        try {
          temp.close();
        } catch (IOException exception) {
          LOG.debug("failed to close temporary singleton. ignoring.", exception);
        }
        factory = singleton.get();
      }
    }
    return factory;
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf. If you already have
   * a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReader(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path);
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf. If you already have
   * a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  static Reader createReader(final FileSystem fs, final Path path,
    final Configuration configuration, final CancelableProgressable reporter) throws IOException {
    return getInstance(configuration).createReader(fs, path, reporter);
  }

  /**
   * Create a reader for the given path, ignore custom reader classes from conf. If you already have
   * a WALFactory, you should favor the instance method. only public pending move of
   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path, null, false);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
   * @return a Writer that will overwrite files. Caller must close.
   */
  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, true);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
   * @return a writer that won't overwrite files. Caller must close.
   */
  public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, false);
  }

  /**
   * @return the id this factory was constructed with.
   */
  public String getFactoryId() {
    return factoryId;
  }

  /**
   * @return the main (non-meta) provider; null on the reader/writer-only singleton instance.
   */
  public final WALProvider getWALProvider() {
    return this.provider;
  }

  /**
   * @return Current metaProvider... may be null if not yet initialized.
   * @see #getMetaProvider()
   */
  public final WALProvider getMetaWALProvider() {
    return this.metaProvider.get();
  }

  /**
   * @return the {@link ExcludeDatanodeManager} created by this factory's constructor.
   */
  public ExcludeDatanodeManager getExcludeDatanodeManager() {
    return excludeDatanodeManager;
  }
}