001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicReference; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 029import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 030import org.apache.hadoop.hbase.util.CancelableProgressable; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 033import org.apache.hadoop.hbase.wal.WAL.Reader; 034import org.apache.hadoop.hbase.wal.WALProvider.Writer; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040 041/** 042 * Entry point for users of the Write Ahead Log. 043 * Acts as the shim between internal use and the particular WALProvider we use to handle wal 044 * requests. 045 * 046 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available 047 * implementations: 048 * <ul> 049 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 050 * "asyncfs"</li> 051 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop 052 * FileSystem interface via an asynchronous client.</li> 053 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 054 * FileSystem interface via HDFS's synchronous DFSClient.</li> 055 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 056 * server.</li> 057 * </ul> 058 * 059 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name. 060 */ 061@InterfaceAudience.Private 062public class WALFactory { 063 064 private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class); 065 066 /** 067 * Maps between configuration names for providers and implementation classes. 068 */ 069 static enum Providers { 070 defaultProvider(AsyncFSWALProvider.class), 071 filesystem(FSHLogProvider.class), 072 multiwal(RegionGroupingProvider.class), 073 asyncfs(AsyncFSWALProvider.class); 074 075 final Class<? extends WALProvider> clazz; 076 Providers(Class<? extends WALProvider> clazz) { 077 this.clazz = clazz; 078 } 079 } 080 081 public static final String WAL_PROVIDER = "hbase.wal.provider"; 082 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 083 084 public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 085 086 final String factoryId; 087 private final WALProvider provider; 088 // The meta updates are written to a different wal. If this 089 // regionserver holds meta regions, then this ref will be non-null. 090 // lazily intialized; most RegionServers don't deal with META 091 private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>(); 092 093 /** 094 * Configuration-specified WAL Reader used when a custom reader is requested 095 */ 096 private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass; 097 098 /** 099 * How long to attempt opening in-recovery wals 100 */ 101 private final int timeoutMillis; 102 103 private final Configuration conf; 104 105 // Used for the singleton WALFactory, see below. 106 private WALFactory(Configuration conf) { 107 // this code is duplicated here so we can keep our members final. 108 // until we've moved reader/writer construction down into providers, this initialization must 109 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 110 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 111 /* TODO Both of these are probably specific to the fs wal provider */ 112 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 113 AbstractFSWALProvider.Reader.class); 114 this.conf = conf; 115 // end required early initialization 116 117 // this instance can't create wals, just reader/writers. 118 provider = null; 119 factoryId = SINGLETON_ID; 120 } 121 122 @VisibleForTesting 123 Providers getDefaultProvider() { 124 return Providers.defaultProvider; 125 } 126 127 @VisibleForTesting 128 public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { 129 try { 130 Providers provider = Providers.valueOf(conf.get(key, defaultValue)); 131 132 // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as 133 // the default and we can't use it, we want to fall back to FSHLog which we know works on 134 // all versions. 135 if (provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class 136 && !AsyncFSWALProvider.load()) { 137 // AsyncFSWAL has better performance in most cases, and also uses less resources, we will 138 // try to use it if possible. It deeply hacks into the internal of DFSClient so will be 139 // easily broken when upgrading hadoop. 140 LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider"); 141 return FSHLogProvider.class; 142 } 143 144 // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't 145 // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and 146 // not fall back to FSHLogProvider. 147 return provider.clazz; 148 } catch (IllegalArgumentException exception) { 149 // Fall back to them specifying a class name 150 // Note that the passed default class shouldn't actually be used, since the above only fails 151 // when there is a config value present. 152 return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); 153 } 154 } 155 156 WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId) 157 throws IOException { 158 LOG.info("Instantiating WALProvider of type " + clazz); 159 try { 160 final WALProvider result = clazz.getDeclaredConstructor().newInstance(); 161 result.init(this, conf, providerId); 162 return result; 163 } catch (Exception e) { 164 LOG.error("couldn't set up WALProvider, the configured class is " + clazz); 165 LOG.debug("Exception details for failure to load WALProvider.", e); 166 throw new IOException("couldn't set up WALProvider", e); 167 } 168 } 169 170 /** 171 * instantiate a provider from a config property. requires conf to have already been set (as well 172 * as anything the provider might need to read). 173 */ 174 WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { 175 Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue); 176 WALProvider provider = createProvider(clazz, providerId); 177 provider.addWALActionsListener(new MetricsWAL()); 178 return provider; 179 } 180 181 /** 182 * @param conf must not be null, will keep a reference to read params in later reader/writer 183 * instances. 184 * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations 185 * to make a directory 186 */ 187 public WALFactory(Configuration conf, String factoryId) throws IOException { 188 // until we've moved reader/writer construction down into providers, this initialization must 189 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 190 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 191 /* TODO Both of these are probably specific to the fs wal provider */ 192 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 193 AbstractFSWALProvider.Reader.class); 194 this.conf = conf; 195 this.factoryId = factoryId; 196 // end required early initialization 197 if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { 198 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); 199 } else { 200 // special handling of existing configuration behavior. 201 LOG.warn("Running with WAL disabled."); 202 provider = new DisabledWALProvider(); 203 provider.init(this, conf, factoryId); 204 } 205 } 206 207 /** 208 * Shutdown all WALs and clean up any underlying storage. 209 * Use only when you will not need to replay and edits that have gone to any wals from this 210 * factory. 211 */ 212 public void close() throws IOException { 213 final WALProvider metaProvider = this.metaProvider.get(); 214 if (null != metaProvider) { 215 metaProvider.close(); 216 } 217 // close is called on a WALFactory with null provider in the case of contention handling 218 // within the getInstance method. 219 if (null != provider) { 220 provider.close(); 221 } 222 } 223 224 /** 225 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. 226 * If you are not ending cleanly and will need to replay edits from this factory's wals, 227 * use this method if you can as it will try to leave things as tidy as possible. 228 */ 229 public void shutdown() throws IOException { 230 IOException exception = null; 231 final WALProvider metaProvider = this.metaProvider.get(); 232 if (null != metaProvider) { 233 try { 234 metaProvider.shutdown(); 235 } catch(IOException ioe) { 236 exception = ioe; 237 } 238 } 239 provider.shutdown(); 240 if (null != exception) { 241 throw exception; 242 } 243 } 244 245 public List<WAL> getWALs() { 246 return provider.getWALs(); 247 } 248 249 @VisibleForTesting 250 WALProvider getMetaProvider() throws IOException { 251 for (;;) { 252 WALProvider provider = this.metaProvider.get(); 253 if (provider != null) { 254 return provider; 255 } 256 provider = getProvider(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER), 257 AbstractFSWALProvider.META_WAL_PROVIDER_ID); 258 if (metaProvider.compareAndSet(null, provider)) { 259 return provider; 260 } else { 261 // someone is ahead of us, close and try again. 262 provider.close(); 263 } 264 } 265 } 266 267 /** 268 * @param region the region which we want to get a WAL for it. Could be null. 269 */ 270 public WAL getWAL(RegionInfo region) throws IOException { 271 // use different WAL for hbase:meta 272 if (region != null && region.isMetaRegion() && 273 region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) { 274 return getMetaProvider().getWAL(region); 275 } else { 276 return provider.getWAL(region); 277 } 278 } 279 280 public Reader createReader(final FileSystem fs, final Path path) throws IOException { 281 return createReader(fs, path, (CancelableProgressable)null); 282 } 283 284 /** 285 * Create a reader for the WAL. If you are reading from a file that's being written to and need 286 * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method 287 * then just seek back to the last known good position. 288 * @return A WAL reader. Close when done with it. 289 * @throws IOException 290 */ 291 public Reader createReader(final FileSystem fs, final Path path, 292 CancelableProgressable reporter) throws IOException { 293 return createReader(fs, path, reporter, true); 294 } 295 296 public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, 297 boolean allowCustom) throws IOException { 298 Class<? extends AbstractFSWALProvider.Reader> lrClass = 299 allowCustom ? logReaderClass : ProtobufLogReader.class; 300 try { 301 // A wal file could be under recovery, so it may take several 302 // tries to get it open. Instead of claiming it is corrupted, retry 303 // to open it up to 5 minutes by default. 304 long startWaiting = EnvironmentEdgeManager.currentTime(); 305 long openTimeout = timeoutMillis + startWaiting; 306 int nbAttempt = 0; 307 AbstractFSWALProvider.Reader reader = null; 308 while (true) { 309 try { 310 reader = lrClass.getDeclaredConstructor().newInstance(); 311 reader.init(fs, path, conf, null); 312 return reader; 313 } catch (IOException e) { 314 if (reader != null) { 315 try { 316 reader.close(); 317 } catch (IOException exception) { 318 LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); 319 LOG.debug("exception details", exception); 320 } 321 } 322 323 String msg = e.getMessage(); 324 if (msg != null 325 && (msg.contains("Cannot obtain block length") 326 || msg.contains("Could not obtain the last block") || msg 327 .matches("Blocklist for [^ ]* has changed.*"))) { 328 if (++nbAttempt == 1) { 329 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 330 } 331 if (reporter != null && !reporter.progress()) { 332 throw new InterruptedIOException("Operation is cancelled"); 333 } 334 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 335 LOG.error("Can't open after " + nbAttempt + " attempts and " 336 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); 337 } else { 338 try { 339 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 340 continue; // retry 341 } catch (InterruptedException ie) { 342 InterruptedIOException iioe = new InterruptedIOException(); 343 iioe.initCause(ie); 344 throw iioe; 345 } 346 } 347 throw new LeaseNotRecoveredException(e); 348 } else { 349 throw e; 350 } 351 } 352 } 353 } catch (IOException ie) { 354 throw ie; 355 } catch (Exception e) { 356 throw new IOException("Cannot get log reader", e); 357 } 358 } 359 360 /** 361 * Create a writer for the WAL. 362 * Uses defaults. 363 * <p> 364 * Should be package-private. public only for tests and 365 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 366 * @return A WAL writer. Close when done with it. 367 */ 368 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 369 return FSHLogProvider.createWriter(conf, fs, path, false); 370 } 371 372 /** 373 * Should be package-private, visible for recovery testing. 374 * Uses defaults. 375 * @return an overwritable writer for recovered edits. caller should close. 376 */ 377 @VisibleForTesting 378 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 379 throws IOException { 380 return FSHLogProvider.createWriter(conf, fs, path, true); 381 } 382 383 // These static methods are currently used where it's impractical to 384 // untangle the reliance on state in the filesystem. They rely on singleton 385 // WALFactory that just provides Reader / Writers. 386 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 387 private static final AtomicReference<WALFactory> singleton = new AtomicReference<>(); 388 private static final String SINGLETON_ID = WALFactory.class.getName(); 389 390 // Public only for FSHLog 391 public static WALFactory getInstance(Configuration configuration) { 392 WALFactory factory = singleton.get(); 393 if (null == factory) { 394 WALFactory temp = new WALFactory(configuration); 395 if (singleton.compareAndSet(null, temp)) { 396 factory = temp; 397 } else { 398 // someone else beat us to initializing 399 try { 400 temp.close(); 401 } catch (IOException exception) { 402 LOG.debug("failed to close temporary singleton. ignoring.", exception); 403 } 404 factory = singleton.get(); 405 } 406 } 407 return factory; 408 } 409 410 /** 411 * Create a reader for the given path, accept custom reader classes from conf. 412 * If you already have a WALFactory, you should favor the instance method. 413 * @return a WAL Reader, caller must close. 414 */ 415 public static Reader createReader(final FileSystem fs, final Path path, 416 final Configuration configuration) throws IOException { 417 return getInstance(configuration).createReader(fs, path); 418 } 419 420 /** 421 * Create a reader for the given path, accept custom reader classes from conf. 422 * If you already have a WALFactory, you should favor the instance method. 423 * @return a WAL Reader, caller must close. 424 */ 425 static Reader createReader(final FileSystem fs, final Path path, 426 final Configuration configuration, final CancelableProgressable reporter) throws IOException { 427 return getInstance(configuration).createReader(fs, path, reporter); 428 } 429 430 /** 431 * Create a reader for the given path, ignore custom reader classes from conf. 432 * If you already have a WALFactory, you should favor the instance method. 433 * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 434 * @return a WAL Reader, caller must close. 435 */ 436 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, 437 final Configuration configuration) throws IOException { 438 return getInstance(configuration).createReader(fs, path, null, false); 439 } 440 441 /** 442 * If you already have a WALFactory, you should favor the instance method. 443 * Uses defaults. 444 * @return a Writer that will overwrite files. Caller must close. 445 */ 446 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 447 final Configuration configuration) 448 throws IOException { 449 return FSHLogProvider.createWriter(configuration, fs, path, true); 450 } 451 452 /** 453 * If you already have a WALFactory, you should favor the instance method. 454 * Uses defaults. 455 * @return a writer that won't overwrite files. Caller must close. 456 */ 457 @VisibleForTesting 458 public static Writer createWALWriter(final FileSystem fs, final Path path, 459 final Configuration configuration) 460 throws IOException { 461 return FSHLogProvider.createWriter(configuration, fs, path, false); 462 } 463 464 public final WALProvider getWALProvider() { 465 return this.provider; 466 } 467 468 public final WALProvider getMetaWALProvider() { 469 return this.metaProvider.get(); 470 } 471}