001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.RegionReplicaUtil;
033import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
034import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
035import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
036import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
037import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
038import org.apache.hadoop.hbase.util.CancelableProgressable;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
041import org.apache.hadoop.hbase.wal.WALProvider.Writer;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
047
048/**
049 * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
050 * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
051 * configuration setting "hbase.wal.provider". Available implementations:
052 * <ul>
053 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
054 * "asyncfs"</li>
055 * <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
056 * FileSystem interface via an asynchronous client.</li>
057 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
058 * FileSystem interface via HDFS's synchronous DFSClient.</li>
059 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
060 * server.</li>
061 * </ul>
062 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
063 */
064@InterfaceAudience.Private
065public class WALFactory {
066
067  /**
068   * Used in tests for injecting customized stream reader implementation, for example, inject fault
069   * when reading, etc.
070   * <p/>
071   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
072   * will also determine whether the WAL file is encrypted and we should use
073   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by check the
074   * header of the WAL file, so we do not need to specify a specical reader to read the WAL file
075   * either.
076   * <p/>
077   * So typically you should not use this config in production.
078   */
079  public static final String WAL_STREAM_READER_CLASS_IMPL =
080    "hbase.regionserver.wal.stream.reader.impl";
081
082  private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
083
084  /**
085   * Maps between configuration names for providers and implementation classes.
086   */
087  enum Providers {
088    defaultProvider(AsyncFSWALProvider.class),
089    filesystem(FSHLogProvider.class),
090    multiwal(RegionGroupingProvider.class),
091    asyncfs(AsyncFSWALProvider.class);
092
093    final Class<? extends WALProvider> clazz;
094
095    Providers(Class<? extends WALProvider> clazz) {
096      this.clazz = clazz;
097    }
098  }
099
100  public static final String WAL_PROVIDER = "hbase.wal.provider";
101  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
102
103  public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
104
105  public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
106
107  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
108
109  static final String REPLICATION_WAL_PROVIDER_ID = "rep";
110
111  final String factoryId;
112  final Abortable abortable;
113  private final WALProvider provider;
114  // The meta updates are written to a different wal. If this
115  // regionserver holds meta regions, then this ref will be non-null.
116  // lazily intialized; most RegionServers don't deal with META
117  private final LazyInitializedWALProvider metaProvider;
118  // This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and
119  // generate a lot useless data, see HBASE-27775 for more details.
120  private final LazyInitializedWALProvider replicationProvider;
121
122  /**
123   * Configuration-specified WAL Reader used when a custom reader is requested
124   */
125  private final Class<? extends WALStreamReader> walStreamReaderClass;
126
127  /**
128   * How long to attempt opening in-recovery wals
129   */
130  private final int timeoutMillis;
131
132  private final Configuration conf;
133
134  private final ExcludeDatanodeManager excludeDatanodeManager;
135
136  // Used for the singleton WALFactory, see below.
137  private WALFactory(Configuration conf) {
138    // this code is duplicated here so we can keep our members final.
139    // until we've moved reader/writer construction down into providers, this initialization must
140    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
141    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
142    /* TODO Both of these are probably specific to the fs wal provider */
143    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
144      ProtobufWALStreamReader.class, WALStreamReader.class);
145    Preconditions.checkArgument(
146      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
147      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
148      AbstractFSWALProvider.Initializer.class.getName());
149    this.conf = conf;
150    // end required early initialization
151
152    // this instance can't create wals, just reader/writers.
153    provider = null;
154    factoryId = SINGLETON_ID;
155    this.abortable = null;
156    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
157    this.metaProvider = null;
158    this.replicationProvider = null;
159  }
160
161  Providers getDefaultProvider() {
162    return Providers.defaultProvider;
163  }
164
165  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
166    try {
167      Providers provider = Providers.valueOf(conf.get(key, defaultValue));
168
169      // AsyncFSWALProvider is not guaranteed to work on all Hadoop versions, when it's chosen as
170      // the default and we can't use it, we want to fall back to FSHLog which we know works on
171      // all versions.
172      if (
173        provider == getDefaultProvider() && provider.clazz == AsyncFSWALProvider.class
174          && !AsyncFSWALProvider.load()
175      ) {
176        // AsyncFSWAL has better performance in most cases, and also uses less resources, we will
177        // try to use it if possible. It deeply hacks into the internal of DFSClient so will be
178        // easily broken when upgrading hadoop.
179        LOG.warn("Failed to load AsyncFSWALProvider, falling back to FSHLogProvider");
180        return FSHLogProvider.class;
181      }
182
183      // N.b. If the user specifically requested AsyncFSWALProvider but their environment doesn't
184      // support using it (e.g. AsyncFSWALProvider.load() == false), we should let this fail and
185      // not fall back to FSHLogProvider.
186      return provider.clazz;
187    } catch (IllegalArgumentException exception) {
188      // Fall back to them specifying a class name
189      // Note that the passed default class shouldn't actually be used, since the above only fails
190      // when there is a config value present.
191      return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class);
192    }
193  }
194
195  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
196    LOG.info("Instantiating WALProvider of type {}", clazz);
197    try {
198      return clazz.getDeclaredConstructor().newInstance();
199    } catch (Exception e) {
200      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
201      LOG.debug("Exception details for failure to load WALProvider.", e);
202      throw new IOException("couldn't set up WALProvider", e);
203    }
204  }
205
206  /**
207   * Create a WALFactory.
208   */
209  @RestrictedApi(explanation = "Should only be called in tests", link = "",
210      allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java|.*/WALPerformanceEvaluation.java")
211  public WALFactory(Configuration conf, String factoryId) throws IOException {
212    // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
213    // for HMaster or HRegionServer which take system table only. See HBASE-19999
214    this(conf, factoryId, null);
215  }
216
217  /**
218   * Create a WALFactory.
219   * <p/>
220   * This is the constructor you should use when creating a WALFactory in normal code, to make sure
221   * that the {@code factoryId} is the server name. We need this assumption in some places for
222   * parsing the server name out from the wal file name.
223   * @param conf       must not be null, will keep a reference to read params in later reader/writer
224   *                   instances.
225   * @param serverName use to generate the factoryId, which will be append at the first of the final
226   *                   file name
227   * @param abortable  the server associated with this WAL file
228   */
229  public WALFactory(Configuration conf, ServerName serverName, Abortable abortable)
230    throws IOException {
231    this(conf, serverName.toString(), abortable);
232  }
233
234  /**
235   * @param conf      must not be null, will keep a reference to read params in later reader/writer
236   *                  instances.
237   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
238   *                  to make a directory
239   * @param abortable the server associated with this WAL file
240   */
241  private WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
242    // until we've moved reader/writer construction down into providers, this initialization must
243    // happen prior to provider initialization, in case they need to instantiate a reader/writer.
244    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
245    /* TODO Both of these are probably specific to the fs wal provider */
246    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
247      ProtobufWALStreamReader.class, WALStreamReader.class);
248    Preconditions.checkArgument(
249      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
250      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
251      AbstractFSWALProvider.Initializer.class.getName());
252    this.conf = conf;
253    this.factoryId = factoryId;
254    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
255    this.abortable = abortable;
256    this.metaProvider = new LazyInitializedWALProvider(this,
257      AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
258    this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
259      REPLICATION_WAL_PROVIDER, this.abortable);
260    // end required early initialization
261    if (conf.getBoolean(WAL_ENABLED, true)) {
262      WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
263      provider.init(this, conf, null, this.abortable);
264      provider.addWALActionsListener(new MetricsWAL());
265      this.provider = provider;
266    } else {
267      // special handling of existing configuration behavior.
268      LOG.warn("Running with WAL disabled.");
269      provider = new DisabledWALProvider();
270      provider.init(this, conf, factoryId, null);
271    }
272  }
273
274  public Configuration getConf() {
275    return conf;
276  }
277
278  /**
279   * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
280   * replay and edits that have gone to any wals from this factory.
281   */
282  public void close() throws IOException {
283    List<IOException> ioes = new ArrayList<>();
284    // these fields could be null if the WALFactory is created only for being used in the
285    // getInstance method.
286    if (metaProvider != null) {
287      try {
288        metaProvider.close();
289      } catch (IOException e) {
290        ioes.add(e);
291      }
292    }
293    if (replicationProvider != null) {
294      try {
295        replicationProvider.close();
296      } catch (IOException e) {
297        ioes.add(e);
298      }
299    }
300    if (provider != null) {
301      try {
302        provider.close();
303      } catch (IOException e) {
304        ioes.add(e);
305      }
306    }
307    if (!ioes.isEmpty()) {
308      IOException ioe = new IOException("Failed to close WALFactory");
309      for (IOException e : ioes) {
310        ioe.addSuppressed(e);
311      }
312      throw ioe;
313    }
314  }
315
316  /**
317   * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. If you
318   * are not ending cleanly and will need to replay edits from this factory's wals, use this method
319   * if you can as it will try to leave things as tidy as possible.
320   */
321  public void shutdown() throws IOException {
322    List<IOException> ioes = new ArrayList<>();
323    // these fields could be null if the WALFactory is created only for being used in the
324    // getInstance method.
325    if (metaProvider != null) {
326      try {
327        metaProvider.shutdown();
328      } catch (IOException e) {
329        ioes.add(e);
330      }
331    }
332    if (replicationProvider != null) {
333      try {
334        replicationProvider.shutdown();
335      } catch (IOException e) {
336        ioes.add(e);
337      }
338    }
339    if (provider != null) {
340      try {
341        provider.shutdown();
342      } catch (IOException e) {
343        ioes.add(e);
344      }
345    }
346    if (!ioes.isEmpty()) {
347      IOException ioe = new IOException("Failed to shutdown WALFactory");
348      for (IOException e : ioes) {
349        ioe.addSuppressed(e);
350      }
351      throw ioe;
352    }
353  }
354
355  public List<WAL> getWALs() {
356    return provider.getWALs();
357  }
358
359  @RestrictedApi(explanation = "Should only be called in tests", link = "",
360      allowedOnPath = ".*/src/test/.*")
361  WALProvider getMetaProvider() throws IOException {
362    return metaProvider.getProvider();
363  }
364
365  @RestrictedApi(explanation = "Should only be called in tests", link = "",
366      allowedOnPath = ".*/src/test/.*")
367  WALProvider getReplicationProvider() throws IOException {
368    return replicationProvider.getProvider();
369  }
370
371  /**
372   * @param region the region which we want to get a WAL for. Could be null.
373   */
374  public WAL getWAL(RegionInfo region) throws IOException {
375    // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
376    if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
377      if (region.isMetaRegion()) {
378        return metaProvider.getProvider().getWAL(region);
379      } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
380        return replicationProvider.getProvider().getWAL(region);
381      }
382    }
383    return provider.getWAL(region);
384  }
385
386  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
387    return createStreamReader(fs, path, (CancelableProgressable) null);
388  }
389
390  /**
391   * Create a one-way stream reader for the WAL.
392   * @return A WAL reader. Close when done with it.
393   */
394  public WALStreamReader createStreamReader(FileSystem fs, Path path,
395    CancelableProgressable reporter) throws IOException {
396    return createStreamReader(fs, path, reporter, -1);
397  }
398
399  /**
400   * Create a one-way stream reader for the WAL, and start reading from the given
401   * {@code startPosition}.
402   * @return A WAL reader. Close when done with it.
403   */
404  public WALStreamReader createStreamReader(FileSystem fs, Path path,
405    CancelableProgressable reporter, long startPosition) throws IOException {
406    try {
407      // A wal file could be under recovery, so it may take several
408      // tries to get it open. Instead of claiming it is corrupted, retry
409      // to open it up to 5 minutes by default.
410      long startWaiting = EnvironmentEdgeManager.currentTime();
411      long openTimeout = timeoutMillis + startWaiting;
412      int nbAttempt = 0;
413      WALStreamReader reader = null;
414      while (true) {
415        try {
416          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
417          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
418          return reader;
419        } catch (Exception e) {
420          // catch Exception so that we close reader for all exceptions. If we don't
421          // close the reader, we leak a socket.
422          if (reader != null) {
423            reader.close();
424          }
425
426          // Only inspect the Exception to consider retry when it's an IOException
427          if (e instanceof IOException) {
428            String msg = e.getMessage();
429            if (
430              msg != null && (msg.contains("Cannot obtain block length")
431                || msg.contains("Could not obtain the last block")
432                || msg.matches("Blocklist for [^ ]* has changed.*"))
433            ) {
434              if (++nbAttempt == 1) {
435                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
436              }
437              if (reporter != null && !reporter.progress()) {
438                throw new InterruptedIOException("Operation is cancelled");
439              }
440              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
441                LOG.error("Can't open after " + nbAttempt + " attempts and "
442                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
443              } else {
444                try {
445                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
446                  continue; // retry
447                } catch (InterruptedException ie) {
448                  InterruptedIOException iioe = new InterruptedIOException();
449                  iioe.initCause(ie);
450                  throw iioe;
451                }
452              }
453              throw new LeaseNotRecoveredException(e);
454            }
455          }
456
457          // Rethrow the original exception if we are not retrying due to HDFS-isms.
458          throw e;
459        }
460      }
461    } catch (IOException ie) {
462      throw ie;
463    } catch (Exception e) {
464      throw new IOException("Cannot get log reader", e);
465    }
466  }
467
468  /**
469   * Create a writer for the WAL. Uses defaults.
470   * <p>
471   * Should be package-private. public only for tests and
472   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
473   * @return A WAL writer. Close when done with it.
474   */
475  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
476    return FSHLogProvider.createWriter(conf, fs, path, false);
477  }
478
479  /**
480   * Should be package-private, visible for recovery testing. Uses defaults.
481   * @return an overwritable writer for recovered edits. caller should close.
482   */
483  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
484    throws IOException {
485    return FSHLogProvider.createWriter(conf, fs, path, true);
486  }
487
488  // These static methods are currently used where it's impractical to
489  // untangle the reliance on state in the filesystem. They rely on singleton
490  // WALFactory that just provides Reader / Writers.
491  // For now, first Configuration object wins. Practically this just impacts the reader/writer class
492  private static final AtomicReference<WALFactory> singleton = new AtomicReference<>();
493  private static final String SINGLETON_ID = WALFactory.class.getName();
494
495  // Public only for FSHLog
496  public static WALFactory getInstance(Configuration configuration) {
497    WALFactory factory = singleton.get();
498    if (null == factory) {
499      WALFactory temp = new WALFactory(configuration);
500      if (singleton.compareAndSet(null, temp)) {
501        factory = temp;
502      } else {
503        // someone else beat us to initializing
504        try {
505          temp.close();
506        } catch (IOException exception) {
507          LOG.debug("failed to close temporary singleton. ignoring.", exception);
508        }
509        factory = singleton.get();
510      }
511    }
512    return factory;
513  }
514
515  /**
516   * Create a tailing reader for the given path. Mainly used in replication.
517   */
518  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
519    long startPosition) throws IOException {
520    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
521    reader.init(fs, path, conf, startPosition);
522    return reader;
523  }
524
525  /**
526   * Create a one-way stream reader for a given path.
527   */
528  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
529    throws IOException {
530    return createStreamReader(fs, path, conf, -1);
531  }
532
533  /**
534   * Create a one-way stream reader for a given path.
535   */
536  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
537    long startPosition) throws IOException {
538    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
539      startPosition);
540  }
541
542  /**
543   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
544   * @return a Writer that will overwrite files. Caller must close.
545   */
546  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
547    final Configuration configuration) throws IOException {
548    return FSHLogProvider.createWriter(configuration, fs, path, true);
549  }
550
551  /**
552   * If you already have a WALFactory, you should favor the instance method. Uses defaults.
553   * @return a writer that won't overwrite files. Caller must close.
554   */
555  public static Writer createWALWriter(final FileSystem fs, final Path path,
556    final Configuration configuration) throws IOException {
557    return FSHLogProvider.createWriter(configuration, fs, path, false);
558  }
559
560  public WALProvider getWALProvider() {
561    return this.provider;
562  }
563
564  /**
565   * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
566   * for hbase:replication.
567   */
568  public List<WALProvider> getAllWALProviders() {
569    List<WALProvider> providers = new ArrayList<>();
570    if (provider != null) {
571      providers.add(provider);
572    }
573    WALProvider meta = metaProvider.getProviderNoCreate();
574    if (meta != null) {
575      providers.add(meta);
576    }
577    WALProvider replication = replicationProvider.getProviderNoCreate();
578    if (replication != null) {
579      providers.add(replication);
580    }
581    return providers;
582  }
583
584  public ExcludeDatanodeManager getExcludeDatanodeManager() {
585    return excludeDatanodeManager;
586  }
587
588  @RestrictedApi(explanation = "Should only be called in tests", link = "",
589      allowedOnPath = ".*/src/test/.*")
590  public String getFactoryId() {
591    return this.factoryId;
592  }
593}