/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Entry point for users of the Write Ahead Log.
 * Acts as the shim between internal use and the particular WALProvider we use to handle wal
 * requests.
 *
 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
 * implementations:
 * <ul>
 *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
 *                                  "asyncfs"</li>
 *   <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
 *                             FileSystem interface via an asynchronous client.</li>
 *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
 *                             FileSystem interface via HDFS's synchronous DFSClient.</li>
 *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
 *                           server.</li>
 * </ul>
 *
 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
 */
@InterfaceAudience.Private
public class WALFactory {

  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);

  /**
   * Maps between configuration names for providers and implementation classes.
   */
  enum Providers {
    defaultProvider(AsyncFSWALProvider.class),
    filesystem(FSHLogProvider.class),
    multiwal(RegionGroupingProvider.class),
    asyncfs(AsyncFSWALProvider.class);

    final Class<? extends WALProvider> clazz;

    Providers(Class<? extends WALProvider> clazz) {
      this.clazz = clazz;
    }
  }

  public static final String WAL_PROVIDER = "hbase.wal.provider";
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";

  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";

  final String factoryId;
  // null for the singleton factory, which can only hand out readers/writers.
  private final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // lazily initialized; most RegionServers don't deal with META
  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested
   */
  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;

  /**
   * How long to attempt opening in-recovery wals
   */
  private final int timeoutMillis;

  private final Configuration conf;

  /**
   * Builds the reader/writer-only factory used as the process-wide singleton (see
   * {@link #getInstance(Configuration)}). No WAL provider is created, so {@link #provider} stays
   * null for this instance.
   * @param conf configuration the reader class and open timeout are read from; a reference is kept
   */
  private WALFactory(Configuration conf) {
    // this code is duplicated here so we can keep our members final.
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    // end required early initialization

    // this instance can't create wals, just reader/writers.
    provider = null;
    factoryId = SINGLETON_ID;
  }

  /**
   * @return the enum constant treated as the default provider; a separate method so tests can
   *         substitute a different default.
   */
  @VisibleForTesting
  Providers getDefaultProvider() {
    return Providers.defaultProvider;
  }

  /**
   * Resolves the provider implementation class named by a configuration key.
   * <p>
   * The configured value is first interpreted as a {@link Providers} constant name; if it is not
   * one, it is interpreted as a class name.
   * @param key configuration key to read
   * @param defaultValue value used when {@code key} is not set
   * @return the provider class to instantiate
   */
  @VisibleForTesting
  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
    try {
      Providers configured = Providers.valueOf(conf.get(key, defaultValue));

      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
      // the default and we can't use it, we want to fall back to FSHLog which we know works on
      // all versions.
      if (configured == getDefaultProvider() && configured.clazz == AsyncFSWALProvider.class
          && !AsyncFSWALProvider.load()) {
        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
        // easily broken when upgrading hadoop.
        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
        return FSHLogProvider.class;
      }

      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
      // not fall back to FSHLogProvider.
      return configured.clazz;
    } catch (IllegalArgumentException exception) {
      // Fall back to them specifying a class name
      // Note that the passed default class shouldn't actually be used, since the above only fails
      // when there is a config value present.
      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
    }
  }

  /**
   * Instantiates the given provider class through its no-arg constructor and initializes it with
   * this factory and its configuration.
   * @param clazz provider implementation to instantiate
   * @param providerId identifier passed through to {@code WALProvider.init}
   * @return the initialized provider
   * @throws IOException if construction or initialization fails for any reason; the original
   *           failure is attached as the cause
   */
  WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
      throws IOException {
    LOG.info("Instantiating WALProvider of type " + clazz);
    try {
      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
      result.init(this, conf, providerId);
      return result;
    } catch (Exception e) {
      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
      LOG.debug("Exception details for failure to load WALProvider.", e);
      throw new IOException("couldn't set up WALProvider", e);
    }
  }

  /**
   * instantiate a provider from a config property. requires conf to have already been set (as well
   * as anything the provider might need to read). A {@link MetricsWAL} listener is registered on
   * the new provider before it is returned.
   * @param key configuration key naming the provider
   * @param defaultValue value used when {@code key} is not set
   * @param providerId identifier passed through to the provider's init
   */
  WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
    WALProvider created = createProvider(clazz, providerId);
    created.addWALActionsListener(new MetricsWAL());
    return created;
  }

  /**
   * @param conf must not be null, will keep a reference to read params in later reader/writer
   *          instances.
   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
   *          to make a directory
   */
  public WALFactory(Configuration conf, String factoryId) throws IOException {
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    this.factoryId = factoryId;
    // end required early initialization
    if (conf.getBoolean(WAL_ENABLED, true)) {
      provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
    } else {
      // special handling of existing configuration behavior.
      LOG.warn("Running with WAL disabled.");
      provider = new DisabledWALProvider();
      provider.init(this, conf, factoryId);
    }
  }

  /**
   * Shutdown all WALs and clean up any underlying storage.
   * Use only when you will not need to replay any edits that have gone to any wals from this
   * factory.
   * <p>
   * Both the meta provider (if one was created) and the main provider (if any) are always given a
   * chance to close, even when the first one fails.
   * @throws IOException the first failure encountered; a failure from the second provider is
   *           attached to it as a suppressed exception
   */
  public void close() throws IOException {
    IOException failure = null;
    final WALProvider meta = this.metaProvider.get();
    if (null != meta) {
      try {
        meta.close();
      } catch (IOException ioe) {
        // remember the failure but still release the main provider below.
        failure = ioe;
      }
    }
    // close is called on a WALFactory with null provider in the case of contention handling
    // within the getInstance method.
    if (null != provider) {
      try {
        provider.close();
      } catch (IOException ioe) {
        if (failure == null) {
          failure = ioe;
        } else {
          failure.addSuppressed(ioe);
        }
      }
    }
    if (null != failure) {
      throw failure;
    }
  }

  /**
   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
   * If you are not ending cleanly and will need to replay edits from this factory's wals,
   * use this method if you can as it will try to leave things as tidy as possible.
   * <p>
   * The main provider is shut down even if shutting down the meta provider fails. A factory
   * without a main provider (the reader/writer-only singleton) is tolerated.
   * @throws IOException the main provider's failure if it fails (with any meta provider failure
   *           attached as suppressed), otherwise the meta provider's failure
   */
  public void shutdown() throws IOException {
    IOException exception = null;
    final WALProvider meta = this.metaProvider.get();
    if (null != meta) {
      try {
        meta.shutdown();
      } catch (IOException ioe) {
        exception = ioe;
      }
    }
    if (null != provider) {
      try {
        provider.shutdown();
      } catch (IOException ioe) {
        // keep the earlier meta failure visible instead of silently losing it.
        if (null != exception) {
          ioe.addSuppressed(exception);
        }
        throw ioe;
      }
    }
    if (null != exception) {
      throw exception;
    }
  }

  /**
   * @return the WALs reported by the main provider (the meta provider is not consulted)
   */
  public List<WAL> getWALs() {
    return provider.getWALs();
  }

  /**
   * Returns the provider used for the meta WAL, creating and publishing it on first use.
   * <p>
   * Creation races are resolved with a compare-and-set: the losing thread closes the provider it
   * built and picks up the published one on the next loop iteration.
   */
  @VisibleForTesting
  WALProvider getMetaProvider() throws IOException {
    for (;;) {
      WALProvider existing = this.metaProvider.get();
      if (existing != null) {
        return existing;
      }
      Class<? extends WALProvider> clz = null;
      if (conf.get(META_WAL_PROVIDER) == null) {
        // NOTE(review): when WAL_PROVIDER is also unset this yields the default provider class
        // directly, skipping the AsyncFSWALProvider.load() fallback done in getProviderClass --
        // confirm that is intended.
        try {
          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
        } catch (Throwable ignored) {
          // the WAL provider should be an enum. Proceed
        }
      }
      if (clz == null) {
        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
      }
      WALProvider created = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      if (metaProvider.compareAndSet(null, created)) {
        return created;
      } else {
        // someone is ahead of us, close and try again.
        created.close();
      }
    }
  }

  /**
   * @param region the region which we want to get a WAL for it. Could be null.
   * @return the WAL from the meta provider when {@code region} is the default replica of a meta
   *         region, otherwise the WAL from the main provider
   */
  public WAL getWAL(RegionInfo region) throws IOException {
    // use different WAL for hbase:meta
    if (region != null && region.isMetaRegion() &&
      region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
      return getMetaProvider().getWAL(region);
    } else {
      return provider.getWAL(region);
    }
  }

  /**
   * Create a reader for the WAL without a progress reporter, honoring a custom reader class from
   * the configuration.
   * @return A WAL reader. Close when done with it.
   */
  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
    return createReader(fs, path, (CancelableProgressable)null);
  }

  /**
   * Create a reader for the WAL. If you are reading from a file that's being written to and need
   * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
   * then just seek back to the last known good position.
   * @param reporter polled between retries; may be null
   * @return A WAL reader. Close when done with it.
   * @throws IOException if the reader cannot be opened
   */
  public Reader createReader(final FileSystem fs, final Path path,
      CancelableProgressable reporter) throws IOException {
    return createReader(fs, path, reporter, true);
  }

  /**
   * Create a reader for the WAL, retrying while the file's lease appears to still be under
   * recovery.
   * <p>
   * A wal file could be under recovery, so it may take several tries to get it open. Instead of
   * claiming it is corrupted, opening is retried until the "hbase.hlog.open.timeout" period
   * (5 minutes by default) has elapsed and more than two attempts have been made.
   * @param fs filesystem holding the WAL file
   * @param path WAL file to open
   * @param reporter polled before each retry; when it reports {@code false} the open is abandoned.
   *          May be null.
   * @param allowCustom whether to use the reader class from the configuration ({@code true}) or
   *          always {@link ProtobufLogReader} ({@code false})
   * @return A WAL reader. Close when done with it.
   * @throws InterruptedIOException if {@code reporter} cancels the operation or the thread is
   *           interrupted while waiting to retry (the interrupt status is preserved)
   * @throws LeaseNotRecoveredException if the lease-recovery style failures persist past the
   *           timeout
   * @throws IOException for any other failure to open the reader
   */
  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
      boolean allowCustom) throws IOException {
    Class<? extends AbstractFSWALProvider.Reader> lrClass =
      allowCustom ? logReaderClass : ProtobufLogReader.class;
    try {
      long startWaiting = EnvironmentEdgeManager.currentTime();
      long openTimeout = timeoutMillis + startWaiting;
      int nbAttempt = 0;
      while (true) {
        AbstractFSWALProvider.Reader reader = null;
        try {
          reader = lrClass.getDeclaredConstructor().newInstance();
          reader.init(fs, path, conf, null);
          return reader;
        } catch (IOException e) {
          closeFailedReader(reader);
          if (!looksLikeLeaseRecovery(e)) {
            throw e;
          }
          if (++nbAttempt == 1) {
            LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
          }
          if (reporter != null && !reporter.progress()) {
            throw new InterruptedIOException("Operation is cancelled");
          }
          if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
            LOG.error("Can't open after " + nbAttempt + " attempts and "
              + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
            throw new LeaseNotRecoveredException(e);
          }
          try {
            Thread.sleep(nbAttempt < 3 ? 500 : 1000);
          } catch (InterruptedException ie) {
            // restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
          // fall through to the next loop iteration: retry
        }
      }
    } catch (IOException ie) {
      // rethrown as-is so the generic handler below doesn't wrap it.
      throw ie;
    } catch (Exception e) {
      throw new IOException("Cannot get log reader", e);
    }
  }

  /**
   * Tells whether an open failure carries one of the messages that are retried as a lease still
   * being under recovery.
   */
  private static boolean looksLikeLeaseRecovery(IOException e) {
    String msg = e.getMessage();
    return msg != null
      && (msg.contains("Cannot obtain block length")
        || msg.contains("Could not obtain the last block")
        || msg.matches("Blocklist for [^ ]* has changed.*"));
  }

  /**
   * Best-effort close of a reader whose init failed; close failures are logged, never thrown.
   */
  private static void closeFailedReader(AbstractFSWALProvider.Reader reader) {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    } catch (IOException exception) {
      LOG.warn("Could not close FSDataInputStream: {}", exception.getMessage());
      LOG.debug("exception details", exception);
    }
  }

  /**
   * Create a writer for the WAL.
   * Uses defaults.
   * <p>
   * Should be package-private. public only for tests and
   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return A WAL writer. Close when done with it.
   */
  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, false);
  }

  /**
   * Should be package-private, visible for recovery testing.
   * Uses defaults.
   * @return an overwritable writer for recovered edits. caller should close.
   */
  @VisibleForTesting
  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
      throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, true);
  }

  // These static methods are currently used where it's impractical to
  // untangle the reliance on state in the filesystem. They rely on singleton
  // WALFactory that just provides Reader / Writers.
  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
  private static final String SINGLETON_ID = WALFactory.class.getName();

  /**
   * Returns the shared reader/writer-only factory, creating it from {@code configuration} if it
   * does not exist yet. The first configuration to be published wins; later ones are ignored.
   * Public only for FSHLog.
   */
  public static WALFactory getInstance(Configuration configuration) {
    WALFactory factory = singleton.get();
    if (null == factory) {
      WALFactory temp = new WALFactory(configuration);
      if (singleton.compareAndSet(null, temp)) {
        factory = temp;
      } else {
        // someone else beat us to initializing
        try {
          temp.close();
        } catch (IOException exception) {
          LOG.debug("failed to close temporary singleton. ignoring.", exception);
        }
        factory = singleton.get();
      }
    }
    return factory;
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReader(final FileSystem fs, final Path path,
      final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path);
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  static Reader createReader(final FileSystem fs, final Path path,
      final Configuration configuration, final CancelableProgressable reporter) throws IOException {
    return getInstance(configuration).createReader(fs, path, reporter);
  }

  /**
   * Create a reader for the given path, ignore custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
      final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path, null, false);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method.
   * Uses defaults.
   * @return a Writer that will overwrite files. Caller must close.
   */
  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
      final Configuration configuration)
      throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, true);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method.
   * Uses defaults.
   * @return a writer that won't overwrite files. Caller must close.
   */
  @VisibleForTesting
  public static Writer createWALWriter(final FileSystem fs, final Path path,
      final Configuration configuration)
      throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, false);
  }

  /**
   * @return the identifier this factory was constructed with
   */
  @VisibleForTesting
  public String getFactoryId() {
    return factoryId;
  }

  /**
   * @return the main WAL provider; null for the reader/writer-only singleton factory
   */
  public final WALProvider getWALProvider() {
    return this.provider;
  }

  /**
   * @return the meta WAL provider, or null if none has been created yet
   */
  public final WALProvider getMetaWALProvider() {
    return this.metaProvider.get();
  }
}