001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.RegionReplicaUtil;
033import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
034import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
035import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
036import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
037import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
038import org.apache.hadoop.hbase.util.CancelableProgressable;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
042import org.apache.hadoop.hbase.wal.WALProvider.Writer;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
048
049/**
050 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
051 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
052 * configuration setting "hbase.wal.provider". Available implementations:
053 * <ul>
054 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
055 * "asyncfs"</li>
056 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
057 * FileSystem interface via an asynchronous client.</li>
058 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
059 * FileSystem interface via HDFS's synchronous DFSClient.</li>
060 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
061 * server.</li>
062 * </ul>
063 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
064 */
065@InterfaceAudience.Private
066public class WALFactory {
067
068  /**
069   * Used in tests for injecting customized stream reader implementation, for example, inject fault
070   * when reading, etc.
071   * <p/>
072   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
073   * will also determine whether the WAL file is encrypted and we should use
074   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the
075   * header of the WAL file, so we do not need to specify a specical reader to read the WAL file
076   * either.
077   * <p/>
078   * So typically you should not use this config in production.
079   */
080  public static final String WAL_STREAM_READER_CLASS_IMPL =
081    "hbase.regionserver.wal.stream.reader.impl";
082
083  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
084
085  /**
086   * Maps between configuration names for providers and implementation classes.
087   */
088  enum Providers {
089    defaultProvider(AsyncFSWALProvider.class),
090    filesystem(FSHLogProvider.class),
091    multiwal(RegionGroupingProvider.class),
092    asyncfs(AsyncFSWALProvider.class);
093
094    final Class<? extends WALProvider> clazz;
095
096    Providers(Class<? extends WALProvider> clazz) {
097      this.clazz = clazz;
098    }
099  }
100
101  public static final String WAL_PROVIDER = "hbase.wal.provider";
102  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
103
104  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
105
106  public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
107
108  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
109
110  static final String REPLICATION_WAL_PROVIDER_ID = "rep";
111
112  final String factoryId;
113  final Abortable abortable;
114  private final WALProvider provider;
115  // The meta updates are written to a different wal. If this
116  // regionserver holds meta regions, then this ref will be non-null.
117  // lazily intialized; most RegionServers don't deal with META
118  private final LazyInitializedWALProvider metaProvider;
119  // This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and
120  // generate a lot useless data, see HBASE-27775 for more details.
121  private final LazyInitializedWALProvider replicationProvider;
122
123  /**
124   * Configuration-specified WAL Reader used when a custom reader is requested
125   */
126  private final Class<? extends WALStreamReader> walStreamReaderClass;
127
128  /**
129   * How long to attempt opening in-recovery wals
130   */
131  private final int timeoutMillis;
132
133  private final Configuration conf;
134
135  private final ExcludeDatanodeManager excludeDatanodeManager;
136
137  // Used for the singleton WALFactory, see below.
138  private WALFactory(Configuration conf) {
139    // this code is duplicated here so we can keep our members final.
140    // until we've moved reader/writer construction down into providers, this initialization must
141    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
142    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
143    /* TODO Both of these are probably specific to the fs wal provider */
144    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
145      ProtobufWALStreamReader.class, WALStreamReader.class);
146    Preconditions.checkArgument(
147      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
148      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
149      AbstractFSWALProvider.Initializer.class.getName());
150    this.conf = conf;
151    // end required early initialization
152
153    // this instance can't create wals, just reader/writers.
154    provider = null;
155    factoryId = SINGLETON_ID;
156    this.abortable = null;
157    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
158    this.metaProvider = null;
159    this.replicationProvider = null;
160  }
161
162  Providers getDefaultProvider() {
163    return Providers.defaultProvider;
164  }
165
166  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
167    try {
168      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
169
170      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
171      // the default and we can't use it, we want to fall back to FSHLog which we know works on
172      // all versions.
173      if (
174        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
175          && !AsyncFSWALProvider.load()
176      ) {
177        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
178        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
179        // easily broken when upgrading hadoop.
180        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
181        return FSHLogProvider.class;
182      }
183
184      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
185      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
186      // not fall back to FSHLogProvider.
187      return provider.clazz;
188    } catch (IllegalArgumentException exception) {
189      // Fall back to them specifying a class name
190      // Note that the passed default class shouldn't actually be used, since the above only fails
191      // when there is a config value present.
192      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
193    }
194  }
195
196  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
197    LOG.info("Instantiating WALProvider of type {}", clazz);
198    try {
199      return clazz.getDeclaredConstructor().newInstance();
200    } catch (Exception e) {
201      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
202      LOG.debug("Exception details for failure to load WALProvider.", e);
203      throw new IOException("couldn't set up WALProvider", e);
204    }
205  }
206
207  /**
208   * Create a WALFactory.
209   */
210  @RestrictedApi(explanation = "Should only be called in tests", link = "",
211      allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java|.*/WALPerformanceEvaluation.java")
212  public WALFactory(Configuration conf, String factoryId) throws IOException {
213    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
214    // for HMaster or HRegionServer which take system table only. See HBASE-19999
215    this(conf, factoryId, null, true);
216  }
217
218  /**
219   * Create a WALFactory.
220   * <p/>
221   * This is the constructor you should use when creating a WALFactory in normal code, to make sure
222   * that the {@code factoryId} is the server name. We need this assumption in some places for
223   * parsing the server name out from the wal file name.
224   * @param conf       must not be null, will keep a reference to read params in later reader/writer
225   *                   instances.
226   * @param serverName use to generate the factoryId, which will be append at the first of the final
227   *                   file name
228   * @param abortable  the server associated with this WAL file
229   */
230  public WALFactory(Configuration conf, ServerName serverName, Abortable abortable)
231    throws IOException {
232    this(conf, serverName.toString(), abortable, false);
233  }
234
235  private static void createWALDirectory(Configuration conf, String factoryId) throws IOException {
236    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
237    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
238    Path walDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(factoryId));
239    if (!walFs.exists(walDir) && !walFs.mkdirs(walDir)) {
240      throw new IOException("Can not create wal directory " + walDir);
241    }
242  }
243
244  /**
245   * @param conf               must not be null, will keep a reference to read params in later
246   *                           reader/writer instances.
247   * @param factoryId          a unique identifier for this factory. used i.e. by filesystem
248   *                           implementations to make a directory
249   * @param abortable          the server associated with this WAL file
250   * @param createWalDirectory pass {@code true} for testing purpose, to create the wal directory
251   *                           automatically. In normal code path, we should create it in
252   *                           HRegionServer setup.
253   */
254  private WALFactory(Configuration conf, String factoryId, Abortable abortable,
255    boolean createWalDirectory) throws IOException {
256    // until we've moved reader/writer construction down into providers, this initialization must
257    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
258    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
259    /* TODO Both of these are probably specific to the fs wal provider */
260    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
261      ProtobufWALStreamReader.class, WALStreamReader.class);
262    Preconditions.checkArgument(
263      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
264      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
265      AbstractFSWALProvider.Initializer.class.getName());
266    this.conf = conf;
267    this.factoryId = factoryId;
268    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
269    this.abortable = abortable;
270    this.metaProvider = new LazyInitializedWALProvider(this,
271      AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
272    this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
273      REPLICATION_WAL_PROVIDER, this.abortable);
274    // end required early initialization
275    if (conf.getBoolean(WAL_ENABLED, true)) {
276      if (createWalDirectory) {
277        // for testing only
278        createWALDirectory(conf, factoryId);
279      }
280      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
281      provider.init(this, conf, null, this.abortable);
282      provider.addWALActionsListener(new MetricsWAL());
283      this.provider = provider;
284    } else {
285      // special handling of existing configuration behavior.
286      LOG.warn("Running with WAL disabled.");
287      provider = new DisabledWALProvider();
288      provider.init(this, conf, factoryId, null);
289    }
290  }
291
292  public Configuration getConf() {
293    return conf;
294  }
295
296  /**
297   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
298   * replay and edits that have gone to any wals from this factory.
299   */
300  public void close() throws IOException {
301    List<IOException> ioes = new ArrayList<>();
302    // these fields could be null if the WALFactory is created only for being used in the
303    // getInstance method.
304    if (metaProvider != null) {
305      try {
306        metaProvider.close();
307      } catch (IOException e) {
308        ioes.add(e);
309      }
310    }
311    if (replicationProvider != null) {
312      try {
313        replicationProvider.close();
314      } catch (IOException e) {
315        ioes.add(e);
316      }
317    }
318    if (provider != null) {
319      try {
320        provider.close();
321      } catch (IOException e) {
322        ioes.add(e);
323      }
324    }
325    if (!ioes.isEmpty()) {
326      IOException ioe = new IOException("Failed to close WALFactory");
327      for (IOException e : ioes) {
328        ioe.addSuppressed(e);
329      }
330      throw ioe;
331    }
332  }
333
334  /**
335   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
336   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
337   * if you can as it will try to leave things as tidy as possible.
338   */
339  public void shutdown() throws IOException {
340    List<IOException> ioes = new ArrayList<>();
341    // these fields could be null if the WALFactory is created only for being used in the
342    // getInstance method.
343    if (metaProvider != null) {
344      try {
345        metaProvider.shutdown();
346      } catch (IOException e) {
347        ioes.add(e);
348      }
349    }
350    if (replicationProvider != null) {
351      try {
352        replicationProvider.shutdown();
353      } catch (IOException e) {
354        ioes.add(e);
355      }
356    }
357    if (provider != null) {
358      try {
359        provider.shutdown();
360      } catch (IOException e) {
361        ioes.add(e);
362      }
363    }
364    if (!ioes.isEmpty()) {
365      IOException ioe = new IOException("Failed to shutdown WALFactory");
366      for (IOException e : ioes) {
367        ioe.addSuppressed(e);
368      }
369      throw ioe;
370    }
371  }
372
373  public List<WAL> getWALs() {
374    return provider.getWALs();
375  }
376
377  @RestrictedApi(explanation = "Should only be called in tests", link = "",
378      allowedOnPath = ".*/src/test/.*")
379  WALProvider getMetaProvider() throws IOException {
380    return metaProvider.getProvider();
381  }
382
383  @RestrictedApi(explanation = "Should only be called in tests", link = "",
384      allowedOnPath = ".*/src/test/.*")
385  WALProvider getReplicationProvider() throws IOException {
386    return replicationProvider.getProvider();
387  }
388
389  /**
390   * @param region the region which we want to get a WAL for. Could be null.
391   */
392  public WAL getWAL(RegionInfo region) throws IOException {
393    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
394    if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
395      if (region.isMetaRegion()) {
396        return metaProvider.getProvider().getWAL(region);
397      } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
398        return replicationProvider.getProvider().getWAL(region);
399      }
400    }
401    return provider.getWAL(region);
402  }
403
404  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
405    return createStreamReader(fs, path, (CancelableProgressable) null);
406  }
407
408  /**
409   * Create a one-way stream reader for the WAL.
410   * @return A WAL reader. Close when done with it.
411   */
412  public WALStreamReader createStreamReader(FileSystem fs, Path path,
413    CancelableProgressable reporter) throws IOException {
414    return createStreamReader(fs, path, reporter, -1);
415  }
416
417  /**
418   * Create a one-way stream reader for the WAL, and start reading from the given
419   * {@code startPosition}.
420   * @return A WAL reader. Close when done with it.
421   */
422  public WALStreamReader createStreamReader(FileSystem fs, Path path,
423    CancelableProgressable reporter, long startPosition) throws IOException {
424    try {
425      // A wal file could be under recovery, so it may take several
426      // tries to get it open. Instead of claiming it is corrupted, retry
427      // to open it up to 5 minutes by default.
428      long startWaiting = EnvironmentEdgeManager.currentTime();
429      long openTimeout = timeoutMillis + startWaiting;
430      int nbAttempt = 0;
431      WALStreamReader reader = null;
432      while (true) {
433        try {
434          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
435          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
436          return reader;
437        } catch (Exception e) {
438          // catch Exception so that we close reader for all exceptions. If we don't
439          // close the reader, we leak a socket.
440          if (reader != null) {
441            reader.close();
442          }
443
444          // Only inspect the Exception to consider retry when it's an IOException
445          if (e instanceof IOException) {
446            String msg = e.getMessage();
447            if (
448              msg != null && (msg.contains("Cannot obtain block length")
449                || msg.contains("Could not obtain the last block")
450                || msg.matches("Blocklist for [^ ]* has changed.*"))
451            ) {
452              if (++nbAttempt == 1) {
453                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
454              }
455              if (reporter != null && !reporter.progress()) {
456                throw new InterruptedIOException("Operation is cancelled");
457              }
458              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
459                LOG.error("Can't open after " + nbAttempt + " attempts and "
460                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
461              } else {
462                try {
463                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
464                  continue; // retry
465                } catch (InterruptedException ie) {
466                  InterruptedIOException iioe = new InterruptedIOException();
467                  iioe.initCause(ie);
468                  throw iioe;
469                }
470              }
471              throw new LeaseNotRecoveredException(e);
472            }
473          }
474
475          // Rethrow the original exception if we are not retrying due to HDFS-isms.
476          throw e;
477        }
478      }
479    } catch (IOException ie) {
480      throw ie;
481    } catch (Exception e) {
482      throw new IOException("Cannot get log reader", e);
483    }
484  }
485
486  /**
487   * Create a writer for the WAL. Uses defaults.
488   * <p>
489   * Should be package-private. public only for tests and
490   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
491   * @return A WAL writer. Close when done with it.
492   */
493  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
494    return FSHLogProvider.createWriter(conf, fs, path, false);
495  }
496
497  /**
498   * Should be package-private, visible for recovery testing. Uses defaults.
499   * @return an overwritable writer for recovered edits. caller should close.
500   */
501  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
502    throws IOException {
503    return FSHLogProvider.createWriter(conf, fs, path, true);
504  }
505
506  // These static methods are currently used where it's impractical to
507  // untangle the reliance on state in the filesystem. They rely on singleton
508  // WALFactory that just provides Reader / Writers.
509  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
510  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
511  private static final String SINGLETON_ID = WALFactory.class.getName();
512
513  // Public only for FSHLog
514  public static WALFactory getInstance(Configuration configuration) {
515    WALFactory factory = singleton.get();
516    if (null == factory) {
517      WALFactory temp = new WALFactory(configuration);
518      if (singleton.compareAndSet(null, temp)) {
519        factory = temp;
520      } else {
521        // someone else beat us to initializing
522        try {
523          temp.close();
524        } catch (IOException exception) {
525          LOG.debug("failed to close temporary singleton. ignoring.", exception);
526        }
527        factory = singleton.get();
528      }
529    }
530    return factory;
531  }
532
533  /**
534   * Create a tailing reader for the given path. Mainly used in replication.
535   */
536  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
537    long startPosition) throws IOException {
538    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
539    reader.init(fs, path, conf, startPosition);
540    return reader;
541  }
542
543  /**
544   * Create a one-way stream reader for a given path.
545   */
546  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
547    throws IOException {
548    return createStreamReader(fs, path, conf, -1);
549  }
550
551  /**
552   * Create a one-way stream reader for a given path.
553   */
554  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
555    long startPosition) throws IOException {
556    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
557      startPosition);
558  }
559
560  /**
561   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
562   * @return a Writer that will overwrite files. Caller must close.
563   */
564  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
565    final Configuration configuration) throws IOException {
566    return FSHLogProvider.createWriter(configuration, fs, path, true);
567  }
568
569  /**
570   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
571   * @return a writer that won't overwrite files. Caller must close.
572   */
573  public static Writer createWALWriter(final FileSystem fs, final Path path,
574    final Configuration configuration) throws IOException {
575    return FSHLogProvider.createWriter(configuration, fs, path, false);
576  }
577
578  public WALProvider getWALProvider() {
579    return this.provider;
580  }
581
582  /**
583   * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
584   * for hbase:replication.
585   */
586  public List<WALProvider> getAllWALProviders() {
587    List<WALProvider> providers = new ArrayList<>();
588    if (provider != null) {
589      providers.add(provider);
590    }
591    WALProvider meta = metaProvider.getProviderNoCreate();
592    if (meta != null) {
593      providers.add(meta);
594    }
595    WALProvider replication = replicationProvider.getProviderNoCreate();
596    if (replication != null) {
597      providers.add(replication);
598    }
599    return providers;
600  }
601
602  public ExcludeDatanodeManager getExcludeDatanodeManager() {
603    return excludeDatanodeManager;
604  }
605
606  @RestrictedApi(explanation = "Should only be called in tests", link = "",
607      allowedOnPath = ".*/src/test/.*")
608  public String getFactoryId() {
609    return this.factoryId;
610  }
611}