001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicReference; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 029import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 030import org.apache.hadoop.hbase.util.CancelableProgressable; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 033import org.apache.hadoop.hbase.wal.WAL.Reader; 034import org.apache.hadoop.hbase.wal.WALProvider.Writer; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040 041/** 042 * Entry point for users of the Write Ahead Log. 043 * Acts as the shim between internal use and the particular WALProvider we use to handle wal 044 * requests. 045 * 046 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available 047 * implementations: 048 * <ul> 049 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 050 * "asyncfs"</li> 051 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 052 * FileSystem interface via an asynchronous client.</li> 053 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 054 * FileSystem interface via HDFS's synchronous DFSClient.</li> 055 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 056 * server.</li> 057 * </ul> 058 * 059 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 060 */ 061@InterfaceAudience.Private 062public class WALFactory { 063 064 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 065 066 /** 067 * Maps between configuration names for providers and implementation classes. 068 */ 069 static enum Providers { 070 defaultProvider(AsyncFSWALProvider.class), 071 filesystem(FSHLogProvider.class), 072 multiwal(RegionGroupingProvider.class), 073 asyncfs(AsyncFSWALProvider.class); 074 075 final Class<? extends WALProvider> clazz; 076 Providers(Class<? extends WALProvider> clazz) { 077 this.clazz = clazz; 078 } 079 } 080 081 public static final String WAL_PROVIDER = "hbase.wal.provider"; 082 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 083 084 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 085 086 final String factoryId; 087 private final WALProvider provider; 088 // The meta updates are written to a different wal. If this 089 // regionserver holds meta regions, then this ref will be non-null. 090 // lazily intialized; most RegionServers don't deal with META 091 private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>(); 092 093 /** 094 * Configuration-specified WAL Reader used when a custom reader is requested 095 */ 096 private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass; 097 098 /** 099 * How long to attempt opening in-recovery wals 100 */ 101 private final int timeoutMillis; 102 103 private final Configuration conf; 104 105 // Used for the singleton WALFactory, see below. 106 private WALFactory(Configuration conf) { 107 // this code is duplicated here so we can keep our members final. 108 // until we've moved reader/writer construction down into providers, this initialization must 109 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 110 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 111 /* TODO Both of these are probably specific to the fs wal provider */ 112 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 113 AbstractFSWALProvider.Reader.class); 114 this.conf = conf; 115 // end required early initialization 116 117 // this instance can't create wals, just reader/writers. 118 provider = null; 119 factoryId = SINGLETON_ID; 120 } 121 122 @VisibleForTesting 123 Providers getDefaultProvider() { 124 return Providers.defaultProvider; 125 } 126 127 @VisibleForTesting 128 public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 129 try { 130 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 131 132 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 133 // the default and we can't use it, we want to fall back to FSHLog which we know works on 134 // all versions. 135 if (provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 136 && !AsyncFSWALProvider.load()) { 137 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 138 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 139 // easily broken when upgrading hadoop. 140 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 141 return FSHLogProvider.class; 142 } 143 144 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 145 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 146 // not fall back to FSHLogProvider. 147 return provider.clazz; 148 } catch (IllegalArgumentException exception) { 149 // Fall back to them specifying a class name 150 // Note that the passed default class shouldn't actually be used, since the above only fails 151 // when there is a config value present. 152 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 153 } 154 } 155 156 WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId) 157 throws IOException { 158 LOG.info("Instantiating WALProvider of type " + clazz); 159 try { 160 final WALProvider result = clazz.getDeclaredConstructor().newInstance(); 161 result.init(this, conf, providerId); 162 return result; 163 } catch (Exception e) { 164 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 165 LOG.debug("Exception details for failure to load WALProvider.", e); 166 throw new IOException("couldn't set up WALProvider", e); 167 } 168 } 169 170 /** 171 * instantiate a provider from a config property. requires conf to have already been set (as well 172 * as anything the provider might need to read). 173 */ 174 WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { 175 Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue); 176 WALProvider provider = createProvider(clazz, providerId); 177 provider.addWALActionsListener(new MetricsWAL()); 178 return provider; 179 } 180 181 /** 182 * @param conf must not be null, will keep a reference to read params in later reader/writer 183 * instances. 184 * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations 185 * to make a directory 186 */ 187 public WALFactory(Configuration conf, String factoryId) throws IOException { 188 // until we've moved reader/writer construction down into providers, this initialization must 189 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 190 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 191 /* TODO Both of these are probably specific to the fs wal provider */ 192 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 193 AbstractFSWALProvider.Reader.class); 194 this.conf = conf; 195 this.factoryId = factoryId; 196 // end required early initialization 197 if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { 198 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); 199 } else { 200 // special handling of existing configuration behavior. 201 LOG.warn("Running with WAL disabled."); 202 provider = new DisabledWALProvider(); 203 provider.init(this, conf, factoryId); 204 } 205 } 206 207 /** 208 * Shutdown all WALs and clean up any underlying storage. 209 * Use only when you will not need to replay and edits that have gone to any wals from this 210 * factory. 211 */ 212 public void close() throws IOException { 213 final WALProvider metaProvider = this.metaProvider.get(); 214 if (null != metaProvider) { 215 metaProvider.close(); 216 } 217 // close is called on a WALFactory with null provider in the case of contention handling 218 // within the getInstance method. 219 if (null != provider) { 220 provider.close(); 221 } 222 } 223 224 /** 225 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. 226 * If you are not ending cleanly and will need to replay edits from this factory's wals, 227 * use this method if you can as it will try to leave things as tidy as possible. 228 */ 229 public void shutdown() throws IOException { 230 IOException exception = null; 231 final WALProvider metaProvider = this.metaProvider.get(); 232 if (null != metaProvider) { 233 try { 234 metaProvider.shutdown(); 235 } catch(IOException ioe) { 236 exception = ioe; 237 } 238 } 239 provider.shutdown(); 240 if (null != exception) { 241 throw exception; 242 } 243 } 244 245 public List<WAL> getWALs() { 246 return provider.getWALs(); 247 } 248 249 @VisibleForTesting 250 WALProvider getMetaProvider() throws IOException { 251 for (;;) { 252 WALProvider provider = this.metaProvider.get(); 253 if (provider != null) { 254 return provider; 255 } 256 Class<? extends WALProvider> clz = null; 257 if (conf.get(META_WAL_PROVIDER) == null) { 258 try { 259 clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class); 260 } catch (Throwable t) { 261 // the WAL provider should be an enum. Proceed 262 } 263 } 264 if (clz == null){ 265 clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); 266 } 267 provider = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID); 268 if (metaProvider.compareAndSet(null, provider)) { 269 return provider; 270 } else { 271 // someone is ahead of us, close and try again. 272 provider.close(); 273 } 274 } 275 } 276 277 /** 278 * @param region the region which we want to get a WAL for it. Could be null. 279 */ 280 public WAL getWAL(RegionInfo region) throws IOException { 281 // use different WAL for hbase:meta 282 if (region != null && region.isMetaRegion() && 283 region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) { 284 return getMetaProvider().getWAL(region); 285 } else { 286 return provider.getWAL(region); 287 } 288 } 289 290 public Reader createReader(final FileSystem fs, final Path path) throws IOException { 291 return createReader(fs, path, (CancelableProgressable)null); 292 } 293 294 /** 295 * Create a reader for the WAL. If you are reading from a file that's being written to and need 296 * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method 297 * then just seek back to the last known good position. 298 * @return A WAL reader. Close when done with it. 299 * @throws IOException 300 */ 301 public Reader createReader(final FileSystem fs, final Path path, 302 CancelableProgressable reporter) throws IOException { 303 return createReader(fs, path, reporter, true); 304 } 305 306 public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, 307 boolean allowCustom) throws IOException { 308 Class<? extends AbstractFSWALProvider.Reader> lrClass = 309 allowCustom ? logReaderClass : ProtobufLogReader.class; 310 try { 311 // A wal file could be under recovery, so it may take several 312 // tries to get it open. Instead of claiming it is corrupted, retry 313 // to open it up to 5 minutes by default. 314 long startWaiting = EnvironmentEdgeManager.currentTime(); 315 long openTimeout = timeoutMillis + startWaiting; 316 int nbAttempt = 0; 317 AbstractFSWALProvider.Reader reader = null; 318 while (true) { 319 try { 320 reader = lrClass.getDeclaredConstructor().newInstance(); 321 reader.init(fs, path, conf, null); 322 return reader; 323 } catch (IOException e) { 324 if (reader != null) { 325 try { 326 reader.close(); 327 } catch (IOException exception) { 328 LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); 329 LOG.debug("exception details", exception); 330 } 331 } 332 333 String msg = e.getMessage(); 334 if (msg != null 335 && (msg.contains("Cannot obtain block length") 336 || msg.contains("Could not obtain the last block") || msg 337 .matches("Blocklist for [^ ]* has changed.*"))) { 338 if (++nbAttempt == 1) { 339 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 340 } 341 if (reporter != null && !reporter.progress()) { 342 throw new InterruptedIOException("Operation is cancelled"); 343 } 344 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 345 LOG.error("Can't open after " + nbAttempt + " attempts and " 346 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 347 } else { 348 try { 349 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 350 continue; // retry 351 } catch (InterruptedException ie) { 352 InterruptedIOException iioe = new InterruptedIOException(); 353 iioe.initCause(ie); 354 throw iioe; 355 } 356 } 357 throw new LeaseNotRecoveredException(e); 358 } else { 359 throw e; 360 } 361 } 362 } 363 } catch (IOException ie) { 364 throw ie; 365 } catch (Exception e) { 366 throw new IOException("Cannot get log reader", e); 367 } 368 } 369 370 /** 371 * Create a writer for the WAL. 372 * Uses defaults. 373 * <p> 374 * Should be package-private. public only for tests and 375 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 376 * @return A WAL writer. Close when done with it. 377 */ 378 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 379 return FSHLogProvider.createWriter(conf, fs, path, false); 380 } 381 382 /** 383 * Should be package-private, visible for recovery testing. 384 * Uses defaults. 385 * @return an overwritable writer for recovered edits. caller should close. 386 */ 387 @VisibleForTesting 388 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 389 throws IOException { 390 return FSHLogProvider.createWriter(conf, fs, path, true); 391 } 392 393 // These static methods are currently used where it's impractical to 394 // untangle the reliance on state in the filesystem. They rely on singleton 395 // WALFactory that just provides Reader / Writers. 396 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 397 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 398 private static final String SINGLETON_ID = WALFactory.class.getName(); 399 400 // Public only for FSHLog 401 public static WALFactory getInstance(Configuration configuration) { 402 WALFactory factory = singleton.get(); 403 if (null == factory) { 404 WALFactory temp = new WALFactory(configuration); 405 if (singleton.compareAndSet(null, temp)) { 406 factory = temp; 407 } else { 408 // someone else beat us to initializing 409 try { 410 temp.close(); 411 } catch (IOException exception) { 412 LOG.debug("failed to close temporary singleton. ignoring.", exception); 413 } 414 factory = singleton.get(); 415 } 416 } 417 return factory; 418 } 419 420 /** 421 * Create a reader for the given path, accept custom reader classes from conf. 422 * If you already have a WALFactory, you should favor the instance method. 423 * @return a WAL Reader, caller must close. 424 */ 425 public static Reader createReader(final FileSystem fs, final Path path, 426 final Configuration configuration) throws IOException { 427 return getInstance(configuration).createReader(fs, path); 428 } 429 430 /** 431 * Create a reader for the given path, accept custom reader classes from conf. 432 * If you already have a WALFactory, you should favor the instance method. 433 * @return a WAL Reader, caller must close. 434 */ 435 static Reader createReader(final FileSystem fs, final Path path, 436 final Configuration configuration, final CancelableProgressable reporter) throws IOException { 437 return getInstance(configuration).createReader(fs, path, reporter); 438 } 439 440 /** 441 * Create a reader for the given path, ignore custom reader classes from conf. 442 * If you already have a WALFactory, you should favor the instance method. 443 * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 444 * @return a WAL Reader, caller must close. 445 */ 446 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, 447 final Configuration configuration) throws IOException { 448 return getInstance(configuration).createReader(fs, path, null, false); 449 } 450 451 /** 452 * If you already have a WALFactory, you should favor the instance method. 453 * Uses defaults. 454 * @return a Writer that will overwrite files. Caller must close. 455 */ 456 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 457 final Configuration configuration) 458 throws IOException { 459 return FSHLogProvider.createWriter(configuration, fs, path, true); 460 } 461 462 /** 463 * If you already have a WALFactory, you should favor the instance method. 464 * Uses defaults. 465 * @return a writer that won't overwrite files. Caller must close. 466 */ 467 @VisibleForTesting 468 public static Writer createWALWriter(final FileSystem fs, final Path path, 469 final Configuration configuration) 470 throws IOException { 471 return FSHLogProvider.createWriter(configuration, fs, path, false); 472 } 473 474 @VisibleForTesting 475 public String getFactoryId() { 476 return factoryId; 477 } 478 479 public final WALProvider getWALProvider() { 480 return this.provider; 481 } 482 483 public final WALProvider getMetaWALProvider() { 484 return this.metaProvider.get(); 485 } 486}