001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Objects;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.locks.ReadWriteLock;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
039import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
040import org.apache.hadoop.hbase.util.CancelableProgressable;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
043import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.yetus.audience.InterfaceStability;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052/**
053 * Base class of a WAL Provider that returns a single thread safe WAL that writes to Hadoop FS. By
054 * default, this implementation picks a directory in Hadoop FS based on a combination of
055 * <ul>
056 * <li>the HBase root directory
057 * <li>HConstants.HREGION_LOGDIR_NAME
058 * <li>the given factory's factoryId (usually identifying the regionserver by host:port)
059 * </ul>
060 * It also uses the providerId to differentiate among files.
061 */
062@InterfaceAudience.Private
063@InterfaceStability.Evolving
064public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
065
066  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
067
068  /** Separate old log into different dir by regionserver name **/
069  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
070  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
071
072  // Only public so classes back in regionserver.wal can access
073  public interface Reader extends WAL.Reader {
074    /**
075     * @param fs File system.
076     * @param path Path.
077     * @param c Configuration.
078     * @param s Input stream that may have been pre-opened by the caller; may be null.
079     */
080    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
081  }
082
083  protected volatile T wal;
084  protected WALFactory factory;
085  protected Configuration conf;
086  protected List<WALActionsListener> listeners = new ArrayList<>();
087  protected String providerId;
088  protected AtomicBoolean initialized = new AtomicBoolean(false);
089  // for default wal provider, logPrefix won't change
090  protected String logPrefix;
091
092  /**
093   * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
094   * missing the newly created WAL, see HBASE-21503 for more details.
095   */
096  private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
097
098  /**
099   * @param factory factory that made us, identity used for FS layout. may not be null
100   * @param conf may not be null
101   * @param providerId differentiate between providers from one factory, used for FS layout. may be
102   *          null
103   */
104  @Override
105  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
106    if (!initialized.compareAndSet(false, true)) {
107      throw new IllegalStateException("WALProvider.init should only be called once.");
108    }
109    this.factory = factory;
110    this.conf = conf;
111    this.providerId = providerId;
112    // get log prefix
113    StringBuilder sb = new StringBuilder().append(factory.factoryId);
114    if (providerId != null) {
115      if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
116        sb.append(providerId);
117      } else {
118        sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
119      }
120    }
121    logPrefix = sb.toString();
122    doInit(conf);
123  }
124
125  @Override
126  public List<WAL> getWALs() {
127    if (wal != null) {
128      return Lists.newArrayList(wal);
129    }
130    walCreateLock.readLock().lock();
131    try {
132      if (wal == null) {
133        return Collections.emptyList();
134      } else {
135        return Lists.newArrayList(wal);
136      }
137    } finally {
138      walCreateLock.readLock().unlock();
139    }
140  }
141
142  @Override
143  public T getWAL(RegionInfo region) throws IOException {
144    T walCopy = wal;
145    if (walCopy != null) {
146      return walCopy;
147    }
148    walCreateLock.writeLock().lock();
149    try {
150      walCopy = wal;
151      if (walCopy != null) {
152        return walCopy;
153      }
154      walCopy = createWAL();
155      boolean succ = false;
156      try {
157        walCopy.init();
158        succ = true;
159      } finally {
160        if (!succ) {
161          try {
162            walCopy.close();
163          } catch (Throwable t) {
164            throw new FailedCloseWALAfterInitializedErrorException(
165              "Failed close after init wal failed.", t);
166          }
167        }
168      }
169      wal = walCopy;
170      return walCopy;
171    } finally {
172      walCreateLock.writeLock().unlock();
173    }
174  }
175
176  protected abstract T createWAL() throws IOException;
177
178  protected abstract void doInit(Configuration conf) throws IOException;
179
180  @Override
181  public void shutdown() throws IOException {
182    T log = this.wal;
183    if (log != null) {
184      log.shutdown();
185    }
186  }
187
188  @Override
189  public void close() throws IOException {
190    T log = this.wal;
191    if (log != null) {
192      log.close();
193    }
194  }
195
196  /**
197   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
198   * number of files (rolled and active). if either of them aren't, count 0 for that provider.
199   */
200  @Override
201  public long getNumLogFiles() {
202    T log = this.wal;
203    return log == null ? 0 : log.getNumLogFiles();
204  }
205
206  /**
207   * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
208   * size of files (only rolled). if either of them aren't, count 0 for that provider.
209   */
210  @Override
211  public long getLogFileSize() {
212    T log = this.wal;
213    return log == null ? 0 : log.getLogFileSize();
214  }
215
216  /**
217   * returns the number of rolled WAL files.
218   */
219  @VisibleForTesting
220  public static int getNumRolledLogFiles(WAL wal) {
221    return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
222  }
223
224  /**
225   * returns the size of rolled WAL files.
226   */
227  @VisibleForTesting
228  public static long getLogFileSize(WAL wal) {
229    return ((AbstractFSWAL<?>) wal).getLogFileSize();
230  }
231
232  /**
233   * return the current filename from the current wal.
234   */
235  @VisibleForTesting
236  public static Path getCurrentFileName(final WAL wal) {
237    return ((AbstractFSWAL<?>) wal).getCurrentFileName();
238  }
239
240  /**
241   * request a log roll, but don't actually do it.
242   */
243  @VisibleForTesting
244  static void requestLogRoll(final WAL wal) {
245    ((AbstractFSWAL<?>) wal).requestLogRoll();
246  }
247
248  // should be package private; more visible for use in AbstractFSWAL
249  public static final String WAL_FILE_NAME_DELIMITER = ".";
250  /** The hbase:meta region's WAL filename extension */
251  @VisibleForTesting
252  public static final String META_WAL_PROVIDER_ID = ".meta";
253  static final String DEFAULT_PROVIDER_ID = "default";
254
255  // Implementation details that currently leak in tests or elsewhere follow
256  /** File Extension used while splitting an WAL into regions (HBASE-2312) */
257  public static final String SPLITTING_EXT = "-splitting";
258
259  /**
260   * It returns the file create timestamp from the file name. For name format see
261   * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal
262   * @param wal must not be null
263   * @return the file number that is part of the WAL file name
264   */
265  @VisibleForTesting
266  public static long extractFileNumFromWAL(final WAL wal) {
267    final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
268    if (walName == null) {
269      throw new IllegalArgumentException("The WAL path couldn't be null");
270    }
271    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
272    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
273  }
274
275  /**
276   * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
277   * description.
278   */
279  private static final Pattern pattern =
280    Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
281
282  /**
283   * A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
284   * &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
285   * provider-id
286   * @param filename name of the file to validate
287   * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
288   */
289  public static boolean validateWALFilename(String filename) {
290    return pattern.matcher(filename).matches();
291  }
292
293  /**
294   * Construct the directory name for all WALs on a given server. Dir names currently look like this
295   * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
296   * @param serverName Server name formatted as described in {@link ServerName}
297   * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if
298   *         <code>serverName</code> passed is <code>1.example.org,60030,12345</code>
299   */
300  public static String getWALDirectoryName(final String serverName) {
301    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
302    dirName.append("/");
303    dirName.append(serverName);
304    return dirName.toString();
305  }
306
307  /**
308   * Construct the directory name for all old WALs on a given server. The default old WALs dir looks
309   * like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver to
310   * true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
311   * @param conf
312   * @param serverName Server name formatted as described in {@link ServerName}
313   * @return the relative WAL directory name
314   */
315  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
316    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
317    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
318      dirName.append(Path.SEPARATOR);
319      dirName.append(serverName);
320    }
321    return dirName.toString();
322  }
323
324  /**
325   * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
326   * this method ignores the format of the logfile component. Current format: [base directory for
327   * hbase]/hbase/.logs/ServerName/logfile or [base directory for
328   * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and
329   * server-specific directories.
330   * @return null if it's not a log file. Returns the ServerName of the region server that created
331   *         this log file otherwise.
332   */
333  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
334      throws IOException {
335    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
336      return null;
337    }
338
339    if (conf == null) {
340      throw new IllegalArgumentException("parameter conf must be set");
341    }
342
343    final String rootDir = conf.get(HConstants.HBASE_DIR);
344    if (rootDir == null || rootDir.isEmpty()) {
345      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
346    }
347
348    final StringBuilder startPathSB = new StringBuilder(rootDir);
349    if (!rootDir.endsWith("/")) {
350      startPathSB.append('/');
351    }
352    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
353    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) {
354      startPathSB.append('/');
355    }
356    final String startPath = startPathSB.toString();
357
358    String fullPath;
359    try {
360      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
361    } catch (IllegalArgumentException e) {
362      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
363      return null;
364    }
365
366    if (!fullPath.startsWith(startPath)) {
367      return null;
368    }
369
370    final String serverNameAndFile = fullPath.substring(startPath.length());
371
372    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
373      // Either it's a file (not a directory) or it's not a ServerName format
374      return null;
375    }
376
377    Path p = new Path(path);
378    return getServerNameFromWALDirectoryName(p);
379  }
380
381  /**
382   * This function returns region server name from a log file name which is in one of the following
383   * formats:
384   * <ul>
385   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...</li>
386   * <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...</li>
387   * </ul>
388   * @return null if the passed in logFile isn't a valid WAL file path
389   */
390  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
391    String logDirName = logFile.getParent().getName();
392    // We were passed the directory and not a file in it.
393    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
394      logDirName = logFile.getName();
395    }
396    ServerName serverName = null;
397    if (logDirName.endsWith(SPLITTING_EXT)) {
398      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
399    }
400    try {
401      serverName = ServerName.parseServerName(logDirName);
402    } catch (IllegalArgumentException | IllegalStateException ex) {
403      serverName = null;
404      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
405    }
406    if (serverName != null && serverName.getStartcode() < 0) {
407      LOG.warn("Invalid log file path=" + logFile);
408      serverName = null;
409    }
410    return serverName;
411  }
412
413  public static boolean isMetaFile(Path p) {
414    return isMetaFile(p.getName());
415  }
416
417  public static boolean isMetaFile(String p) {
418    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
419      return true;
420    }
421    return false;
422  }
423
424  public static boolean isArchivedLogFile(Path p) {
425    String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
426    return p.toString().contains(oldLog);
427  }
428
429  /**
430   * Get the archived WAL file path
431   * @param path - active WAL file path
432   * @param conf - configuration
433   * @return archived path if exists, path - otherwise
434   * @throws IOException exception
435   */
436  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
437    Path rootDir = CommonFSUtils.getWALRootDir(conf);
438    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
439    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
440      ServerName serverName = getServerNameFromWALDirectoryName(path);
441      if (serverName == null) {
442        LOG.error("Couldn't locate log: " + path);
443        return path;
444      }
445      oldLogDir = new Path(oldLogDir, serverName.getServerName());
446    }
447    Path archivedLogLocation = new Path(oldLogDir, path.getName());
448    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
449
450    if (fs.exists(archivedLogLocation)) {
451      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
452      return archivedLogLocation;
453    } else {
454      LOG.error("Couldn't locate log: " + path);
455      return path;
456    }
457  }
458
459  /**
460   * Opens WAL reader with retries and additional exception handling
461   * @param path path to WAL file
462   * @param conf configuration
463   * @return WAL Reader instance
464   * @throws IOException
465   */
466  public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
467      throws IOException
468
469  {
470    long retryInterval = 2000; // 2 sec
471    int maxAttempts = 30;
472    int attempt = 0;
473    Exception ee = null;
474    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
475    while (reader == null && attempt++ < maxAttempts) {
476      try {
477        // Detect if this is a new file, if so get a new reader else
478        // reset the current reader so that we see the new data
479        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
480        return reader;
481      } catch (FileNotFoundException fnfe) {
482        // If the log was archived, continue reading from there
483        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
484        if (!Objects.equals(path, archivedLog)) {
485          return openReader(archivedLog, conf);
486        } else {
487          throw fnfe;
488        }
489      } catch (LeaseNotRecoveredException lnre) {
490        // HBASE-15019 the WAL was not closed due to some hiccup.
491        LOG.warn("Try to recover the WAL lease " + path, lnre);
492        recoverLease(conf, path);
493        reader = null;
494        ee = lnre;
495      } catch (NullPointerException npe) {
496        // Workaround for race condition in HDFS-4380
497        // which throws a NPE if we open a file before any data node has the most recent block
498        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
499        LOG.warn("Got NPE opening reader, will retry.");
500        reader = null;
501        ee = npe;
502      }
503      if (reader == null) {
504        // sleep before next attempt
505        try {
506          Thread.sleep(retryInterval);
507        } catch (InterruptedException e) {
508        }
509      }
510    }
511    throw new IOException("Could not open reader", ee);
512  }
513
514  // For HBASE-15019
515  private static void recoverLease(final Configuration conf, final Path path) {
516    try {
517      final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
518      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
519        @Override
520        public boolean progress() {
521          LOG.debug("Still trying to recover WAL lease: " + path);
522          return true;
523        }
524      });
525    } catch (IOException e) {
526      LOG.warn("unable to recover lease for WAL: " + path, e);
527    }
528  }
529
530  @Override
531  public void addWALActionsListener(WALActionsListener listener) {
532    listeners.add(listener);
533  }
534
535  /**
536   * Get prefix of the log from its name, assuming WAL name in format of
537   * log_prefix.filenumber.log_suffix
538   * @param name Name of the WAL to parse
539   * @return prefix of the log
540   * @see AbstractFSWAL#getCurrentFileName()
541   */
542  public static String getWALPrefixFromWALName(String name) {
543    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
544    return name.substring(0, endIndex);
545  }
546}