001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicReference;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
030import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
031import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
032import org.apache.hadoop.hbase.util.CancelableProgressable;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
035import org.apache.hadoop.hbase.wal.WAL.Reader;
036import org.apache.hadoop.hbase.wal.WALProvider.Writer;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
043 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
044 * configuration setting "hbase.wal.provider". Available implementations:
045 * <ul>
046 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
047 * "asyncfs"</li>
048 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
049 * FileSystem interface via an asynchronous client.</li>
050 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
051 * FileSystem interface via HDFS's synchronous DFSClient.</li>
052 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
053 * server.</li>
054 * </ul>
055 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
056 */
057@InterfaceAudience.Private
058public class WALFactory {
059
060  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
061
062  /**
063   * Maps between configuration names for providers and implementation classes.
064   */
065  enum Providers {
066    defaultProvider(AsyncFSWALProvider.class),
067    filesystem(FSHLogProvider.class),
068    multiwal(RegionGroupingProvider.class),
069    asyncfs(AsyncFSWALProvider.class);
070
071    final Class<? extends WALProvider> clazz;
072
073    Providers(Class<? extends WALProvider> clazz) {
074      this.clazz = clazz;
075    }
076  }
077
078  public static final String WAL_PROVIDER = "hbase.wal.provider";
079  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
080
081  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
082
083  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
084
085  final String factoryId;
086  final Abortable abortable;
087  private final WALProvider provider;
088  // The meta updates are written to a different wal. If this
089  // regionserver holds meta regions, then this ref will be non-null.
090  // lazily intialized; most RegionServers don't deal with META
091  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
092
093  /**
094   * Configuration-specified WAL Reader used when a custom reader is requested
095   */
096  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;
097
098  /**
099   * How long to attempt opening in-recovery wals
100   */
101  private final int timeoutMillis;
102
103  private final Configuration conf;
104
105  private final ExcludeDatanodeManager excludeDatanodeManager;
106
107  // Used for the singleton WALFactory, see below.
108  private WALFactory(Configuration conf) {
109    // this code is duplicated here so we can keep our members final.
110    // until we've moved reader/writer construction down into providers, this initialization must
111    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
112    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
113    /* TODO Both of these are probably specific to the fs wal provider */
114    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
115      AbstractFSWALProvider.Reader.class);
116    this.conf = conf;
117    // end required early initialization
118
119    // this instance can't create wals, just reader/writers.
120    provider = null;
121    factoryId = SINGLETON_ID;
122    this.abortable = null;
123    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
124  }
125
126  Providers getDefaultProvider() {
127    return Providers.defaultProvider;
128  }
129
130  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
131    try {
132      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
133
134      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
135      // the default and we can't use it, we want to fall back to FSHLog which we know works on
136      // all versions.
137      if (
138        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
139          && !AsyncFSWALProvider.load()
140      ) {
141        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
142        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
143        // easily broken when upgrading hadoop.
144        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
145        return FSHLogProvider.class;
146      }
147
148      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
149      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
150      // not fall back to FSHLogProvider.
151      return provider.clazz;
152    } catch (IllegalArgumentException exception) {
153      // Fall back to them specifying a class name
154      // Note that the passed default class shouldn't actually be used, since the above only fails
155      // when there is a config value present.
156      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
157    }
158  }
159
160  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
161    LOG.info("Instantiating WALProvider of type {}", clazz);
162    try {
163      return clazz.getDeclaredConstructor().newInstance();
164    } catch (Exception e) {
165      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
166      LOG.debug("Exception details for failure to load WALProvider.", e);
167      throw new IOException("couldn't set up WALProvider", e);
168    }
169  }
170
171  /**
172   * @param conf      must not be null, will keep a reference to read params in later reader/writer
173   *                  instances.
174   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
175   *                  to make a directory
176   */
177  public WALFactory(Configuration conf, String factoryId) throws IOException {
178    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
179    // for HMaster or HRegionServer which take system table only. See HBASE-19999
180    this(conf, factoryId, null, true);
181  }
182
183  /**
184   * @param conf                             must not be null, will keep a reference to read params
185   *                                         in later reader/writer instances.
186   * @param factoryId                        a unique identifier for this factory. used i.e. by
187   *                                         filesystem implementations to make a directory
188   * @param abortable                        the server associated with this WAL file
189   * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
190   *                                         {@link SyncReplicationWALProvider}
191   */
192  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
193    boolean enableSyncReplicationWALProvider) throws IOException {
194    // until we've moved reader/writer construction down into providers, this initialization must
195    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
196    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
197    /* TODO Both of these are probably specific to the fs wal provider */
198    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
199      AbstractFSWALProvider.Reader.class);
200    this.conf = conf;
201    this.factoryId = factoryId;
202    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
203    this.abortable = abortable;
204    // end required early initialization
205    if (conf.getBoolean(WAL_ENABLED, true)) {
206      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
207      if (enableSyncReplicationWALProvider) {
208        provider = new SyncReplicationWALProvider(provider);
209      }
210      provider.init(this, conf, null, this.abortable);
211      provider.addWALActionsListener(new MetricsWAL());
212      this.provider = provider;
213    } else {
214      // special handling of existing configuration behavior.
215      LOG.warn("Running with WAL disabled.");
216      provider = new DisabledWALProvider();
217      provider.init(this, conf, factoryId, null);
218    }
219  }
220
221  /**
222   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
223   * replay and edits that have gone to any wals from this factory.
224   */
225  public void close() throws IOException {
226    final WALProvider metaProvider = this.metaProvider.get();
227    if (null != metaProvider) {
228      metaProvider.close();
229    }
230    // close is called on a WALFactory with null provider in the case of contention handling
231    // within the getInstance method.
232    if (null != provider) {
233      provider.close();
234    }
235  }
236
237  /**
238   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
239   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
240   * if you can as it will try to leave things as tidy as possible.
241   */
242  public void shutdown() throws IOException {
243    IOException exception = null;
244    final WALProvider metaProvider = this.metaProvider.get();
245    if (null != metaProvider) {
246      try {
247        metaProvider.shutdown();
248      } catch (IOException ioe) {
249        exception = ioe;
250      }
251    }
252    provider.shutdown();
253    if (null != exception) {
254      throw exception;
255    }
256  }
257
258  public List<WAL> getWALs() {
259    return provider.getWALs();
260  }
261
262  /**
263   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
264   * creating the first hbase:meta WAL so we can register a listener.
265   * @see #getMetaWALProvider()
266   */
267  public WALProvider getMetaProvider() throws IOException {
268    for (;;) {
269      WALProvider provider = this.metaProvider.get();
270      if (provider != null) {
271        return provider;
272      }
273      Class<? extends WALProvider> clz = null;
274      if (conf.get(META_WAL_PROVIDER) == null) {
275        try {
276          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
277        } catch (Throwable t) {
278          // the WAL provider should be an enum. Proceed
279        }
280      }
281      if (clz == null) {
282        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
283      }
284      provider = createProvider(clz);
285      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
286      provider.addWALActionsListener(new MetricsWAL());
287      if (metaProvider.compareAndSet(null, provider)) {
288        return provider;
289      } else {
290        // someone is ahead of us, close and try again.
291        provider.close();
292      }
293    }
294  }
295
296  /**
297   * @param region the region which we want to get a WAL for. Could be null.
298   */
299  public WAL getWAL(RegionInfo region) throws IOException {
300    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
301    if (
302      region != null && region.isMetaRegion()
303        && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
304    ) {
305      return getMetaProvider().getWAL(region);
306    } else {
307      return provider.getWAL(region);
308    }
309  }
310
311  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
312    return createReader(fs, path, (CancelableProgressable) null);
313  }
314
315  /**
316   * Create a reader for the WAL. If you are reading from a file that's being written to and need to
317   * reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method then just seek
318   * back to the last known good position.
319   * @return A WAL reader. Close when done with it.
320   */
321  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)
322    throws IOException {
323    return createReader(fs, path, reporter, true);
324  }
325
326  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
327    boolean allowCustom) throws IOException {
328    Class<? extends AbstractFSWALProvider.Reader> lrClass =
329      allowCustom ? logReaderClass : ProtobufLogReader.class;
330    try {
331      // A wal file could be under recovery, so it may take several
332      // tries to get it open. Instead of claiming it is corrupted, retry
333      // to open it up to 5 minutes by default.
334      long startWaiting = EnvironmentEdgeManager.currentTime();
335      long openTimeout = timeoutMillis + startWaiting;
336      int nbAttempt = 0;
337      AbstractFSWALProvider.Reader reader = null;
338      while (true) {
339        try {
340          reader = lrClass.getDeclaredConstructor().newInstance();
341          reader.init(fs, path, conf, null);
342          return reader;
343        } catch (Exception e) {
344          // catch Exception so that we close reader for all exceptions. If we don't
345          // close the reader, we leak a socket.
346          if (reader != null) {
347            try {
348              reader.close();
349            } catch (IOException exception) {
350              LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
351              LOG.debug("exception details", exception);
352            }
353          }
354
355          // Only inspect the Exception to consider retry when it's an IOException
356          if (e instanceof IOException) {
357            String msg = e.getMessage();
358            if (
359              msg != null && (msg.contains("Cannot obtain block length")
360                || msg.contains("Could not obtain the last block")
361                || msg.matches("Blocklist for [^ ]* has changed.*"))
362            ) {
363              if (++nbAttempt == 1) {
364                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
365              }
366              if (reporter != null && !reporter.progress()) {
367                throw new InterruptedIOException("Operation is cancelled");
368              }
369              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
370                LOG.error("Can't open after " + nbAttempt + " attempts and "
371                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
372              } else {
373                try {
374                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
375                  continue; // retry
376                } catch (InterruptedException ie) {
377                  InterruptedIOException iioe = new InterruptedIOException();
378                  iioe.initCause(ie);
379                  throw iioe;
380                }
381              }
382              throw new LeaseNotRecoveredException(e);
383            }
384          }
385
386          // Rethrow the original exception if we are not retrying due to HDFS-isms.
387          throw e;
388        }
389      }
390    } catch (IOException ie) {
391      throw ie;
392    } catch (Exception e) {
393      throw new IOException("Cannot get log reader", e);
394    }
395  }
396
397  /**
398   * Create a writer for the WAL. Uses defaults.
399   * <p>
400   * Should be package-private. public only for tests and
401   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
402   * @return A WAL writer. Close when done with it.
403   */
404  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
405    return FSHLogProvider.createWriter(conf, fs, path, false);
406  }
407
408  /**
409   * Should be package-private, visible for recovery testing. Uses defaults.
410   * @return an overwritable writer for recovered edits. caller should close.
411   */
412  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
413    throws IOException {
414    return FSHLogProvider.createWriter(conf, fs, path, true);
415  }
416
417  // These static methods are currently used where it's impractical to
418  // untangle the reliance on state in the filesystem. They rely on singleton
419  // WALFactory that just provides Reader / Writers.
420  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
421  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
422  private static final String SINGLETON_ID = WALFactory.class.getName();
423
424  // Public only for FSHLog
425  public static WALFactory getInstance(Configuration configuration) {
426    WALFactory factory = singleton.get();
427    if (null == factory) {
428      WALFactory temp = new WALFactory(configuration);
429      if (singleton.compareAndSet(null, temp)) {
430        factory = temp;
431      } else {
432        // someone else beat us to initializing
433        try {
434          temp.close();
435        } catch (IOException exception) {
436          LOG.debug("failed to close temporary singleton. ignoring.", exception);
437        }
438        factory = singleton.get();
439      }
440    }
441    return factory;
442  }
443
444  /**
445   * Create a reader for the given path, accept custom reader classes from conf. If you already have
446   * a WALFactory, you should favor the instance method.
447   * @return a WAL Reader, caller must close.
448   */
449  public static Reader createReader(final FileSystem fs, final Path path,
450    final Configuration configuration) throws IOException {
451    return getInstance(configuration).createReader(fs, path);
452  }
453
454  /**
455   * Create a reader for the given path, accept custom reader classes from conf. If you already have
456   * a WALFactory, you should favor the instance method.
457   * @return a WAL Reader, caller must close.
458   */
459  static Reader createReader(final FileSystem fs, final Path path,
460    final Configuration configuration, final CancelableProgressable reporter) throws IOException {
461    return getInstance(configuration).createReader(fs, path, reporter);
462  }
463
464  /**
465   * Create a reader for the given path, ignore custom reader classes from conf. If you already have
466   * a WALFactory, you should favor the instance method. only public pending move of
467   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
468   * @return a WAL Reader, caller must close.
469   */
470  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
471    final Configuration configuration) throws IOException {
472    return getInstance(configuration).createReader(fs, path, null, false);
473  }
474
475  /**
476   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
477   * @return a Writer that will overwrite files. Caller must close.
478   */
479  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
480    final Configuration configuration) throws IOException {
481    return FSHLogProvider.createWriter(configuration, fs, path, true);
482  }
483
484  /**
485   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
486   * @return a writer that won't overwrite files. Caller must close.
487   */
488  public static Writer createWALWriter(final FileSystem fs, final Path path,
489    final Configuration configuration) throws IOException {
490    return FSHLogProvider.createWriter(configuration, fs, path, false);
491  }
492
493  public final WALProvider getWALProvider() {
494    return this.provider;
495  }
496
497  /**
498   * @return Current metaProvider... may be null if not yet initialized.
499   * @see #getMetaProvider()
500   */
501  public final WALProvider getMetaWALProvider() {
502    return this.metaProvider.get();
503  }
504
505  public ExcludeDatanodeManager getExcludeDatanodeManager() {
506    return excludeDatanodeManager;
507  }
508}