001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicReference;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
031import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
032import org.apache.hadoop.hbase.util.CancelableProgressable;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
035import org.apache.hadoop.hbase.wal.WAL.Reader;
036import org.apache.hadoop.hbase.wal.WALProvider.Writer;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Entry point for users of the Write Ahead Log.
043 * Acts as the shim between internal use and the particular WALProvider we use to handle wal
044 * requests.
045 *
046 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
047 * implementations:
048 * <ul>
049 *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
050 *                                  "asyncfs"</li>
051 *   <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
052 *                             FileSystem interface via an asynchronous client.</li>
053 *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
054 *                             FileSystem interface via HDFS's synchronous DFSClient.</li>
055 *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
056 *                           server.</li>
057 * </ul>
058 *
059 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
060 */
061@InterfaceAudience.Private
062public class WALFactory {
063
064  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
065
066  /**
067   * Maps between configuration names for providers and implementation classes.
068   */
069  enum Providers {
070    defaultProvider(AsyncFSWALProvider.class),
071    filesystem(FSHLogProvider.class),
072    multiwal(RegionGroupingProvider.class),
073    asyncfs(AsyncFSWALProvider.class);
074
075    final Class<? extends WALProvider> clazz;
076    Providers(Class<? extends WALProvider> clazz) {
077      this.clazz = clazz;
078    }
079  }
080
081  public static final String WAL_PROVIDER = "hbase.wal.provider";
082  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
083
084  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
085
086  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
087
088  final String factoryId;
089  final Abortable abortable;
090  private final WALProvider provider;
091  // The meta updates are written to a different wal. If this
092  // regionserver holds meta regions, then this ref will be non-null.
093  // lazily intialized; most RegionServers don't deal with META
094  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
095
096  /**
097   * Configuration-specified WAL Reader used when a custom reader is requested
098   */
099  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;
100
101  /**
102   * How long to attempt opening in-recovery wals
103   */
104  private final int timeoutMillis;
105
106  private final Configuration conf;
107
108  // Used for the singleton WALFactory, see below.
109  private WALFactory(Configuration conf) {
110    // this code is duplicated here so we can keep our members final.
111    // until we've moved reader/writer construction down into providers, this initialization must
112    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
113    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
114    /* TODO Both of these are probably specific to the fs wal provider */
115    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
116      AbstractFSWALProvider.Reader.class);
117    this.conf = conf;
118    // end required early initialization
119
120    // this instance can't create wals, just reader/writers.
121    provider = null;
122    factoryId = SINGLETON_ID;
123    this.abortable = null;
124  }
125
126  Providers getDefaultProvider() {
127    return Providers.defaultProvider;
128  }
129
130  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
131    try {
132      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
133
134      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
135      // the default and we can't use it, we want to fall back to FSHLog which we know works on
136      // all versions.
137      if (provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
138          && !AsyncFSWALProvider.load()) {
139        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
140        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
141        // easily broken when upgrading hadoop.
142        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
143        return FSHLogProvider.class;
144      }
145
146      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
147      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
148      // not fall back to FSHLogProvider.
149      return provider.clazz;
150    } catch (IllegalArgumentException exception) {
151      // Fall back to them specifying a class name
152      // Note that the passed default class shouldn't actually be used, since the above only fails
153      // when there is a config value present.
154      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
155    }
156  }
157
158  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
159    LOG.info("Instantiating WALProvider of type {}", clazz);
160    try {
161      return clazz.getDeclaredConstructor().newInstance();
162    } catch (Exception e) {
163      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
164      LOG.debug("Exception details for failure to load WALProvider.", e);
165      throw new IOException("couldn't set up WALProvider", e);
166    }
167  }
168
169  /**
170   * @param conf must not be null, will keep a reference to read params in later reader/writer
171   *          instances.
172   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
173   *          to make a directory
174   */
175  public WALFactory(Configuration conf, String factoryId) throws IOException {
176    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
177    // for HMaster or HRegionServer which take system table only. See HBASE-19999
178    this(conf, factoryId, null, true);
179  }
180
181  /**
182   * @param conf must not be null, will keep a reference to read params in later reader/writer
183   *          instances.
184   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
185   *          to make a directory
186   * @param abortable the server associated with this WAL file
187   * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
188   *          {@link SyncReplicationWALProvider}
189   */
190  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
191      boolean enableSyncReplicationWALProvider) throws IOException {
192    // until we've moved reader/writer construction down into providers, this initialization must
193    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
194    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
195    /* TODO Both of these are probably specific to the fs wal provider */
196    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
197      AbstractFSWALProvider.Reader.class);
198    this.conf = conf;
199    this.factoryId = factoryId;
200    this.abortable = abortable;
201    // end required early initialization
202    if (conf.getBoolean(WAL_ENABLED, true)) {
203      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
204      if (enableSyncReplicationWALProvider) {
205        provider = new SyncReplicationWALProvider(provider);
206      }
207      provider.init(this, conf, null, this.abortable);
208      provider.addWALActionsListener(new MetricsWAL());
209      this.provider = provider;
210    } else {
211      // special handling of existing configuration behavior.
212      LOG.warn("Running with WAL disabled.");
213      provider = new DisabledWALProvider();
214      provider.init(this, conf, factoryId, null);
215    }
216  }
217
218  /**
219   * Shutdown all WALs and clean up any underlying storage.
220   * Use only when you will not need to replay and edits that have gone to any wals from this
221   * factory.
222   */
223  public void close() throws IOException {
224    final WALProvider metaProvider = this.metaProvider.get();
225    if (null != metaProvider) {
226      metaProvider.close();
227    }
228    // close is called on a WALFactory with null provider in the case of contention handling
229    // within the getInstance method.
230    if (null != provider) {
231      provider.close();
232    }
233  }
234
235  /**
236   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
237   * If you are not ending cleanly and will need to replay edits from this factory's wals,
238   * use this method if you can as it will try to leave things as tidy as possible.
239   */
240  public void shutdown() throws IOException {
241    IOException exception = null;
242    final WALProvider metaProvider = this.metaProvider.get();
243    if (null != metaProvider) {
244      try {
245        metaProvider.shutdown();
246      } catch(IOException ioe) {
247        exception = ioe;
248      }
249    }
250    provider.shutdown();
251    if (null != exception) {
252      throw exception;
253    }
254  }
255
256  public List<WAL> getWALs() {
257    return provider.getWALs();
258  }
259
260  /**
261   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
262   * creating the first hbase:meta WAL so we can register a listener.
263   * @see #getMetaWALProvider()
264   */
265  public WALProvider getMetaProvider() throws IOException {
266    for (;;) {
267      WALProvider provider = this.metaProvider.get();
268      if (provider != null) {
269        return provider;
270      }
271      Class<? extends WALProvider> clz = null;
272      if (conf.get(META_WAL_PROVIDER) == null) {
273        try {
274          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
275        } catch (Throwable t) {
276          // the WAL provider should be an enum. Proceed
277        }
278      } 
279      if (clz == null){
280        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
281      }
282      provider = createProvider(clz);
283      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
284      provider.addWALActionsListener(new MetricsWAL());
285      if (metaProvider.compareAndSet(null, provider)) {
286        return provider;
287      } else {
288        // someone is ahead of us, close and try again.
289        provider.close();
290      }
291    }
292  }
293
294  /**
295   * @param region the region which we want to get a WAL for. Could be null.
296   */
297  public WAL getWAL(RegionInfo region) throws IOException {
298    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
299    if (region != null && region.isMetaRegion() &&
300      region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
301      return getMetaProvider().getWAL(region);
302    } else {
303      return provider.getWAL(region);
304    }
305  }
306
307  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
308    return createReader(fs, path, (CancelableProgressable)null);
309  }
310
311  /**
312   * Create a reader for the WAL. If you are reading from a file that's being written to and need
313   * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
314   * then just seek back to the last known good position.
315   * @return A WAL reader.  Close when done with it.
316   */
317  public Reader createReader(final FileSystem fs, final Path path,
318      CancelableProgressable reporter) throws IOException {
319    return createReader(fs, path, reporter, true);
320  }
321
322  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
323      boolean allowCustom) throws IOException {
324    Class<? extends AbstractFSWALProvider.Reader> lrClass =
325        allowCustom ? logReaderClass : ProtobufLogReader.class;
326    try {
327      // A wal file could be under recovery, so it may take several
328      // tries to get it open. Instead of claiming it is corrupted, retry
329      // to open it up to 5 minutes by default.
330      long startWaiting = EnvironmentEdgeManager.currentTime();
331      long openTimeout = timeoutMillis + startWaiting;
332      int nbAttempt = 0;
333      AbstractFSWALProvider.Reader reader = null;
334      while (true) {
335        try {
336          reader = lrClass.getDeclaredConstructor().newInstance();
337          reader.init(fs, path, conf, null);
338          return reader;
339        } catch (Exception e) {
340          // catch Exception so that we close reader for all exceptions. If we don't
341          // close the reader, we leak a socket.
342          if (reader != null) {
343            try {
344              reader.close();
345            } catch (IOException exception) {
346              LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
347              LOG.debug("exception details", exception);
348            }
349          }
350
351          // Only inspect the Exception to consider retry when it's an IOException
352          if (e instanceof IOException) {
353            String msg = e.getMessage();
354            if (msg != null
355                && (msg.contains("Cannot obtain block length")
356                    || msg.contains("Could not obtain the last block") || msg
357                      .matches("Blocklist for [^ ]* has changed.*"))) {
358              if (++nbAttempt == 1) {
359                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
360              }
361              if (reporter != null && !reporter.progress()) {
362                throw new InterruptedIOException("Operation is cancelled");
363              }
364              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
365                LOG.error("Can't open after " + nbAttempt + " attempts and "
366                    + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
367              } else {
368                try {
369                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
370                  continue; // retry
371                } catch (InterruptedException ie) {
372                  InterruptedIOException iioe = new InterruptedIOException();
373                  iioe.initCause(ie);
374                  throw iioe;
375                }
376              }
377              throw new LeaseNotRecoveredException(e);
378            }
379          }
380
381          // Rethrow the original exception if we are not retrying due to HDFS-isms.
382          throw e;
383        }
384      }
385    } catch (IOException ie) {
386      throw ie;
387    } catch (Exception e) {
388      throw new IOException("Cannot get log reader", e);
389    }
390  }
391
392  /**
393   * Create a writer for the WAL.
394   * Uses defaults.
395   * <p>
396   * Should be package-private. public only for tests and
397   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
398   * @return A WAL writer. Close when done with it.
399   */
400  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
401    return FSHLogProvider.createWriter(conf, fs, path, false);
402  }
403
404  /**
405   * Should be package-private, visible for recovery testing.
406   * Uses defaults.
407   * @return an overwritable writer for recovered edits. caller should close.
408   */
409  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
410      throws IOException {
411    return FSHLogProvider.createWriter(conf, fs, path, true);
412  }
413
414  // These static methods are currently used where it's impractical to
415  // untangle the reliance on state in the filesystem. They rely on singleton
416  // WALFactory that just provides Reader / Writers.
417  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
418  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
419  private static final String SINGLETON_ID = WALFactory.class.getName();
420  
421  // Public only for FSHLog
422  public static WALFactory getInstance(Configuration configuration) {
423    WALFactory factory = singleton.get();
424    if (null == factory) {
425      WALFactory temp = new WALFactory(configuration);
426      if (singleton.compareAndSet(null, temp)) {
427        factory = temp;
428      } else {
429        // someone else beat us to initializing
430        try {
431          temp.close();
432        } catch (IOException exception) {
433          LOG.debug("failed to close temporary singleton. ignoring.", exception);
434        }
435        factory = singleton.get();
436      }
437    }
438    return factory;
439  }
440
441  /**
442   * Create a reader for the given path, accept custom reader classes from conf.
443   * If you already have a WALFactory, you should favor the instance method.
444   * @return a WAL Reader, caller must close.
445   */
446  public static Reader createReader(final FileSystem fs, final Path path,
447      final Configuration configuration) throws IOException {
448    return getInstance(configuration).createReader(fs, path);
449  }
450
451  /**
452   * Create a reader for the given path, accept custom reader classes from conf.
453   * If you already have a WALFactory, you should favor the instance method.
454   * @return a WAL Reader, caller must close.
455   */
456  static Reader createReader(final FileSystem fs, final Path path,
457      final Configuration configuration, final CancelableProgressable reporter) throws IOException {
458    return getInstance(configuration).createReader(fs, path, reporter);
459  }
460
461  /**
462   * Create a reader for the given path, ignore custom reader classes from conf.
463   * If you already have a WALFactory, you should favor the instance method.
464   * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
465   * @return a WAL Reader, caller must close.
466   */
467  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
468      final Configuration configuration) throws IOException {
469    return getInstance(configuration).createReader(fs, path, null, false);
470  }
471
472  /**
473   * If you already have a WALFactory, you should favor the instance method.
474   * Uses defaults.
475   * @return a Writer that will overwrite files. Caller must close.
476   */
477  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
478      final Configuration configuration)
479      throws IOException {
480    return FSHLogProvider.createWriter(configuration, fs, path, true);
481  }
482
483  /**
484   * If you already have a WALFactory, you should favor the instance method.
485   * Uses defaults.
486   * @return a writer that won't overwrite files. Caller must close.
487   */
488  public static Writer createWALWriter(final FileSystem fs, final Path path,
489      final Configuration configuration)
490      throws IOException {
491    return FSHLogProvider.createWriter(configuration, fs, path, false);
492  }
493
494  public final WALProvider getWALProvider() {
495    return this.provider;
496  }
497
498  /**
499   * @return Current metaProvider... may be null if not yet initialized.
500   * @see #getMetaProvider()
501   */
502  public final WALProvider getMetaWALProvider() {
503    return this.metaProvider.get();
504  }
505}