001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicReference;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
029import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
030import org.apache.hadoop.hbase.util.CancelableProgressable;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
033import org.apache.hadoop.hbase.wal.WAL.Reader;
034import org.apache.hadoop.hbase.wal.WALProvider.Writer;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040
041/**
042 * Entry point for users of the Write Ahead Log.
043 * Acts as the shim between internal use and the particular WALProvider we use to handle wal
044 * requests.
045 *
046 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
047 * implementations:
048 * <ul>
049 *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
050 *                                  "asyncfs"</li>
051 *   <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
052 *                             FileSystem interface via an asynchronous client.</li>
053 *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
054 *                             FileSystem interface via HDFS's synchronous DFSClient.</li>
055 *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
056 *                           server.</li>
057 * </ul>
058 *
059 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
060 */
061@InterfaceAudience.Private
062public class WALFactory {
063
064  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
065
066  /**
067   * Maps between configuration names for providers and implementation classes.
068   */
069  static enum Providers {
070    defaultProvider(AsyncFSWALProvider.class),
071    filesystem(FSHLogProvider.class),
072    multiwal(RegionGroupingProvider.class),
073    asyncfs(AsyncFSWALProvider.class);
074
075    final Class<? extends WALProvider> clazz;
076    Providers(Class<? extends WALProvider> clazz) {
077      this.clazz = clazz;
078    }
079  }
080
081  public static final String WAL_PROVIDER = "hbase.wal.provider";
082  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
083
084  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
085
086  final String factoryId;
087  private final WALProvider provider;
088  // The meta updates are written to a different wal. If this
089  // regionserver holds meta regions, then this ref will be non-null.
090  // lazily intialized; most RegionServers don't deal with META
091  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
092
093  /**
094   * Configuration-specified WAL Reader used when a custom reader is requested
095   */
096  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;
097
098  /**
099   * How long to attempt opening in-recovery wals
100   */
101  private final int timeoutMillis;
102
103  private final Configuration conf;
104
105  // Used for the singleton WALFactory, see below.
106  private WALFactory(Configuration conf) {
107    // this code is duplicated here so we can keep our members final.
108    // until we've moved reader/writer construction down into providers, this initialization must
109    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
110    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
111    /* TODO Both of these are probably specific to the fs wal provider */
112    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
113      AbstractFSWALProvider.Reader.class);
114    this.conf = conf;
115    // end required early initialization
116
117    // this instance can't create wals, just reader/writers.
118    provider = null;
119    factoryId = SINGLETON_ID;
120  }
121
122  @VisibleForTesting
123  Providers getDefaultProvider() {
124    return Providers.defaultProvider;
125  }
126
127  @VisibleForTesting
128  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
129    try {
130      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
131
132      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
133      // the default and we can't use it, we want to fall back to FSHLog which we know works on
134      // all versions.
135      if (provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
136          && !AsyncFSWALProvider.load()) {
137        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
138        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
139        // easily broken when upgrading hadoop.
140        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
141        return FSHLogProvider.class;
142      }
143
144      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
145      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
146      // not fall back to FSHLogProvider.
147      return provider.clazz;
148    } catch (IllegalArgumentException exception) {
149      // Fall back to them specifying a class name
150      // Note that the passed default class shouldn't actually be used, since the above only fails
151      // when there is a config value present.
152      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
153    }
154  }
155
156  WALProvider createProvider(Class<? extends WALProvider> clazz, String providerId)
157      throws IOException {
158    LOG.info("Instantiating WALProvider of type " + clazz);
159    try {
160      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
161      result.init(this, conf, providerId);
162      return result;
163    } catch (Exception e) {
164      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
165      LOG.debug("Exception details for failure to load WALProvider.", e);
166      throw new IOException("couldn't set up WALProvider", e);
167    }
168  }
169
170  /**
171   * instantiate a provider from a config property. requires conf to have already been set (as well
172   * as anything the provider might need to read).
173   */
174  WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException {
175    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
176    WALProvider provider = createProvider(clazz, providerId);
177    provider.addWALActionsListener(new MetricsWAL());
178    return provider;
179  }
180
181  /**
182   * @param conf must not be null, will keep a reference to read params in later reader/writer
183   *          instances.
184   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
185   *          to make a directory
186   */
187  public WALFactory(Configuration conf, String factoryId) throws IOException {
188    // until we've moved reader/writer construction down into providers, this initialization must
189    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
190    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
191    /* TODO Both of these are probably specific to the fs wal provider */
192    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
193      AbstractFSWALProvider.Reader.class);
194    this.conf = conf;
195    this.factoryId = factoryId;
196    // end required early initialization
197    if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
198      provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
199    } else {
200      // special handling of existing configuration behavior.
201      LOG.warn("Running with WAL disabled.");
202      provider = new DisabledWALProvider();
203      provider.init(this, conf, factoryId);
204    }
205  }
206
207  /**
208   * Shutdown all WALs and clean up any underlying storage.
209   * Use only when you will not need to replay and edits that have gone to any wals from this
210   * factory.
211   */
212  public void close() throws IOException {
213    final WALProvider metaProvider = this.metaProvider.get();
214    if (null != metaProvider) {
215      metaProvider.close();
216    }
217    // close is called on a WALFactory with null provider in the case of contention handling
218    // within the getInstance method.
219    if (null != provider) {
220      provider.close();
221    }
222  }
223
224  /**
225   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
226   * If you are not ending cleanly and will need to replay edits from this factory's wals,
227   * use this method if you can as it will try to leave things as tidy as possible.
228   */
229  public void shutdown() throws IOException {
230    IOException exception = null;
231    final WALProvider metaProvider = this.metaProvider.get();
232    if (null != metaProvider) {
233      try {
234        metaProvider.shutdown();
235      } catch(IOException ioe) {
236        exception = ioe;
237      }
238    }
239    provider.shutdown();
240    if (null != exception) {
241      throw exception;
242    }
243  }
244
245  public List<WAL> getWALs() {
246    return provider.getWALs();
247  }
248
249  @VisibleForTesting
250  WALProvider getMetaProvider() throws IOException {
251    for (;;) {
252      WALProvider provider = this.metaProvider.get();
253      if (provider != null) {
254        return provider;
255      }
256      Class<? extends WALProvider> clz = null;
257      if (conf.get(META_WAL_PROVIDER) == null) {
258        try {
259          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
260        } catch (Throwable t) {
261          // the WAL provider should be an enum. Proceed
262        }
263      }
264      if (clz == null){
265        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
266      }
267      provider = createProvider(clz, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
268      if (metaProvider.compareAndSet(null, provider)) {
269        return provider;
270      } else {
271        // someone is ahead of us, close and try again.
272        provider.close();
273      }
274    }
275  }
276
277  /**
278   * @param region the region which we want to get a WAL for it. Could be null.
279   */
280  public WAL getWAL(RegionInfo region) throws IOException {
281    // use different WAL for hbase:meta
282    if (region != null && region.isMetaRegion() &&
283      region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
284      return getMetaProvider().getWAL(region);
285    } else {
286      return provider.getWAL(region);
287    }
288  }
289
290  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
291    return createReader(fs, path, (CancelableProgressable)null);
292  }
293
294  /**
295   * Create a reader for the WAL. If you are reading from a file that's being written to and need
296   * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
297   * then just seek back to the last known good position.
298   * @return A WAL reader.  Close when done with it.
299   * @throws IOException
300   */
301  public Reader createReader(final FileSystem fs, final Path path,
302      CancelableProgressable reporter) throws IOException {
303    return createReader(fs, path, reporter, true);
304  }
305
306  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
307      boolean allowCustom) throws IOException {
308    Class<? extends AbstractFSWALProvider.Reader> lrClass =
309        allowCustom ? logReaderClass : ProtobufLogReader.class;
310    try {
311      // A wal file could be under recovery, so it may take several
312      // tries to get it open. Instead of claiming it is corrupted, retry
313      // to open it up to 5 minutes by default.
314      long startWaiting = EnvironmentEdgeManager.currentTime();
315      long openTimeout = timeoutMillis + startWaiting;
316      int nbAttempt = 0;
317      AbstractFSWALProvider.Reader reader = null;
318      while (true) {
319        try {
320          reader = lrClass.getDeclaredConstructor().newInstance();
321          reader.init(fs, path, conf, null);
322          return reader;
323        } catch (IOException e) {
324          if (reader != null) {
325            try {
326              reader.close();
327            } catch (IOException exception) {
328              LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
329              LOG.debug("exception details", exception);
330            }
331          }
332
333          String msg = e.getMessage();
334          if (msg != null
335              && (msg.contains("Cannot obtain block length")
336                  || msg.contains("Could not obtain the last block") || msg
337                    .matches("Blocklist for [^ ]* has changed.*"))) {
338            if (++nbAttempt == 1) {
339              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
340            }
341            if (reporter != null && !reporter.progress()) {
342              throw new InterruptedIOException("Operation is cancelled");
343            }
344            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
345              LOG.error("Can't open after " + nbAttempt + " attempts and "
346                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
347            } else {
348              try {
349                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
350                continue; // retry
351              } catch (InterruptedException ie) {
352                InterruptedIOException iioe = new InterruptedIOException();
353                iioe.initCause(ie);
354                throw iioe;
355              }
356            }
357            throw new LeaseNotRecoveredException(e);
358          } else {
359            throw e;
360          }
361        }
362      }
363    } catch (IOException ie) {
364      throw ie;
365    } catch (Exception e) {
366      throw new IOException("Cannot get log reader", e);
367    }
368  }
369
370  /**
371   * Create a writer for the WAL.
372   * Uses defaults.
373   * <p>
374   * Should be package-private. public only for tests and
375   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
376   * @return A WAL writer. Close when done with it.
377   */
378  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
379    return FSHLogProvider.createWriter(conf, fs, path, false);
380  }
381
382  /**
383   * Should be package-private, visible for recovery testing.
384   * Uses defaults.
385   * @return an overwritable writer for recovered edits. caller should close.
386   */
387  @VisibleForTesting
388  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
389      throws IOException {
390    return FSHLogProvider.createWriter(conf, fs, path, true);
391  }
392
393  // These static methods are currently used where it's impractical to
394  // untangle the reliance on state in the filesystem. They rely on singleton
395  // WALFactory that just provides Reader / Writers.
396  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
397  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
398  private static final String SINGLETON_ID = WALFactory.class.getName();
399  
400  // Public only for FSHLog
401  public static WALFactory getInstance(Configuration configuration) {
402    WALFactory factory = singleton.get();
403    if (null == factory) {
404      WALFactory temp = new WALFactory(configuration);
405      if (singleton.compareAndSet(null, temp)) {
406        factory = temp;
407      } else {
408        // someone else beat us to initializing
409        try {
410          temp.close();
411        } catch (IOException exception) {
412          LOG.debug("failed to close temporary singleton. ignoring.", exception);
413        }
414        factory = singleton.get();
415      }
416    }
417    return factory;
418  }
419
420  /**
421   * Create a reader for the given path, accept custom reader classes from conf.
422   * If you already have a WALFactory, you should favor the instance method.
423   * @return a WAL Reader, caller must close.
424   */
425  public static Reader createReader(final FileSystem fs, final Path path,
426      final Configuration configuration) throws IOException {
427    return getInstance(configuration).createReader(fs, path);
428  }
429
430  /**
431   * Create a reader for the given path, accept custom reader classes from conf.
432   * If you already have a WALFactory, you should favor the instance method.
433   * @return a WAL Reader, caller must close.
434   */
435  static Reader createReader(final FileSystem fs, final Path path,
436      final Configuration configuration, final CancelableProgressable reporter) throws IOException {
437    return getInstance(configuration).createReader(fs, path, reporter);
438  }
439
440  /**
441   * Create a reader for the given path, ignore custom reader classes from conf.
442   * If you already have a WALFactory, you should favor the instance method.
443   * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
444   * @return a WAL Reader, caller must close.
445   */
446  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
447      final Configuration configuration) throws IOException {
448    return getInstance(configuration).createReader(fs, path, null, false);
449  }
450
451  /**
452   * If you already have a WALFactory, you should favor the instance method.
453   * Uses defaults.
454   * @return a Writer that will overwrite files. Caller must close.
455   */
456  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
457      final Configuration configuration)
458      throws IOException {
459    return FSHLogProvider.createWriter(configuration, fs, path, true);
460  }
461
462  /**
463   * If you already have a WALFactory, you should favor the instance method.
464   * Uses defaults.
465   * @return a writer that won't overwrite files. Caller must close.
466   */
467  @VisibleForTesting
468  public static Writer createWALWriter(final FileSystem fs, final Path path,
469      final Configuration configuration)
470      throws IOException {
471    return FSHLogProvider.createWriter(configuration, fs, path, false);
472  }
473
474  @VisibleForTesting
475  public String getFactoryId() {
476    return factoryId;
477  }
478
479  public final WALProvider getWALProvider() {
480    return this.provider;
481  }
482
483  public final WALProvider getMetaWALProvider() {
484    return this.metaProvider.get();
485  }
486}