/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for users of the Write Ahead Log.
 * Acts as the shim between internal use and the particular WALProvider we use to handle wal
 * requests.
 *
 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
 * implementations:
 * <ul>
 *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
 *                                  "asyncfs"</li>
 *   <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
 *                             FileSystem interface via an asynchronous client.</li>
 *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
 *                             FileSystem interface via HDFS's synchronous DFSClient.</li>
 *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
 *                           server.</li>
 * </ul>
 *
 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
 */
@InterfaceAudience.Private
public class WALFactory {

  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);

  /**
   * Maps between configuration names for providers and implementation classes.
   */
  enum Providers {
    defaultProvider(AsyncFSWALProvider.class),
    filesystem(FSHLogProvider.class),
    multiwal(RegionGroupingProvider.class),
    asyncfs(AsyncFSWALProvider.class);

    final Class<? extends WALProvider> clazz;

    Providers(Class<? extends WALProvider> clazz) {
      this.clazz = clazz;
    }
  }

  public static final String WAL_PROVIDER = "hbase.wal.provider";
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";

  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";

  /** Configuration key for how long (ms) to keep retrying to open a wal that is in recovery. */
  private static final String OPEN_TIMEOUT_KEY = "hbase.hlog.open.timeout";

  /** Default for {@link #OPEN_TIMEOUT_KEY}: five minutes. */
  private static final int DEFAULT_OPEN_TIMEOUT_MILLIS = 300000;

  /** Configuration key naming the custom WAL reader implementation class. */
  private static final String LOG_READER_IMPL_KEY = "hbase.regionserver.hlog.reader.impl";

  final String factoryId;
  final Abortable abortable;
  // null only for the reader/writer-only singleton created by getInstance().
  private final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // lazily initialized; most RegionServers don't deal with META
  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested
   */
  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;

  /**
   * How long to attempt opening in-recovery wals
   */
  private final int timeoutMillis;

  private final Configuration conf;

  /**
   * Builds the reader/writer-only factory used for the singleton, see {@link #getInstance}.
   * The resulting instance has no provider and therefore cannot hand out WALs.
   */
  private WALFactory(Configuration conf) {
    // this code is duplicated here so we can keep our members final.
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt(OPEN_TIMEOUT_KEY, DEFAULT_OPEN_TIMEOUT_MILLIS);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass(LOG_READER_IMPL_KEY, ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    // end required early initialization

    // this instance can't create wals, just reader/writers.
    provider = null;
    factoryId = SINGLETON_ID;
    this.abortable = null;
  }

  /**
   * @return the enum entry treated as "the default provider" by {@link #getProviderClass}.
   */
  Providers getDefaultProvider() {
    return Providers.defaultProvider;
  }

  /**
   * Resolve the provider implementation class named by a configuration property.
   * The value is first interpreted as a {@link Providers} name; if it is not one, it is treated
   * as a class name.
   * @param key configuration property to read
   * @param defaultValue value to use when {@code key} is not set
   * @return the provider class to instantiate
   */
  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
    try {
      Providers provider = Providers.valueOf(conf.get(key, defaultValue));

      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
      // the default and we can't use it, we want to fall back to FSHLog which we know works on
      // all versions.
      if (provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
        && !AsyncFSWALProvider.load()) {
        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
        // easily broken when upgrading hadoop.
        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
        return FSHLogProvider.class;
      }

      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
      // not fall back to FSHLogProvider.
      return provider.clazz;
    } catch (IllegalArgumentException exception) {
      // Fall back to them specifying a class name
      // Note that the passed default class shouldn't actually be used, since the above only fails
      // when there is a config value present.
      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
    }
  }

  /**
   * Instantiate the given provider class through its no-arg constructor and initialize it with
   * this factory, its configuration and its abortable.
   * @param clazz provider implementation to instantiate
   * @param providerId id handed to {@link WALProvider#init}
   * @throws IOException wrapping any failure to construct or initialize the provider
   */
  WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
      throws IOException {
    LOG.info("Instantiating WALProvider of type {}", clazz);
    try {
      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
      result.init(this, conf, providerId, this.abortable);
      return result;
    } catch (Exception e) {
      LOG.error("couldn't set up WALProvider, the configured class is {}", clazz);
      LOG.debug("Exception details for failure to load WALProvider.", e);
      throw new IOException("couldn't set up WALProvider", e);
    }
  }

  /**
   * instantiate a provider from a config property. requires conf to have already been set (as well
   * as anything the provider might need to read).
   */
  WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
    WALProvider created = createProvider(clazz, providerId);
    created.addWALActionsListener(new MetricsWAL());
    return created;
  }

  /**
   * Same as {@link #WALFactory(Configuration, String, Abortable)} with a null abortable.
   */
  public WALFactory(Configuration conf, String factoryId) throws IOException {
    this(conf, factoryId, null);
  }

  /**
   * @param conf must not be null, will keep a reference to read params in later reader/writer
   *          instances.
   * @param factoryId identifier of this factory, returned by {@link #getFactoryId()}
   * @param abortable the server to abort
   */
  public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
    // until we've moved reader/writer construction down into providers, this initialization must
    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
    timeoutMillis = conf.getInt(OPEN_TIMEOUT_KEY, DEFAULT_OPEN_TIMEOUT_MILLIS);
    /* TODO Both of these are probably specific to the fs wal provider */
    logReaderClass = conf.getClass(LOG_READER_IMPL_KEY, ProtobufLogReader.class,
      AbstractFSWALProvider.Reader.class);
    this.conf = conf;
    this.factoryId = factoryId;
    this.abortable = abortable;
    // end required early initialization
    if (conf.getBoolean(WAL_ENABLED, true)) {
      provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
    } else {
      // special handling of existing configuration behavior.
      LOG.warn("Running with WAL disabled.");
      provider = new DisabledWALProvider();
      provider.init(this, conf, factoryId, null);
    }
  }

  /**
   * Shutdown all WALs and clean up any underlying storage.
   * Use only when you will not need to replay any edits that have gone to any wals from this
   * factory.
   * <p>
   * Both the meta provider (if one was created) and the main provider (if any) are always
   * attempted, even when the first one fails.
   * @throws IOException the first failure encountered; a later failure is attached to it as a
   *           suppressed exception
   */
  public void close() throws IOException {
    IOException failure = null;
    final WALProvider meta = this.metaProvider.get();
    if (null != meta) {
      try {
        meta.close();
      } catch (IOException ioe) {
        // remember the failure but keep going so the main provider is not leaked.
        failure = ioe;
      }
    }
    // close is called on a WALFactory with null provider in the case of contention handling
    // within the getInstance method.
    if (null != provider) {
      try {
        provider.close();
      } catch (IOException ioe) {
        if (failure == null) {
          failure = ioe;
        } else {
          failure.addSuppressed(ioe);
        }
      }
    }
    if (null != failure) {
      throw failure;
    }
  }

  /**
   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
   * If you are not ending cleanly and will need to replay edits from this factory's wals,
   * use this method if you can as it will try to leave things as tidy as possible.
   * <p>
   * Both the meta provider (if one was created) and the main provider (if any) are always
   * attempted, even when the first one fails.
   * @throws IOException the first failure encountered; a later failure is attached to it as a
   *           suppressed exception
   */
  public void shutdown() throws IOException {
    IOException failure = null;
    final WALProvider meta = this.metaProvider.get();
    if (null != meta) {
      try {
        meta.shutdown();
      } catch (IOException ioe) {
        failure = ioe;
      }
    }
    // provider is null on the reader/writer-only singleton; mirror the guard used by close().
    if (null != provider) {
      try {
        provider.shutdown();
      } catch (IOException ioe) {
        if (failure == null) {
          failure = ioe;
        } else {
          failure.addSuppressed(ioe);
        }
      }
    }
    if (null != failure) {
      throw failure;
    }
  }

  /**
   * @return the WALs of the main provider. Must not be called on the reader/writer-only
   *         singleton, which has no provider.
   */
  public List<WAL> getWALs() {
    return provider.getWALs();
  }

  /**
   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
   * creating the first hbase:meta WAL so we can register a listener.
   * @see #getMetaWALProvider()
   */
  public WALProvider getMetaProvider() throws IOException {
    for (;;) {
      WALProvider existing = this.metaProvider.get();
      if (existing != null) {
        return existing;
      }
      Class<? extends WALProvider> clz = null;
      if (conf.get(META_WAL_PROVIDER) == null) {
        // No dedicated meta provider configured: try reading the general provider setting as a
        // class name first.
        try {
          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
        } catch (Throwable t) {
          // the WAL provider should be an enum. Proceed
        }
      }
      if (clz == null) {
        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
      }
      WALProvider candidate = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      if (metaProvider.compareAndSet(null, candidate)) {
        return candidate;
      }
      // someone is ahead of us, close and try again.
      candidate.close();
    }
  }

  /**
   * @param region the region which we want to get a WAL for. Could be null.
   */
  public WAL getWAL(RegionInfo region) throws IOException {
    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
    boolean defaultMetaReplica = region != null && region.isMetaRegion()
      && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
    if (defaultMetaReplica) {
      return getMetaProvider().getWAL(region);
    }
    return provider.getWAL(region);
  }

  /**
   * Create a reader for the WAL with no progress reporter, allowing a custom reader class.
   * @return A WAL reader. Close when done with it.
   */
  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
    return createReader(fs, path, (CancelableProgressable) null);
  }

  /**
   * Create a reader for the WAL. If you are reading from a file that's being written to and need
   * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
   * then just seek back to the last known good position.
   * @return A WAL reader. Close when done with it.
   */
  public Reader createReader(final FileSystem fs, final Path path,
      CancelableProgressable reporter) throws IOException {
    return createReader(fs, path, reporter, true);
  }

  /**
   * Create a reader for the WAL, retrying while the file appears to still be under recovery.
   * @param fs filesystem holding the wal
   * @param path wal file to open
   * @param reporter polled between attempts when non-null; if its {@code progress()} returns
   *          false the open is abandoned
   * @param allowCustom when true the configured reader class is used, otherwise
   *          {@link ProtobufLogReader}
   * @return A WAL reader. Close when done with it.
   * @throws InterruptedIOException if the reporter cancels the operation or the thread is
   *           interrupted while waiting between attempts
   * @throws LeaseNotRecoveredException if the file still cannot be opened after more than two
   *           attempts and the open timeout has elapsed
   * @throws IOException on any other failure to open the file or instantiate the reader
   */
  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
      boolean allowCustom) throws IOException {
    Class<? extends AbstractFSWALProvider.Reader> lrClass =
      allowCustom ? logReaderClass : ProtobufLogReader.class;
    try {
      // A wal file could be under recovery, so it may take several
      // tries to get it open. Instead of claiming it is corrupted, retry
      // to open it up to 5 minutes by default.
      long startWaiting = EnvironmentEdgeManager.currentTime();
      long openTimeout = timeoutMillis + startWaiting;
      int nbAttempt = 0;
      while (true) {
        AbstractFSWALProvider.Reader reader = null;
        try {
          reader = lrClass.getDeclaredConstructor().newInstance();
          reader.init(fs, path, conf, null);
          return reader;
        } catch (IOException e) {
          closeAfterFailedOpen(reader);

          if (!looksLikeUnrecoveredLease(e.getMessage())) {
            throw e;
          }
          if (++nbAttempt == 1) {
            LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
          }
          if (reporter != null && !reporter.progress()) {
            throw new InterruptedIOException("Operation is cancelled");
          }
          if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
            LOG.error("Can't open after {} attempts and {}ms  for {}", nbAttempt,
              EnvironmentEdgeManager.currentTime() - startWaiting, path);
            throw new LeaseNotRecoveredException(e);
          }
          try {
            // short pauses for the first attempts, longer ones afterwards; then retry.
            Thread.sleep(nbAttempt < 3 ? 500 : 1000);
          } catch (InterruptedException ie) {
            InterruptedIOException iioe = new InterruptedIOException();
            iioe.initCause(ie);
            throw iioe;
          }
        }
      }
    } catch (IOException ie) {
      throw ie;
    } catch (Exception e) {
      // reflection failures while instantiating the reader class end up here.
      throw new IOException("Cannot get log reader", e);
    }
  }

  /**
   * @return true when the exception message matches one of the texts that
   *         {@link #createReader(FileSystem, Path, CancelableProgressable, boolean)} treats as
   *         "retry, the file may still be under recovery".
   */
  private static boolean looksLikeUnrecoveredLease(String msg) {
    return msg != null
      && (msg.contains("Cannot obtain block length")
        || msg.contains("Could not obtain the last block")
        || msg.matches("Blocklist for [^ ]* has changed.*"));
  }

  /**
   * Best-effort close of a reader whose initialization failed; close failures are only logged.
   */
  private static void closeAfterFailedOpen(AbstractFSWALProvider.Reader reader) {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    } catch (IOException exception) {
      LOG.warn("Could not close FSDataInputStream: {}", exception.getMessage());
      LOG.debug("exception details", exception);
    }
  }

  /**
   * Create a writer for the WAL.
   * Uses defaults.
   * <p>
   * Should be package-private. public only for tests and
   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return A WAL writer. Close when done with it.
   */
  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, false);
  }

  /**
   * Should be package-private, visible for recovery testing.
   * Uses defaults.
   * @return an overwritable writer for recovered edits. caller should close.
   */
  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
      throws IOException {
    return FSHLogProvider.createWriter(conf, fs, path, true);
  }

  // These static methods are currently used where it's impractical to
  // untangle the reliance on state in the filesystem. They rely on singleton
  // WALFactory that just provides Reader / Writers.
  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
  private static final String SINGLETON_ID = WALFactory.class.getName();

  /**
   * Return the shared reader/writer-only factory, creating it from {@code configuration} if it
   * does not exist yet. Public only for FSHLog.
   */
  public static WALFactory getInstance(Configuration configuration) {
    WALFactory factory = singleton.get();
    if (null != factory) {
      return factory;
    }
    WALFactory temp = new WALFactory(configuration);
    if (singleton.compareAndSet(null, temp)) {
      return temp;
    }
    // someone else beat us to initializing
    try {
      temp.close();
    } catch (IOException exception) {
      LOG.debug("failed to close temporary singleton. ignoring.", exception);
    }
    return singleton.get();
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReader(final FileSystem fs, final Path path,
      final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path);
  }

  /**
   * Create a reader for the given path, accept custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * @return a WAL Reader, caller must close.
   */
  static Reader createReader(final FileSystem fs, final Path path,
      final Configuration configuration, final CancelableProgressable reporter) throws IOException {
    return getInstance(configuration).createReader(fs, path, reporter);
  }

  /**
   * Create a reader for the given path, ignore custom reader classes from conf.
   * If you already have a WALFactory, you should favor the instance method.
   * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
   * @return a WAL Reader, caller must close.
   */
  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
      final Configuration configuration) throws IOException {
    return getInstance(configuration).createReader(fs, path, null, false);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method.
   * Uses defaults.
   * @return a Writer that will overwrite files. Caller must close.
   */
  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
      final Configuration configuration)
      throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, true);
  }

  /**
   * If you already have a WALFactory, you should favor the instance method.
   * Uses defaults.
   * @return a writer that won't overwrite files. Caller must close.
   */
  public static Writer createWALWriter(final FileSystem fs, final Path path,
      final Configuration configuration)
      throws IOException {
    return FSHLogProvider.createWriter(configuration, fs, path, false);
  }

  /**
   * @return the id this factory was constructed with.
   */
  public String getFactoryId() {
    return factoryId;
  }

  /**
   * @return the main provider; null for the reader/writer-only singleton.
   */
  public final WALProvider getWALProvider() {
    return this.provider;
  }

  /**
   * @return Current metaProvider... may be null if not yet initialized.
   * @see #getMetaProvider()
   */
  public final WALProvider getMetaWALProvider() {
    return this.metaProvider.get();
  }
}