/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

044/**
045 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
046 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
047 * configuration setting "hbase.wal.provider". Available implementations:
048 * <ul>
049 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
050 * "asyncfs"</li>
051 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
052 * FileSystem interface via an asynchronous client.</li>
053 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
054 * FileSystem interface via HDFS's synchronous DFSClient.</li>
055 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
056 * server.</li>
057 * </ul>
058 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
059 */
060@InterfaceAudience.Private
061public class WALFactory {
062
063  /**
064   * Used in tests for injecting customized stream reader implementation, for example, inject fault
065   * when reading, etc.
066   * <p/>
067   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
068   * will also determine whether the WAL file is encrypted and we should use
069   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the
070   * header of the WAL file, so we do not need to specify a specical reader to read the WAL file
071   * either.
072   * <p/>
073   * So typically you should not use this config in production.
074   */
075  public static final String WAL_STREAM_READER_CLASS_IMPL =
076    "hbase.regionserver.wal.stream.reader.impl";
077
078  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
079
080  /**
081   * Maps between configuration names for providers and implementation classes.
082   */
083  enum Providers {
084    defaultProvider(AsyncFSWALProvider.class),
085    filesystem(FSHLogProvider.class),
086    multiwal(RegionGroupingProvider.class),
087    asyncfs(AsyncFSWALProvider.class);
088
089    final Class<? extends WALProvider> clazz;
090
091    Providers(Class<? extends WALProvider> clazz) {
092      this.clazz = clazz;
093    }
094  }
095
096  public static final String WAL_PROVIDER = "hbase.wal.provider";
097  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
098
099  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
100
101  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
102
103  final String factoryId;
104  final Abortable abortable;
105  private final WALProvider provider;
106  // The meta updates are written to a different wal. If this
107  // regionserver holds meta regions, then this ref will be non-null.
108  // lazily intialized; most RegionServers don't deal with META
109  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
110
111  /**
112   * Configuration-specified WAL Reader used when a custom reader is requested
113   */
114  private final Class<? extends WALStreamReader> walStreamReaderClass;
115
116  /**
117   * How long to attempt opening in-recovery wals
118   */
119  private final int timeoutMillis;
120
121  private final Configuration conf;
122
123  private final ExcludeDatanodeManager excludeDatanodeManager;
124
125  // Used for the singleton WALFactory, see below.
126  private WALFactory(Configuration conf) {
127    // this code is duplicated here so we can keep our members final.
128    // until we've moved reader/writer construction down into providers, this initialization must
129    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
130    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
131    /* TODO Both of these are probably specific to the fs wal provider */
132    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
133      ProtobufWALStreamReader.class, WALStreamReader.class);
134    Preconditions.checkArgument(
135      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
136      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
137      AbstractFSWALProvider.Initializer.class.getName());
138    this.conf = conf;
139    // end required early initialization
140
141    // this instance can't create wals, just reader/writers.
142    provider = null;
143    factoryId = SINGLETON_ID;
144    this.abortable = null;
145    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
146  }
147
148  Providers getDefaultProvider() {
149    return Providers.defaultProvider;
150  }
151
152  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
153    try {
154      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
155
156      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
157      // the default and we can't use it, we want to fall back to FSHLog which we know works on
158      // all versions.
159      if (
160        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
161          && !AsyncFSWALProvider.load()
162      ) {
163        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
164        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
165        // easily broken when upgrading hadoop.
166        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
167        return FSHLogProvider.class;
168      }
169
170      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
171      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
172      // not fall back to FSHLogProvider.
173      return provider.clazz;
174    } catch (IllegalArgumentException exception) {
175      // Fall back to them specifying a class name
176      // Note that the passed default class shouldn't actually be used, since the above only fails
177      // when there is a config value present.
178      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
179    }
180  }
181
182  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
183    LOG.info("Instantiating WALProvider of type {}", clazz);
184    try {
185      return clazz.getDeclaredConstructor().newInstance();
186    } catch (Exception e) {
187      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
188      LOG.debug("Exception details for failure to load WALProvider.", e);
189      throw new IOException("couldn't set up WALProvider", e);
190    }
191  }
192
193  /**
194   * @param conf      must not be null, will keep a reference to read params in later reader/writer
195   *                  instances.
196   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
197   *                  to make a directory
198   */
199  public WALFactory(Configuration conf, String factoryId) throws IOException {
200    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
201    // for HMaster or HRegionServer which take system table only. See HBASE-19999
202    this(conf, factoryId, null, true);
203  }
204
205  /**
206   * @param conf                             must not be null, will keep a reference to read params
207   *                                         in later reader/writer instances.
208   * @param factoryId                        a unique identifier for this factory. used i.e. by
209   *                                         filesystem implementations to make a directory
210   * @param abortable                        the server associated with this WAL file
211   * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
212   *                                         {@link SyncReplicationWALProvider}
213   */
214  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
215    boolean enableSyncReplicationWALProvider) throws IOException {
216    // until we've moved reader/writer construction down into providers, this initialization must
217    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
218    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
219    /* TODO Both of these are probably specific to the fs wal provider */
220    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
221      ProtobufWALStreamReader.class, WALStreamReader.class);
222    Preconditions.checkArgument(
223      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
224      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
225      AbstractFSWALProvider.Initializer.class.getName());
226    this.conf = conf;
227    this.factoryId = factoryId;
228    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
229    this.abortable = abortable;
230    // end required early initialization
231    if (conf.getBoolean(WAL_ENABLED, true)) {
232      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
233      if (enableSyncReplicationWALProvider) {
234        provider = new SyncReplicationWALProvider(provider);
235      }
236      provider.init(this, conf, null, this.abortable);
237      provider.addWALActionsListener(new MetricsWAL());
238      this.provider = provider;
239    } else {
240      // special handling of existing configuration behavior.
241      LOG.warn("Running with WAL disabled.");
242      provider = new DisabledWALProvider();
243      provider.init(this, conf, factoryId, null);
244    }
245  }
246
247  /**
248   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
249   * replay and edits that have gone to any wals from this factory.
250   */
251  public void close() throws IOException {
252    final WALProvider metaProvider = this.metaProvider.get();
253    if (null != metaProvider) {
254      metaProvider.close();
255    }
256    // close is called on a WALFactory with null provider in the case of contention handling
257    // within the getInstance method.
258    if (null != provider) {
259      provider.close();
260    }
261  }
262
263  /**
264   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
265   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
266   * if you can as it will try to leave things as tidy as possible.
267   */
268  public void shutdown() throws IOException {
269    IOException exception = null;
270    final WALProvider metaProvider = this.metaProvider.get();
271    if (null != metaProvider) {
272      try {
273        metaProvider.shutdown();
274      } catch (IOException ioe) {
275        exception = ioe;
276      }
277    }
278    provider.shutdown();
279    if (null != exception) {
280      throw exception;
281    }
282  }
283
284  public List<WAL> getWALs() {
285    return provider.getWALs();
286  }
287
288  /**
289   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
290   * creating the first hbase:meta WAL so we can register a listener.
291   * @see #getMetaWALProvider()
292   */
293  public WALProvider getMetaProvider() throws IOException {
294    for (;;) {
295      WALProvider provider = this.metaProvider.get();
296      if (provider != null) {
297        return provider;
298      }
299      Class<? extends WALProvider> clz = null;
300      if (conf.get(META_WAL_PROVIDER) == null) {
301        try {
302          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
303        } catch (Throwable t) {
304          // the WAL provider should be an enum. Proceed
305        }
306      }
307      if (clz == null) {
308        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
309      }
310      provider = createProvider(clz);
311      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
312      provider.addWALActionsListener(new MetricsWAL());
313      if (metaProvider.compareAndSet(null, provider)) {
314        return provider;
315      } else {
316        // someone is ahead of us, close and try again.
317        provider.close();
318      }
319    }
320  }
321
322  /**
323   * @param region the region which we want to get a WAL for. Could be null.
324   */
325  public WAL getWAL(RegionInfo region) throws IOException {
326    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
327    if (
328      region != null && region.isMetaRegion()
329        && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
330    ) {
331      return getMetaProvider().getWAL(region);
332    } else {
333      return provider.getWAL(region);
334    }
335  }
336
337  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
338    return createStreamReader(fs, path, (CancelableProgressable) null);
339  }
340
341  /**
342   * Create a one-way stream reader for the WAL.
343   * @return A WAL reader. Close when done with it.
344   */
345  public WALStreamReader createStreamReader(FileSystem fs, Path path,
346    CancelableProgressable reporter) throws IOException {
347    return createStreamReader(fs, path, reporter, -1);
348  }
349
350  /**
351   * Create a one-way stream reader for the WAL, and start reading from the given
352   * {@code startPosition}.
353   * @return A WAL reader. Close when done with it.
354   */
355  public WALStreamReader createStreamReader(FileSystem fs, Path path,
356    CancelableProgressable reporter, long startPosition) throws IOException {
357    try {
358      // A wal file could be under recovery, so it may take several
359      // tries to get it open. Instead of claiming it is corrupted, retry
360      // to open it up to 5 minutes by default.
361      long startWaiting = EnvironmentEdgeManager.currentTime();
362      long openTimeout = timeoutMillis + startWaiting;
363      int nbAttempt = 0;
364      WALStreamReader reader = null;
365      while (true) {
366        try {
367          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
368          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
369          return reader;
370        } catch (Exception e) {
371          // catch Exception so that we close reader for all exceptions. If we don't
372          // close the reader, we leak a socket.
373          if (reader != null) {
374            reader.close();
375          }
376
377          // Only inspect the Exception to consider retry when it's an IOException
378          if (e instanceof IOException) {
379            String msg = e.getMessage();
380            if (
381              msg != null && (msg.contains("Cannot obtain block length")
382                || msg.contains("Could not obtain the last block")
383                || msg.matches("Blocklist for [^ ]* has changed.*"))
384            ) {
385              if (++nbAttempt == 1) {
386                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
387              }
388              if (reporter != null && !reporter.progress()) {
389                throw new InterruptedIOException("Operation is cancelled");
390              }
391              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
392                LOG.error("Can't open after " + nbAttempt + " attempts and "
393                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
394              } else {
395                try {
396                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
397                  continue; // retry
398                } catch (InterruptedException ie) {
399                  InterruptedIOException iioe = new InterruptedIOException();
400                  iioe.initCause(ie);
401                  throw iioe;
402                }
403              }
404              throw new LeaseNotRecoveredException(e);
405            }
406          }
407
408          // Rethrow the original exception if we are not retrying due to HDFS-isms.
409          throw e;
410        }
411      }
412    } catch (IOException ie) {
413      throw ie;
414    } catch (Exception e) {
415      throw new IOException("Cannot get log reader", e);
416    }
417  }
418
419  /**
420   * Create a writer for the WAL. Uses defaults.
421   * <p>
422   * Should be package-private. public only for tests and
423   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
424   * @return A WAL writer. Close when done with it.
425   */
426  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
427    return FSHLogProvider.createWriter(conf, fs, path, false);
428  }
429
430  /**
431   * Should be package-private, visible for recovery testing. Uses defaults.
432   * @return an overwritable writer for recovered edits. caller should close.
433   */
434  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
435    throws IOException {
436    return FSHLogProvider.createWriter(conf, fs, path, true);
437  }
438
439  // These static methods are currently used where it's impractical to
440  // untangle the reliance on state in the filesystem. They rely on singleton
441  // WALFactory that just provides Reader / Writers.
442  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
443  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
444  private static final String SINGLETON_ID = WALFactory.class.getName();
445
446  // Public only for FSHLog
447  public static WALFactory getInstance(Configuration configuration) {
448    WALFactory factory = singleton.get();
449    if (null == factory) {
450      WALFactory temp = new WALFactory(configuration);
451      if (singleton.compareAndSet(null, temp)) {
452        factory = temp;
453      } else {
454        // someone else beat us to initializing
455        try {
456          temp.close();
457        } catch (IOException exception) {
458          LOG.debug("failed to close temporary singleton. ignoring.", exception);
459        }
460        factory = singleton.get();
461      }
462    }
463    return factory;
464  }
465
466  /**
467   * Create a tailing reader for the given path. Mainly used in replication.
468   */
469  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
470    long startPosition) throws IOException {
471    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
472    reader.init(fs, path, conf, startPosition);
473    return reader;
474  }
475
476  /**
477   * Create a one-way stream reader for a given path.
478   */
479  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
480    throws IOException {
481    return createStreamReader(fs, path, conf, -1);
482  }
483
484  /**
485   * Create a one-way stream reader for a given path.
486   */
487  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
488    long startPosition) throws IOException {
489    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
490      startPosition);
491  }
492
493  /**
494   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
495   * @return a Writer that will overwrite files. Caller must close.
496   */
497  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
498    final Configuration configuration) throws IOException {
499    return FSHLogProvider.createWriter(configuration, fs, path, true);
500  }
501
502  /**
503   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
504   * @return a writer that won't overwrite files. Caller must close.
505   */
506  public static Writer createWALWriter(final FileSystem fs, final Path path,
507    final Configuration configuration) throws IOException {
508    return FSHLogProvider.createWriter(configuration, fs, path, false);
509  }
510
511  public final WALProvider getWALProvider() {
512    return this.provider;
513  }
514
515  /**
516   * @return Current metaProvider... may be null if not yet initialized.
517   * @see #getMetaProvider()
518   */
519  public final WALProvider getMetaWALProvider() {
520    return this.metaProvider.get();
521  }
522
523  public ExcludeDatanodeManager getExcludeDatanodeManager() {
524    return excludeDatanodeManager;
525  }
526
527  @RestrictedApi(explanation = "Should only be called in tests", link = "",
528      allowedOnPath = ".*/src/test/.*")
529  public String getFactoryId() {
530    return this.factoryId;
531  }
532}