001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Objects;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
039import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
040import org.apache.hadoop.hbase.util.CancelableProgressable;
041import org.apache.hadoop.hbase.util.FSUtils;
042import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.apache.yetus.audience.InterfaceStability;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
053 * default, this implementation picks a directory in Hadoop FS based on a combination of
054 * <ul>
055 * <li>the HBase root directory
056 * <li>HConstants.HREGION_LOGDIR_NAME
057 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
058 * </ul>
059 * It also uses the providerId to differentiate among files.
060 */
061@InterfaceAudience.Private
062@InterfaceStability.Evolving
063public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
064
065  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
066
067  /** Separate old log into different dir by regionserver name **/
068  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
069  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
070
071  // Only public so classes back in regionserver.wal can access
072  public interface Reader extends WAL.Reader {
073    /**
074     * @param fs File system.
075     * @param path Path.
076     * @param c Configuration.
077     * @param s Input stream that may have been pre-opened by the caller; may be null.
078     */
079    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
080  }
081
082  protected volatile T wal;
083  protected WALFactory factory;
084  protected Configuration conf;
085  protected List<WALActionsListener> listeners = new ArrayList<>();
086  protected String providerId;
087  protected AtomicBoolean initialized = new AtomicBoolean(false);
088  // for default wal provider, logPrefix won't change
089  protected String logPrefix;
090
091  /**
092   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
093   * missing the newly created WAL, see HBASE-21503 for more details.
094   */
095  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
096
097  /**
098   * @param factory factory that made us, identity used for FS layout. may not be null
099   * @param conf may not be null
100   * @param providerId differentiate between providers from one factory, used for FS layout. may be
101   *          null
102   */
103  @Override
104  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
105    if (!initialized.compareAndSet(false, true)) {
106      throw new IllegalStateException("WALProvider.init should only be called once.");
107    }
108    this.factory = factory;
109    this.conf = conf;
110    this.providerId = providerId;
111    // get log prefix
112    StringBuilder sb = new StringBuilder().append(factory.factoryId);
113    if (providerId != null) {
114      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
115        sb.append(providerId);
116      } else {
117        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
118      }
119    }
120    logPrefix = sb.toString();
121    doInit(conf);
122  }
123
124  @Override
125  public List<WAL> getWALs() {
126    if (wal != null) {
127      return Lists.newArrayList(wal);
128    }
129    walCreateLock.readLock().lock();
130    try {
131      if (wal == null) {
132        return Collections.emptyList();
133      } else {
134        return Lists.newArrayList(wal);
135      }
136    } finally {
137      walCreateLock.readLock().unlock();
138    }
139  }
140
141  @Override
142  public T getWAL(RegionInfo region) throws IOException {
143    T walCopy = wal;
144    if (walCopy != null) {
145      return walCopy;
146    }
147    walCreateLock.writeLock().lock();
148    try {
149      walCopy = wal;
150      if (walCopy != null) {
151        return walCopy;
152      }
153      walCopy = createWAL();
154      boolean succ = false;
155      try {
156        walCopy.init();
157        succ = true;
158      } finally {
159        if (!succ) {
160          try {
161            walCopy.close();
162          } catch (Throwable t) {
163            throw new FailedCloseWALAfterInitializedErrorException(
164              "Failed close after init wal failed.", t);
165          }
166        }
167      }
168      wal = walCopy;
169      return walCopy;
170    } finally {
171      walCreateLock.writeLock().unlock();
172    }
173  }
174
175  protected abstract T createWAL() throws IOException;
176
177  protected abstract void doInit(Configuration conf) throws IOException;
178
179  @Override
180  public void shutdown() throws IOException {
181    T log = this.wal;
182    if (log != null) {
183      log.shutdown();
184    }
185  }
186
187  @Override
188  public void close() throws IOException {
189    T log = this.wal;
190    if (log != null) {
191      log.close();
192    }
193  }
194
195  /**
196   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
197   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
198   */
199  @Override
200  public long getNumLogFiles() {
201    T log = this.wal;
202    return log == null ? 0 : log.getNumLogFiles();
203  }
204
205  /**
206   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
207   * size of files (only rolled). if either of them aren't, count 0 for that provider.
208   */
209  @Override
210  public long getLogFileSize() {
211    T log = this.wal;
212    return log == null ? 0 : log.getLogFileSize();
213  }
214
215  /**
216   * returns the number of rolled WAL files.
217   */
218  @VisibleForTesting
219  public static int getNumRolledLogFiles(WAL wal) {
220    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
221  }
222
223  /**
224   * returns the size of rolled WAL files.
225   */
226  @VisibleForTesting
227  public static long getLogFileSize(WAL wal) {
228    return ((AbstractFSWAL<?>) wal).getLogFileSize();
229  }
230
231  /**
232   * return the current filename from the current wal.
233   */
234  @VisibleForTesting
235  public static Path getCurrentFileName(final WAL wal) {
236    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
237  }
238
239  /**
240   * request a log roll, but don't actually do it.
241   */
242  @VisibleForTesting
243  static void requestLogRoll(final WAL wal) {
244    ((AbstractFSWAL<?>) wal).requestLogRoll();
245  }
246
247  // should be package private; more visible for use in AbstractFSWAL
248  public static final String WAL_FILE_NAME_DELIMITER = ".";
249  /** The hbase:meta region's WAL filename extension */
250  @VisibleForTesting
251  public static final String META_WAL_PROVIDER_ID = ".meta";
252  static final String DEFAULT_PROVIDER_ID = "default";
253
254  // Implementation details that currently leak in tests or elsewhere follow
  /** File extension used while splitting a WAL into regions (HBASE-2312) */
256  public static final String SPLITTING_EXT = "-splitting";
257
258  /**
259   * It returns the file create timestamp from the file name. For name format see
260   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
261   * @param wal must not be null
262   * @return the file number that is part of the WAL file name
263   */
264  @VisibleForTesting
265  public static long extractFileNumFromWAL(final WAL wal) {
266    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
267    if (walName == null) {
268      throw new IllegalArgumentException("The WAL path couldn't be null");
269    }
270    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
271    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
272  }
273
274  /**
275   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
276   * description.
277   */
278  private static final Pattern pattern =
279    Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
280
281  /**
282   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
283   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
284   * provider-id
285   * @param filename name of the file to validate
286   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
287   */
288  public static boolean validateWALFilename(String filename) {
289    return pattern.matcher(filename).matches();
290  }
291
292  /**
293   * Construct the directory name for all WALs on a given server. Dir names currently look like this
294   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
295   * @param serverName Server name formatted as described in {@link ServerName}
296   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
297   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
298   */
299  public static String getWALDirectoryName(final String serverName) {
300    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
301    dirName.append("/");
302    dirName.append(serverName);
303    return dirName.toString();
304  }
305
306  /**
307   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
308   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
309   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
310   * @param conf
311   * @param serverName Server name formatted as described in {@link ServerName}
312   * @return the relative WAL directory name
313   */
314  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
315    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
316    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
317      dirName.append(Path.SEPARATOR);
318      dirName.append(serverName);
319    }
320    return dirName.toString();
321  }
322
323  /**
324   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
325   * this method ignores the format of the logfile component. Current format: [base directory for
326   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
327   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
328   * server-specific directories.
329   * @return null if it's not a log file. Returns the ServerName of the region server that created
330   *         this log file otherwise.
331   */
332  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
333      throws IOException {
334    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
335      return null;
336    }
337
338    if (conf == null) {
339      throw new IllegalArgumentException("parameter conf must be set");
340    }
341
342    final String rootDir = conf.get(HConstants.HBASE_DIR);
343    if (rootDir == null || rootDir.isEmpty()) {
344      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
345    }
346
347    final StringBuilder startPathSB = new StringBuilder(rootDir);
348    if (!rootDir.endsWith("/")) {
349      startPathSB.append('/');
350    }
351    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
352    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
353      startPathSB.append('/');
354    }
355    final String startPath = startPathSB.toString();
356
357    String fullPath;
358    try {
359      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
360    } catch (IllegalArgumentException e) {
361      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
362      return null;
363    }
364
365    if (!fullPath.startsWith(startPath)) {
366      return null;
367    }
368
369    final String serverNameAndFile = fullPath.substring(startPath.length());
370
371    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
372      // Either it's a file (not a directory) or it's not a ServerName format
373      return null;
374    }
375
376    Path p = new Path(path);
377    return getServerNameFromWALDirectoryName(p);
378  }
379
380  /**
381   * This function returns region server name from a log file name which is in one of the following
382   * formats:
383   * <ul>
384   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
385   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
386   * </ul>
387   * @return null if the passed in logFile isn't a valid WAL file path
388   */
389  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
390    String logDirName = logFile.getParent().getName();
391    // We were passed the directory and not a file in it.
392    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
393      logDirName = logFile.getName();
394    }
395    ServerName serverName = null;
396    if (logDirName.endsWith(SPLITTING_EXT)) {
397      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
398    }
399    try {
400      serverName = ServerName.parseServerName(logDirName);
401    } catch (IllegalArgumentException | IllegalStateException ex) {
402      serverName = null;
403      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
404    }
405    if (serverName != null && serverName.getStartcode() < 0) {
406      LOG.warn("Invalid log file path=" + logFile);
407      serverName = null;
408    }
409    return serverName;
410  }
411
412  public static boolean isMetaFile(Path p) {
413    return isMetaFile(p.getName());
414  }
415
416  public static boolean isMetaFile(String p) {
417    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
418      return true;
419    }
420    return false;
421  }
422
423  public static boolean isArchivedLogFile(Path p) {
424    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
425    return p.toString().contains(oldLog);
426  }
427
428  /**
429   * Get the archived WAL file path
430   * @param path - active WAL file path
431   * @param conf - configuration
432   * @return archived path if exists, path - otherwise
433   * @throws IOException exception
434   */
435  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
436    Path walRootDir = FSUtils.getWALRootDir(conf);
437    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
438    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
439      ServerName serverName = getServerNameFromWALDirectoryName(path);
440      if (serverName == null) {
441        LOG.error("Couldn't locate log: " + path);
442        return path;
443      }
444      oldLogDir = new Path(oldLogDir, serverName.getServerName());
445    }
446    Path archivedLogLocation = new Path(oldLogDir, path.getName());
447    final FileSystem fs = FSUtils.getWALFileSystem(conf);
448
449    if (fs.exists(archivedLogLocation)) {
450      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
451      return archivedLogLocation;
452    } else {
453      LOG.error("Couldn't locate log: " + path);
454      return path;
455    }
456  }
457
458  /**
459   * Opens WAL reader with retries and additional exception handling
460   * @param path path to WAL file
461   * @param conf configuration
462   * @return WAL Reader instance
463   * @throws IOException
464   */
465  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
466      throws IOException
467
468  {
469    long retryInterval = 2000; // 2 sec
470    int maxAttempts = 30;
471    int attempt = 0;
472    Exception ee = null;
473    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
474    while (reader == null && attempt++ < maxAttempts) {
475      try {
476        // Detect if this is a new file, if so get a new reader else
477        // reset the current reader so that we see the new data
478        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
479        return reader;
480      } catch (FileNotFoundException fnfe) {
481        // If the log was archived, continue reading from there
482        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
483        if (!Objects.equals(path, archivedLog)) {
484          return openReader(archivedLog, conf);
485        } else {
486          throw fnfe;
487        }
488      } catch (LeaseNotRecoveredException lnre) {
489        // HBASE-15019 the WAL was not closed due to some hiccup.
490        LOG.warn("Try to recover the WAL lease " + path, lnre);
491        recoverLease(conf, path);
492        reader = null;
493        ee = lnre;
494      } catch (NullPointerException npe) {
495        // Workaround for race condition in HDFS-4380
496        // which throws a NPE if we open a file before any data node has the most recent block
497        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
498        LOG.warn("Got NPE opening reader, will retry.");
499        reader = null;
500        ee = npe;
501      }
502      if (reader == null) {
503        // sleep before next attempt
504        try {
505          Thread.sleep(retryInterval);
506        } catch (InterruptedException e) {
507        }
508      }
509    }
510    throw new IOException("Could not open reader", ee);
511  }
512
513  // For HBASE-15019
514  private static void recoverLease(final Configuration conf, final Path path) {
515    try {
516      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
517      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
518      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
519        @Override
520        public boolean progress() {
521          LOG.debug("Still trying to recover WAL lease: " + path);
522          return true;
523        }
524      });
525    } catch (IOException e) {
526      LOG.warn("unable to recover lease for WAL: " + path, e);
527    }
528  }
529
530  @Override
531  public void addWALActionsListener(WALActionsListener listener) {
532    listeners.add(listener);
533  }
534
535  /**
536   * Get prefix of the log from its name, assuming WAL name in format of
537   * log_prefix.filenumber.log_suffix
538   * @param name Name of the WAL to parse
539   * @return prefix of the log
540   * @see AbstractFSWAL#getCurrentFileName()
541   */
542  public static String getWALPrefixFromWALName(String name) {
543    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
544    return name.substring(0, endIndex);
545  }
546}